Linux內(nèi)核中的等待隊(duì)列
Linux 內(nèi)核的等待隊(duì)列是以雙循環(huán)鏈表為基礎(chǔ)數(shù)據(jù)結(jié)構(gòu),與進(jìn)程調(diào)度機(jī)制緊密結(jié)合,能夠用于實(shí)現(xiàn)核心的異步事件通知機(jī)制。在Linux2.4.21中,等待隊(duì)列在源 代碼樹include/linux/wait.h中,這是一個(gè)通過(guò)list_head連接的典型雙循環(huán)鏈表,如下圖所示。 在這個(gè)鏈表中,有兩種數(shù)據(jù)結(jié)構(gòu):等待隊(duì)列頭(wait_queue_head_t)和等待隊(duì)列項(xiàng)(wait_queue_t)。等待隊(duì) 列頭和等待隊(duì)列項(xiàng)中都包含一個(gè)list_head類型的域作為"連接件"。由于我們只需要對(duì)隊(duì)列進(jìn)行添加和刪除操作,并不會(huì)修改其中的對(duì)象(等待隊(duì)列 項(xiàng)),因此,我們只需要提供一把保護(hù)整個(gè)基礎(chǔ)設(shè)施和所有對(duì)象的鎖,這把鎖保存在等待隊(duì)列頭中,為wq_lock_t類型。在實(shí)現(xiàn)中,可以支持讀寫鎖 (rwlock)或自旋鎖(spinlock)兩種類型,通過(guò)一個(gè)宏定義來(lái)切換。如果使用讀寫鎖,將wq_lock_t定義為rwlock_t類型;如果 是自旋鎖,將wq_lock_t定義為spinlock_t類型。無(wú)論哪種情況,分別相應(yīng)設(shè)置wq_read_lock、wq_read_unlock、 wq_read_lock_irqsave、wq_read_unlock_irqrestore、wq_write_lock_irq、 wq_write_unlock、wq_write_lock_irqsave和wq_write_unlock_irqrestore等宏。 等待隊(duì)列頭 struct __wait_queue_head { wq_lock_t lock; struct list_head task_list; }; typedef struct __wait_queue_head wait_queue_head_t; 前面已經(jīng)說(shuō)過(guò),等待隊(duì)列的主體是進(jìn)程,這反映在每個(gè)等待隊(duì)列項(xiàng)中,是一個(gè)任務(wù)結(jié)構(gòu)指針(struct task_struct * task)。flags為該進(jìn)程的等待標(biāo)志,當(dāng)前只支持互斥。 等待隊(duì)列項(xiàng) struct __wait_queue { unsigned int flags; #define WQ_FLAG_EXCLUSIVE 0x01 struct task_struct * task; struct list_head task_list; }; typedef struct __wait_queue wait_queue_t; 聲明和初始化 #define DECLARE_WAITQUEUE(name, tsk) \ wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk) #define __WAITQUEUE_INITIALIZER(name, tsk) { \ task: tsk, \ task_list: { NULL, NULL }, \ __WAITQUEUE_DEBUG_INIT(name)} 通過(guò)DECLARE_WAITQUEUE宏將等待隊(duì)列項(xiàng)初始化成對(duì)應(yīng)的任務(wù)結(jié)構(gòu),并且用于連接的相關(guān)指針均設(shè)置為空。其中加入了調(diào)試相關(guān)代碼。 #define DECLARE_WAIT_QUEUE_HEAD(name) \ wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name) #define __WAIT_QUEUE_HEAD_INITIALIZER(name) { \ lock: WAITQUEUE_RW_LOCK_UNLOCKED, \ task_list: { &(name).task_list, &(name).task_list }, \ __WAITQUEUE_HEAD_DEBUG_INIT(name)} 通過(guò)DECLARE_WAIT_QUEUE_HEAD宏初始化一個(gè)等待隊(duì)列頭,使得其所在鏈表為空,并設(shè)置鏈表為"未上鎖"狀態(tài)。其中加入了調(diào)試相關(guān)代碼。 static inline void init_waitqueue_head(wait_queue_head_t *q) 該函數(shù)初始化一個(gè)已經(jīng)存在的等待隊(duì)列頭,它將整個(gè)隊(duì)列設(shè)置為"未上鎖"狀態(tài),并將鏈表指針prev和next指向它自身。 { q->lock = WAITQUEUE_RW_LOCK_UNLOCKED; INIT_LIST_HEAD(&q->task_list); } static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p) 該函數(shù)初始化一個(gè)已經(jīng)存在的等待隊(duì)列項(xiàng),它設(shè)置對(duì)應(yīng)的任務(wù)結(jié)構(gòu),同時(shí)將標(biāo)志位清0。 { q->flags = 0; q->task = p; } static inline int waitqueue_active(wait_queue_head_t *q) 該函數(shù)檢查等待隊(duì)列是否為空。 { return !list_empty(&q->task_list); } static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new) 將指定的等待隊(duì)列項(xiàng)new添加到等待隊(duì)列頭head所在的鏈表頭部,該函數(shù)假設(shè)已經(jīng)獲得鎖。 { list_add(&new->task_list, &head->task_list); } static inline void __add_wait_queue_tail(wait_queue_head_t *head, wait_queue_t *new) 將指定的等待隊(duì)列項(xiàng)new添加到等待隊(duì)列頭head所在的鏈表尾部,該函數(shù)假設(shè)已經(jīng)獲得鎖。 { list_add_tail(&new->task_list, &head->task_list); } static inline void __remove_wait_queue(wait_queue_head_t *head, wait_queue_t *old) 將函數(shù)從等待隊(duì)列頭head所在的鏈表中刪除指定等待隊(duì)列項(xiàng)old,該函數(shù)假設(shè)已經(jīng)獲得鎖,并且old在head所在鏈表中。 { list_del(&old->task_list); } 睡眠和喚醒操作 對(duì) 等待隊(duì)列的操作包括睡眠和喚醒(相關(guān)函數(shù)保存在源代碼樹的/kernel/sched.c和include/linux/sched.h中)。思想是更改 當(dāng)前進(jìn)程(CURRENT)的任務(wù)狀態(tài),并要求重新調(diào)度,因?yàn)檫@時(shí)這個(gè)進(jìn)程的狀態(tài)已經(jīng)改變,不再在調(diào)度表的就緒隊(duì)列中,因此無(wú)法再獲得執(zhí)行機(jī)會(huì),進(jìn)入"睡 眠"狀態(tài),直至被"喚醒",即其任務(wù)狀態(tài)重新被修改回就緒態(tài)。 常用的睡眠操作有interruptible_sleep_on和 sleep_on。兩個(gè)函數(shù)類似,只不過(guò)前者將進(jìn)程的狀態(tài)從就緒態(tài)(TASK_RUNNING)設(shè)置為TASK_INTERRUPTIBLE,允許通過(guò)發(fā) 送signal喚醒它(即可中斷的睡眠狀態(tài));而后者將進(jìn)程的狀態(tài)設(shè)置為TASK_UNINTERRUPTIBLE,在這種狀態(tài)下,不接收任何 singal。以interruptible_sleep_on為例,其展開(kāi)后的代碼是: void interruptible_sleep_on(wait_queue_head_t *q) { unsigned long flags; wait_queue_t wait; /* 構(gòu)造當(dāng)前進(jìn)程對(duì)應(yīng)的等待隊(duì)列項(xiàng) */ init_waitqueue_entry(&wait, current); /* 將當(dāng)前進(jìn)程的狀態(tài)從TASK_RUNNING改為TASK_INTERRUPTIBLE */
current->state = TASK_INTERRUPTIBLE; /* 將等待隊(duì)列項(xiàng)添加到指定鏈表中 */
wq_write_lock_irqsave(&q->lock,flags); __add_wait_queue(q, &wait); wq_write_unlock(&q->lock); /* 進(jìn)程重新調(diào)度,放棄執(zhí)行權(quán) */
schedule(); /* 本進(jìn)程被喚醒,重新獲得執(zhí)行權(quán),首要之事是將等待隊(duì)列項(xiàng)從鏈表中刪除 */
wq_write_lock_irq(&q->lock); __remove_wait_queue(q, &wait); wq_write_unlock_irqrestore(&q->lock,flags); /* 至此,等待過(guò)程結(jié)束,本進(jìn)程可以正常執(zhí)行下面的邏輯 */ } 對(duì) 應(yīng)的喚醒操作包括wake_up_interruptible和wake_up。wake_up函數(shù)不僅可以喚醒狀態(tài)為 TASK_UNINTERRUPTIBLE的進(jìn)程,而且可以喚醒狀態(tài)為TASK_INTERRUPTIBLE的進(jìn)程。 wake_up_interruptible只負(fù)責(zé)喚醒狀態(tài)為TASK_INTERRUPTIBLE的進(jìn)程。這兩個(gè)宏的定義如下: #define wake_up(x) __wake_up((x),TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 1) #define wake_up_interruptible(x) __wake_up((x),TASK_INTERRUPTIBLE, 1) __wake_up函數(shù)主要是獲取隊(duì)列操作的鎖,具體工作是調(diào)用__wake_up_common完成的。 void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr) { if (q) { unsigned long flags; wq_read_lock_irqsave(&q->lock, flags); __wake_up_common(q, mode, nr, 0); wq_read_unlock_irqrestore(&q->lock, flags); } } /* The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve number) then we wake all the non-exclusive tasks and one exclusive task. There are circumstances in which we can try to wake a task which has already started to run but is not in state TASK_RUNNING. try_to_wake_up() returns zero in this (rare) case, and we handle it by contonuing to scan the queue. */ static inline void __wake_up_common (wait_queue_head_t *q, unsigned int mode, int nr_exclusive, const int sync) 參數(shù)q表示要操作的等待隊(duì)列,mode表示要喚醒任務(wù)的狀態(tài),如TASK_UNINTERRUPTIBLE或TASK_INTERRUPTIBLE等。nr_exclusive是要喚醒的互斥進(jìn)程數(shù)目,在這之前遇到的非互斥進(jìn)程將被無(wú)條件喚醒。sync表示??? { struct list_head *tmp; struct task_struct *p; CHECK_MAGIC_WQHEAD(q);
WQ_CHECK_LIST_HEAD(&q->task_list); /* 遍歷等待隊(duì)列 */
list_for_each(tmp,&q->task_list) { unsigned int state; /* 獲得當(dāng)前等待隊(duì)列項(xiàng) */ wait_queue_t *curr = list_entry(tmp, wait_queue_t, task_list); CHECK_MAGIC(curr->__magic);
/* 獲得對(duì)應(yīng)的進(jìn)程 */ p = curr->task; state = p->state; /* 如果我們需要處理這種狀態(tài)的進(jìn)程 */
if (state & mode) { WQ_NOTE_WAKER(curr); if (try_to_wake_up(p, sync) && (curr->flags&WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) break; } } } /* 喚醒一個(gè)進(jìn)程,將它放到運(yùn)行隊(duì)列中,如果它還不在運(yùn)行隊(duì)列的話。"當(dāng)前"進(jìn)程總是在運(yùn)行隊(duì)列中的(except when the
actual re-schedule is in progress),and as such you're allowed to do the
simpler "current->state = TASK_RUNNING" to mark yourself runnable
without the overhead of this. */
static inline int try_to_wake_up(struct task_struct * p, int synchronous) { unsigned long flags; int success = 0; /* 由于我們需要操作運(yùn)行隊(duì)列,必須獲得對(duì)應(yīng)的鎖 */
spin_lock_irqsave(&runqueue_lock, flags); /* 將進(jìn)程狀態(tài)設(shè)置為TASK_RUNNING */ p->state = TASK_RUNNING; /* 如果進(jìn)程已經(jīng)在運(yùn)行隊(duì)列中,釋放鎖退出 */ if (task_on_runqueue(p)) goto out; /* 否則將進(jìn)程添加到運(yùn)行隊(duì)列中 */ add_to_runqueue(p); /* 如果設(shè)置了同步標(biāo)志 */
if (!synchronous || !(p->cpus_allowed & (1UL << smp_processor_id()))) reschedule_idle(p); /* 喚醒成功,釋放鎖退出 */ success = 1; out: spin_unlock_irqrestore(&runqueue_lock, flags); return success; } 等待隊(duì)列應(yīng)用模式 等待隊(duì)列的的應(yīng)用涉及兩個(gè)進(jìn)程,假設(shè)為A和B。A是資源的消費(fèi)者,B是資源的生產(chǎn)者。A在消費(fèi)的時(shí)候必須確保資源已經(jīng)生產(chǎn)出來(lái),為此定義一個(gè)資源等待隊(duì)列。這個(gè)隊(duì)列同時(shí)要被進(jìn)程A和進(jìn)程B使用,我們可以將它定義為一個(gè)全局變量。 DECLARE_WAIT_QUEUE_HEAD(rsc_queue); /* 全局變量 */ 在進(jìn)程A中,執(zhí)行邏輯如下: while (resource is unavaiable) { interruptible_sleep_on( &wq ); } consume_resource(); 在進(jìn)程B中,執(zhí)行邏輯如下: produce_resource(); wake_up_interruptible( &wq ); |
|