本文主要講解如何通過等待隊列實現(xiàn)對進程的阻塞。
應(yīng)用場景:
當(dāng)進程要獲取某些資源(例如從網(wǎng)卡讀取數(shù)據(jù))的時候,但資源并沒有準(zhǔn)備好(例如網(wǎng)卡還沒接收到數(shù)據(jù)),這時候內(nèi)核必須切換到其他進程運行,直到資源準(zhǔn)備好再喚醒進程。
waitqueue (等待隊列)就是內(nèi)核用于管理等待資源的進程,當(dāng)某個進程獲取的資源沒有準(zhǔn)備好的時候,可以通過調(diào)用 add_wait_queue()函數(shù)把進程添加到 waitqueue 中,然后切換到其他進程繼續(xù)執(zhí)行。當(dāng)資源準(zhǔn)備好,由資源提供方通過調(diào)用 wake_up()函數(shù)來喚醒等待的進程。
定義頭文件:
#include
定義和初始化等待隊列頭(workqueue):
靜態(tài)的,用宏:
#define DECLARE_WAIT_QUEUE_HEAD(name) wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)
動態(tài)的,也是用宏:
#define init_waitqueue_head(q) do { static struct lock_class_key __key; __init_waitqueue_head((q), #q, &__key); } while (0)
定義實例
wait_queue_head_t wq; init_waitqueue_head(&wq);
阻塞接口:
wait_event(wq, condition) wait_event_timeout(wq, condition, timeout) wait_event_interruptible(wq, condition) wait_event_interruptible_timeout(wq, condition, timeout) wait_event_hrtimeout(wq, condition, timeout) wait_event_interruptible_hrtimeout(wq, condition, timeout) wait_event_interruptible_exclusive(wq, condition) wait_event_interruptible_locked(wq, condition) wait_event_interruptible_locked_irq(wq, condition) wait_event_interruptible_exclusive_locked(wq, condition) wait_event_interruptible_exclusive_locked_irq(wq, condition) wait_event_killable(wq, condition) wait_event_lock_irq_cmd(wq, condition, lock, cmd) wait_event_lock_irq(wq, condition, lock) wait_event_interruptible_lock_irq_cmd(wq, condition, lock, cmd) wait_event_interruptible_lock_irq(wq, condition, lock) wait_event_interruptible_lock_irq_timeout(wq, condition, lock, timeout)
參數(shù)
wq 定義的等待隊列頭, condition 為條件表達式,當(dāng) wake up 后,condition 為真時,喚醒阻塞的進程,為假時,繼續(xù)睡眠。
功能說明
接口版本比較多,各自都有自己合適的應(yīng)用場合,但是常用的是前面四個。
wait_event:不可中斷的睡眠,條件一直不滿足,會一直睡眠。 wait_event_timeout:不可中斷睡眠,當(dāng)超過指定的 timeout(單位是 jiffies)時間,不管有沒有 wake up,還是條件沒滿足,都要喚醒進程,此時返回的是 0。在 timeout 時間內(nèi)條件滿足返回值為 timeout 或者 1; wait_event_interruptible:可被信號中斷的睡眠,被信號打斷喚醒時,返回負(fù)值 -ERESTARTSYS;wake up 時,條件滿足的,返回 0。除了 wait_event 沒有返回值,其它的都有返回,有返回值的一般都要判斷返回值。如下例: int flag = 0; if(wait_event_interruptible(&wq,flag == 1)) return -ERESTARTSYS; wait_event_interruptible_timeout:是 wait_event_timeout 和 wait_event_interruptible_timeout 的結(jié)合版本,有它們兩個的特點。
其他的接口,用的不多,有興趣可以自己看看。
解除阻塞接口(喚醒)
接口定義:
#define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL) #define wake_up_nr(x, nr) __wake_up(x, TASK_NORMAL, nr, NULL) #define wake_up_all(x) __wake_up(x, TASK_NORMAL, 0, NULL) #define wake_up_locked(x) __wake_up_locked((x), TASK_NORMAL, 1) #define wake_up_all_locked(x) __wake_up_locked((x), TASK_NORMAL, 0) #define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL) #define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL) #define wake_up_interruptible_all(x) __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL) #define wake_up_interruptible_sync(x) __wake_up_sync((x), TASK_INTERRUPTIBLE, 1)
功能說明
wake_up:一次只能喚醒掛在這個等待隊列頭上的一個進程 wake_up_nr:一次喚起 nr 個進程(等待在同一個 wait_queue_head_t 有很多個) wake_up_all:一次喚起所有等待在同一個 wait_queue_head_t 上所有進程 wake_up_interruptible:對應(yīng) wait_event_interruptible 版本的 wake up wake_up_interruptible_sync:保證 wake up 的動作原子性,wake_up 這個函數(shù),很有可能函數(shù)還沒執(zhí)行完,就被喚起來進程給搶占了,這個函數(shù)能夠保證 wak up 動作完整的執(zhí)行完成。
其他的也是與對應(yīng)阻塞接口對應(yīng)的。
使用實例
以字符設(shè)備為例,在沒有數(shù)據(jù)的時候,在 read 函數(shù)中實現(xiàn)讀阻塞,當(dāng)向內(nèi)核寫入數(shù)據(jù)時,則喚醒阻塞在該等待隊列的所有任務(wù)。
讀操作
static ssize_t hello_read(struct file *filp,char __user *buf,size_t size,loff_t *poss) { wait_event_interruptible(rwq,flage!=0); …………… flage=0; wake_up_interruptible(&wwq); return size; }
寫操作
static ssize_t hello_write(struct file *filp,const char __user *buf,size_t size,loff_t *poss) { wait_event_interruptible(wwq,flage!=1); …………… flage=1; wake_up_interruptible(&rwq); return size; }
如何同步支持非阻塞?
上述操作雖然實現(xiàn)了阻塞功能,但是我們在應(yīng)用程序打開一個字符設(shè)備的時候,有時候我們希望操作是非阻塞的,比如:
fd=open("/dev/hello",O_RDONLY|O_NONBLOCK);
那么驅(qū)動如何得到這個標(biāo)記呢?
參考《手把手教 Linux 驅(qū)動 6-inode,file,file_operations 關(guān)系》,該標(biāo)記會存儲在結(jié)構(gòu)體 struct file 的 f_flags 成員中。
所以程序可以修改如下:
static ssize_t hello_read(struct file *filp,char __user *buf,size_t size,loff_t *poss) { int ret = 0; if(flage==0) { if(filp->f_flags & O_NONBLOCK) { return -EAGAIN; } wait_event_interruptible(rwq,flage!=0); } …………… flage=0; wake_up_interruptible(&wwq); return size; }
一種靈活的添加刪除等待隊列頭中的等待隊列:
(1)定義:
靜態(tài):
#define DECLARE_WAITQUEUE(name, tsk) wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)
(2)動態(tài):
wait_queue_t wa; init_waitqueue_entry(&wa,&tsk);
tsk 是進程結(jié)構(gòu)體,一般是 current(linux 當(dāng)前進程就是用這個獲?。_€可以用下面的,設(shè)置自定義的等待隊列回調(diào)函數(shù),上面的是 linux 默認(rèn)的一個回調(diào)函數(shù) default_wake_function(),不過默認(rèn)的用得最多:
wait_queue_t wa; wa->private = &tsk; int func(wait_queue_t *wait, unsigned mode, int flags, void *key) { // } init_waitqueue_func_entry(&wa,func);
(回調(diào)有什么作用?)
用下面函數(shù)將等待隊列,加入到等待隊列頭(帶 remove 的是從工作隊列頭中刪除工作隊列):
extern void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait); extern void add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t *wait); extern void remove_wait_queue(wait_queue_head_t *q, wait_queue_t *wait);
上面的阻塞和解除阻塞接口,只能是對當(dāng)前進程阻塞 / 解除阻塞,有了這幾個靈活的接口,我們可以單獨定義一個等待隊列,只要獲取進程 task_struct 指針,我們可以將任何進程加入到這個等待隊列,然后加入到等待隊列頭,我們能將其它任何進程(不僅僅是當(dāng)前進程),掛起睡眠,當(dāng)然喚醒時,如果用 wake_up_all 版本的話,也會一同喚起。這種情況,阻塞不能用上面的接口了,我們需要用下一節(jié)講述的接口(schedule()),解除阻塞可以用 wake_up,wake_up_interruptible 等。
更高級靈活的阻塞:
阻塞當(dāng)前進程的原理:用函數(shù) set_current_state()修改當(dāng)前進程為 TASK_INTERRUPTIBLE(不可中斷睡眠)或 TASK_UNINTERRUPTIBLE(可中斷睡眠)狀態(tài),然后調(diào)用 schedule()告訴內(nèi)核重新調(diào)度,由于當(dāng)前進程狀態(tài)已經(jīng)為睡眠狀態(tài),自然就不會被調(diào)度。schedule()簡單說就是告訴內(nèi)核當(dāng)前進程主動放棄 CPU 控制權(quán)。這樣來,就可以說當(dāng)前進程在此處睡眠,即阻塞在這里。
在上一小節(jié)“靈活的添加刪等待隊列頭中的等待隊列”,將任意進程加入到 waitqueue,然后類似用:
task_struct *tsk; wait_queue_t wa; // 假設(shè) tsk 已經(jīng)指向某進程控制塊 p->state = TASK_INTERRUPTIBLE;//or TASK_UNINTERRUPTIBLE init_waitqueue_entry(&wa,&tsk);
就能將任意進程掛起,當(dāng)然,還需要將 wa,掛到等待隊列頭,然后用 wait_event(&wa),進程就會從就緒隊列中退出,進入到睡眠隊列,直到 wake up 時,被掛起的進程狀態(tài)被修改為 TASK_RUNNING,才會被再次調(diào)度。(主要是 schedule()下面會說到)。
wait_event 實現(xiàn)原理:
先看下 wait_event 實現(xiàn):
#define __wait_event(wq, condition) do { DEFINE_WAIT(__wait); for (;;) { prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE); if (condition) break; schedule(); } finish_wait(&wq, &__wait); } while (0) #define wait_event(wq, condition) do { if (condition) break; __wait_event(wq, condition); } while (0)
DEFINE_WAIT:
定義一個工作隊列。
prepare_to_wait:
定義:void prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state) 功能:將工作隊列 wait 加入到工作隊列頭 q,并將當(dāng)前進程設(shè)置為 state 指定的狀態(tài),一般是 TASK_UNINTERRUPTIBLE 或 TASK_INTERRUPTIBLE 狀態(tài)(在這函數(shù)里有調(diào)用 set_current_state)。 第一個參數(shù):工作隊列頭 第二個參數(shù):工作隊列 第三個參數(shù):當(dāng)前進程要設(shè)置的狀態(tài)
finish_wait:
用了 prepare_to_wait 之后,當(dāng)退出時,一定要用這個函數(shù)清空等待隊列。
功能:
該函數(shù)首先調(diào)用 prepare_to_wait,修改進程到睡眠狀態(tài),
條件不滿足,schedule()就放棄 CPU 控制權(quán),睡眠,
當(dāng) wakeup 的時候,阻塞在 wq(也可以說阻塞在 wait_event 處)等待隊列頭上的進程,再次得到運行,接著執(zhí)行 schedule()后面的代碼,這里,顯然是個循環(huán),prepare_to_wait 再次設(shè)置當(dāng)前進程為睡眠狀態(tài),然后判斷條件是否滿足,
滿足就退出循環(huán),finish_wait 將當(dāng)前進程恢復(fù)到 TASK_RUNNING 狀態(tài),也就意味著阻塞解除。不滿足,繼續(xù)睡下去。如此反復(fù)等待條件成立。
明白這個過程,用 prepare_to_wait 和 schedule()來實現(xiàn)更為靈活的阻塞,就很簡單了,解除阻塞和前面的一樣用 wake_up,wake_up_interruptible 等。
下面是 wake_up 和 wait_event 流程圖:
wait_event 和 wake_up 流程
獨占等待
當(dāng)調(diào)用 wake_up 時,所有等待在該隊列上的進程都被喚醒,并進入可運行狀態(tài)如果只有一個進程可獲得資源,此時,其他的進程又將再次進入休眠,如果數(shù)量很大,被稱為”瘋狂售群”。這樣會非常占用系統(tǒng)資源。
解決方法:
wait_queue_t 成員 flage 有個重要的標(biāo)志 WQ_FLAG_EXCLUSIVE,表示:
當(dāng)一個等待隊列入口有 WQ_FLAG_EXCLUSEVE 標(biāo)志置位, 它被添加到等待隊列的尾部 . 沒有這個標(biāo)志的入口項, 添加到開始 . 當(dāng) wake_up 被在一個等待隊列上調(diào)用, 它在喚醒第一個有 WQ_FLAG_EXCLUSIVE 標(biāo)志的進程后停止 .
wait_event 默認(rèn)總是將 waitqueue 加入開始,而 wake_up 時總是一個一個的從開始處喚醒,如果不斷有 waitqueue 加入,那么最開始加入的,就一直得不到喚醒,有這個標(biāo)志,就避免了這種情況。
prepare_to_wait_exclusive()就是加入了這個標(biāo)志的。
補充
Linux 將進程狀態(tài)描述為如下五種: 1. TASK_RUNNING:可運行狀態(tài)。處于該狀態(tài)的進程可以被調(diào)度執(zhí)行而成為當(dāng)前進程。 2. TASK_INTERRUPTIBLE:可中斷的睡眠狀態(tài)。處于該狀態(tài)的進程在所需資源有效時被喚醒,也可以通過信號或定時中斷喚醒(因為有 signal_pending()函數(shù))。 3. TASK_UNINTERRUPTIBLE:不可中斷的睡眠狀態(tài)。處于該狀態(tài)的進程僅當(dāng)所需資源有效時被喚醒。 4. TASK_ZOMBIE:僵尸狀態(tài)。表示進程結(jié)束且已釋放資源,但其 task_struct 仍未釋放。 5. TASK_STOPPED:暫停狀態(tài)。處于該狀態(tài)的進程通過其他進程的信號才能被喚醒
Linux 通過結(jié)構(gòu)體 task_struct 維護所有運行的線程、進程,不同狀態(tài)的任務(wù),會由不同的隊列進行維護,schedule()函數(shù)就負(fù)責(zé)根據(jù)這些狀態(tài)的變化調(diào)度這些任務(wù)。關(guān)于進程的調(diào)度,后續(xù)會新開文章詳細(xì)介紹。
實例
下面實例主要功能是基于我們之前課程《手把手教 Linux 驅(qū)動 3- 之字符設(shè)備架構(gòu)詳解,有這篇就夠了》最后的代碼實例,在此基礎(chǔ)上增加寫阻塞的功能。
內(nèi)核中有緩沖內(nèi)存,以及是否可以訪問的標(biāo)記;
int flage=0; //1:有數(shù)據(jù)可讀 0:無數(shù)據(jù),不可讀 char kbuf[128];
初始狀態(tài)下 flage 為 0,沒有數(shù)據(jù);
應(yīng)用進程讀取數(shù)據(jù)會調(diào)用到內(nèi)核函數(shù) hello_read(),如果 flage 為 1,則直接讀走數(shù)據(jù),并將改 flage 置 1,如果 flage 為 0,則進程阻塞,直到有進程寫入數(shù)據(jù)將該 flage 置 1;
應(yīng)用進程每次寫入數(shù)據(jù)會調(diào)用到內(nèi)核函數(shù) hello_write(),如果 flage 為 0,則直接寫入數(shù)據(jù),并設(shè)置 flage 為 1,如果為 1,則阻塞,直到有其他進程調(diào)用到讀函數(shù) hello_read()將 flage 置 0。
驅(qū)動
/*********************************************
*hellodev.c
*********************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
static int hello_major = 250;
static struct class *hello_class;
#define DEV_NAME "hello_cls"
module_param(hello_major,int,S_IRUGO);
dev_t devno;
struct cdev cdev;
int num;
int flage=0;
char kbuf[128];
wait_queue_head_t rwq; //read wq
wait_queue_head_t wwq; //write wq
int hello_open(struct inode *inode,struct file *filp)
{
return 0;
}
int hello_release(struct inode *inode,struct file *filp)
{
return 0;
}
static ssize_t hello_read(struct file *filp,char __user *buf,size_t size,loff_t *poss)
{
int ret = 0;
if(flage==0)
{
if(filp->f_flags & O_NONBLOCK)
{
return -EAGAIN;
}
wait_event_interruptible(rwq,flage!=0);
}
if(copy_to_user(buf,kbuf,size))
{
return -EFAULT;
}
flage=0;
wake_up_interruptible(&wwq);
return size;
}
static ssize_t hello_write(struct file *filp,const char __user *buf,size_t size,loff_t *poss)
{
int ret = 0;
if(size>128||size<0)
{
return -EINVAL;
}
wait_event_interruptible(wwq,flage!=1);
if(copy_from_user(kbuf,buf,size))
{
return -EFAULT;
}
flage=1;
wake_up_interruptible(&rwq);
return size;
}
static const struct file_operations hello_fops =
{
.owner = THIS_MODULE,
.read = hello_read,
.write = hello_write,
.open = hello_open,
.release = hello_release,
};
static int hellodev_init(void)
{
int result;
int i;
struct device *hello_dev;
devno = MKDEV(hello_major,0);
if(hello_major)
result = register_chrdev_region(devno,2,"hello");
else
{
result = alloc_chrdev_region(&devno,0,2,"hello");
hello_major = MAJOR(devno);
}
if(result < 0)
return result;
cdev_init(&cdev,&hello_fops);
cdev.owner = THIS_MODULE;
cdev.ops = &hello_fops;
cdev_add(&cdev,MKDEV(hello_major,0),1);
init_waitqueue_head(&rwq);
init_waitqueue_head(&wwq);
hello_class = class_create(THIS_MODULE, DEV_NAME);// 類名字
if (IS_ERR(hello_class)) {
printk(KERN_WARNING "class_create faihello ! n");
goto err_3;
}
hello_dev = device_create(hello_class, NULL, devno, NULL, "hello");
if (IS_ERR(hello_dev)) {
printk(KERN_WARNING "device_create faihello! n");
goto err_4;
}
return0;
err_4:
class_destroy(hello_class);
err_3:
cdev_del(&cdev);
unregister_chrdev_region(MKDEV(hello_major,0),2);
return 0;
}
static void hellodev_exit(void)
{
device_destroy(hello_class, devno);
class_destroy(hello_class);
cdev_del(&cdev);
unregister_chrdev_region(MKDEV(hello_major,0),2);
}
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("yikoulinux");
module_init(hellodev_init);
module_exit(hellodev_exit);
測試程序
read.c
#include
#include
#include
#include
#include
#include
int main()
{
int fd= 0;
char buf[128];
int num;
// fd=open("/dev/hello",O_RDONLY); 阻塞方式讀取
fd=open("/dev/hello",O_RDONLY|O_NONBLOCK);// 非阻塞
if(fd<0)
{
printf("open memdev failed!n");
return -1;
}
read(fd,buf,sizeof(buf));
printf("num:%sn",buf);
close(fd);
return 0;
}
write.c
#include
#include
#include
#include
#include
#include
int main()
{
int fd =0;
char buf[128]="hello yikouLlinux";
int num;
fd=open("/dev/hello",O_RDWR);
if(fd <0)
{
printf("open device failed!n");
return -1;
}
write(fd,buf,sizeof(buf));
close(fd);
return 0;
}
掌握了等待隊列的用法,后面我們就可以進行中斷的講解了。