内核工作队列workqueue 简述
一 引入工作队列(work queue)
之前聊过Linux中断机制分为上半部中断(硬中断)和下半部,顶半部中断用于完成比较紧急的功能,往往只是简单的读取寄存器中的中断状态,并在清除中断标志后,启动下半部,下半部需要完成中断事件的绝大多数任务。我们经常使用tasklet机制(软中断延迟机制)来实现下半部工作,而tasklet机制是一种传统的底半部处理机制,它的执行时机往往放生在顶半部返回的时候,tasklet是基于软中断实现的,因此是运行在软中断上下文,软中断上下文也是中断上下文。而中断上下文的特点是内部不能发生睡眠或者延时等操作。那么这种情况就非常适合用 工作队列(work queue)来完成后面的工作。工作队列(work queue)是另外一种将工作推后执行的形式,它和tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。
那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列。如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。
二 工作队列(work queue)概述
参考自:https://my.oschina.net/kaedehao/blog/631394 以及 https://blog.****.net/bullbat/article/details/7410563
和 https://blog.****.net/zhangzhi123456789/article/details/46988773
workqueue作为内核的重要基础组件,在内核中被广泛的使用,通过工作队列,可以很方便的把我们要执行的某个任务(即函数+上下文参数)交代给内核,由内核替我们执行。内核在系统初始化的时候,已为我们创建好了默认的工作队列,所以我们在使用的时候,可以不需要再创建工作队列,只需要创建工作,并将工作添加到工作队列上即可。当然,我们也可以创建自己的工作队列,而不使用内核创建好的工作队列。简单的理解,工作队列是由内核线程+链表+等待队列来实现的,即由一个内核线程不断的从链表中读取工作,然后执行工作的工作函数!系统默认的工作者线程为events,自己也可以创建自己的工作者线程。
工作队列(workqueue)是另外一种将工作推后执行的形式.工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。最重要的就是工作队列允许被重新调度甚至是睡眠。
我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct,
struct work_struct {
atomic_long_t data; /*工作处理函数func的参数*/
#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */
#define WORK_STRUCT_STATIC 1 /* static initializer (debugobjects) */
#define WORK_STRUCT_FLAG_MASK (3UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
struct list_head entry; /*连接工作的指针*/
work_func_t func; /*工作处理函数*/
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。
这些工作以队列结构组织成工作队列(workqueue),其数据结构为 workqueue_struct
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq;
struct list_head list;
const char *name; /*workqueue name*/
int singlethread; /*是不是单线程 - 单线程我们首选第一个CPU -0表示采用默认的工作者线程event*/
int freezeable; /* Freeze threads during suspend */
int rt;
};
如果是多线程,Linux根据当前系统CPU的个数创建cpu_workqueue_struct 其结构体就是
truct cpu_workqueue_struct {
spinlock_t lock;/*因为工作者线程需要频繁的处理连接到其上的工作,所以需要枷锁保护*/
struct list_head worklist;
wait_queue_head_t more_work;
struct work_struct *current_work; /*当前的work*/
struct workqueue_struct *wq; /*所属的workqueue*/
struct task_struct *thread; /*任务的上下文*/
} ____cacheline_aligned;
三者之间的关系如下:
2.1 工作队列的常用接口 - 创建 初始化 work_struct 工作
创建:
创建方法1
#define __DELAYED_WORK_INITIALIZER(n, f) { \
.work = __WORK_INITIALIZER((n).work, (f)), \
.timer = TIMER_INITIALIZER(NULL, 0, 0), \
}
创建方法2 :这样就会静态地创建一个名为name,待执行函数为func,参数为data的work_struct结构。
#define DECLARE_WORK(n, f) \
struct work_struct n = __WORK_INITIALIZER(n, f)
创建方法3 :动态地初始化一个由work指向的工作
// 也可以使用INIT_WORK宏:
#define INIT_WORK(_work, _func) \
do { \
(_work)->data = (atomic_long_t) WORK_DATA_INIT(); \
INIT_LIST_HEAD(&(_work)->entry); \
PREPARE_WORK((_work), (_func)); \
} while (0)
初始化:
主要完成 工作绑定的处理函数,工作队列待执行的函数原型是: void work_handler(void*data)。
这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管该函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。
2.2 工作队列的常用接口 - 对工作进行调度 即 工作入队
工作入队(添加到内核工作队列 ): int schedule_work(struct work_struct *work);
1
schedule_work() 、会使work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。在大多数情况下, 并不需要自己建立工作队列,而是只定义工作, 将工作结构挂接到内核预定义的事件工作队列中调度, 在kernel/workqueue.c中定义了一个静态全局量的工作队列static struct workqueue_struct *keventd_wq;默认的工作者线程叫做events/n,这里n是处理器的编号,每个处理器对应一个线程。比如,单处理器的系统只有events/0这样一个线程。而双处理器的系统就会多一个events/1线程。
schedule_work将工作结构添加到全局的事件工作队列keventd_wq,keventd_wq由内核自己维护,创建,销毁。这样work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。
2
有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,同时也可以利用timer来进行延时调度它在指定的时间执行,到期后才由默认的定时器回调函数进行工作注册。延迟delay后,被定时器唤醒,将work添加到工作队列wq中。
schedule_delayed_work(&work,delay);
工作队列是没有优先级的,基本按照FIFO的方式进行处理。
三 工作队列(work queue)简单使用
在Workqueue机制中,提供了一个系统默认的workqueue队列——keventd_wq,这个队列是Linux系统在初始化的时候就创建的。用户可以直接初始化一个work_struct对象,然后在该队列中进行调度,使用更加方便。
当用户调用workqueue队列的初始化API : create_workqueue 或者 create_singlethread_workqueue 对workqueue队列进行初始化时,内核就开始为用户分配一个workqueue对象,并且将其链到一个全局的workqueue队列中。然后Linux根据当前CPU的情况,为workqueue对象分配与CPU个数相同的cpu_workqueue_struct对象,每个cpu_workqueue_struct对象都会存在一条任务队列。紧接着,Linux为每个cpu_workqueue_struct对象分配一个内核thread,即内核daemon去处理每个队列中的任务。至此,用户调用初始化接口将workqueue初始化完毕,返回workqueue的指针。
例程:
实验1 使用系统默认的队列 : 对于内核现成的队列,我们初始化完work后直接用queue_schedule加入系统默认的workqueue队列 : keventd_wq并调度执行
实验2 重新创建队列 : 对于我们从新创建的工作队列,需要用create_queue来创建work_queue,然后初始化work,最后,还需要使用queue_work加入我们创建的工作队列并调度执行。
实验1:使用系统默认的队列
实验一之模板:
#include <linux/module.h>
#include <linux/delay.h>
#include <linux/workqueue.h>
#define ENTER() printk(KERN_DEBUG "%s() Enter", __func__)
#define EXIT() printk(KERN_DEBUG "%s() Exit", __func__)
#define ERR(fmt, args...) printk(KERN_ERR "%s()-%d: " fmt "\n", __func__, __LINE__, ##args)
#define DBG(fmt, args...) printk(KERN_DEBUG "%s()-%d: " fmt "\n", __func__, __LINE__, ##args)
struct test_work {
struct work_struct w;
unsigned long data;
};
static struct test_work my_work;
static void my_work_func(struct work_struct *work)
{
struct test_work *p_work;
ENTER();
p_work = container_of(work, struct test_work, w);
while (p_work->data) {
DBG("data: %lu", p_work->data--);
msleep_interruptible(1000);
}
EXIT();
}
static int __init wq_demo_init(void)
{
INIT_WORK(&my_work.w, my_work_func);
my_work.data = 30;
msleep_interruptible(1000);
DBG("schedule work begin:");
if (schedule_work(&my_work.w) == 0) {
ERR("schedule work fail");
return -1;
}
DBG("success");
return 0;
}
static void __exit wq_demo_exit(void)
{
ENTER();
while (my_work.data) {
DBG("waiting exit");
msleep_interruptible(2000);
}
EXIT();
}
MODULE_LICENSE("GPL");
module_init(wq_demo_init);
module_exit(wq_demo_exit);
实验一之实践:
场景: Android7.1 rk3288主板接收外部单片机小板中断信号。调度工作处理函数 即将推后执行的工作加入 系统默认队列,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。执行内容是 通过i2c总线获取外设小板寄存器中的数值,将数值以sysfs属性文件的形式推送到 sys/chensai 路径下的对应结点,可通过adb shell cat gpion 获取数值。
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <linux/ioctl.h>
#include <linux/mm.h>
#include <asm/uaccess.h>
#include <linux/blkdev.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/kdev_t.h>
#include <linux/slab.h>
#include <linux/string.h>
#include <linux/major.h>
#include <linux/errno.h>
#include <linux/module.h>
#include <linux/seq_file.h>
#include <linux/kobject.h>
#include <linux/kobj_map.h>
#include <linux/cdev.h>
#include <linux/mutex.h>
#include <linux/backing-dev.h>
#include <linux/tty.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/delay.h>
#include <linux/hrtimer.h>
#include <linux/i2c.h>
#include <linux/input.h>
#include <linux/interrupt.h>
#include <linux/io.h>
#include <linux/platform_device.h>
#include <linux/async.h>
#include <linux/irq.h>
#include <linux/workqueue.h>
#include <linux/proc_fs.h>
#include <linux/input/mt.h>
#include <linux/of_gpio.h>
#include <linux/gpio.h>
#include <linux/slab.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/moduleparam.h>
#include <linux/init.h>
#include <linux/kmod.h>
#include <linux/sched.h>
#include <linux/delay.h>
#include <linux/module.h>
#include <linux/debugfs.h>
#include <linux/kthread.h>
#include <asm/unaligned.h>
#include <net/bluetooth/bluetooth.h>
#include <net/bluetooth/hci_core.h>
#include <net/bluetooth/l2cap.h>
#include <net/bluetooth/rfcomm.h>
static int debug = 4;
module_param(debug, int, S_IRUGO|S_IWUSR);
#define dbg_codec(level, fmt, arg...) \
do { \
if (debug >= level) \
printk(fmt , ## arg); \
} while (0)
#define DBG(fmt, ...) dbg_codec(0, fmt, ## __VA_ARGS__)
#define CLONE_KERNEL (CLONE_FS | CLONE_FILES | CLONE_SIGHAND)
static struct i2c_client *chensai_client = NULL;
unsigned int gpio_num; //= 230; //GPIO7_A6
unsigned char register_addr = 0x00;
char chensai_iic_addr = 0x09;
int chensai_irq_num;
uint8_t key_status[1];
int keys_val[5];
int key_status_val;
//static struct task_struct *task;
struct test_work {
struct work_struct mhr_work;
int val;
};
struct test_work my_work;
static int chensai_i2c_read( struct i2c_client* client,unsigned char reg,uint8_t *data, char device_addr)
{
int ret;
struct i2c_msg msgs[] = {
{
.addr = device_addr,
.flags = 0,
// .len = 1,
.len = sizeof(reg),
.buf = ®,// 寄存器地址
},
{
.addr = device_addr,
// .flags = I2C_M_RD,0x01
.flags = I2C_M_RD,
.len = sizeof(data),
.buf = data,// 寄存器的值
},
};
ret = i2c_transfer(client->adapter, msgs, 2);
if (ret < 0)
{
printk("i2c read error\n");
}
DBG("%s i2c_transfer_ret=%d\n", __func__, ret);
return ret;
}
/*
static int chensai_thread(void *arg)
{
int i;
DBG("%s : chensai_thread\n", __func__);
while(1)
{
if(irq_status)
{
chensai_i2c_read(chensai_client, register_addr, key_status, chensai_iic_addr);
key_status_val = key_status[0];
irq_status = 0;
for( i=0; i < 5; i++)
{
if(key_status_val & (1 << i))
{
keys_val[i] = 1;
}else{
keys_val[i] = 0;
}
}
}
ssleep(1);
}
return 0;
}
*/
static ssize_t gpio0_show(struct kobject *kobj, struct kobj_attribute *attr, char *buf)
{
int a = keys_val[0];
return sprintf(buf, "%d\n", a);
}
static ssize_t gpio1_show(struct kobject *kobj, struct kobj_attribute *attr, char *buf)
{
int a = keys_val[1];
return sprintf(buf, "%d\n", a);
}
static ssize_t gpio2_show(struct kobject *kobj, struct kobj_attribute *attr, char *buf)
{
int a = keys_val[2];
return sprintf(buf, "%d\n", a);
}
static ssize_t gpio3_show(struct kobject *kobj, struct kobj_attribute *attr, char *buf)
{
int a = keys_val[3];
return sprintf(buf, "%d\n", a);
}
static ssize_t gpio4_show(struct kobject *kobj, struct kobj_attribute *attr, char *buf)
{
int a = keys_val[4];
return sprintf(buf, "%d\n", a);
}
static struct kobject *chensai_kobj = NULL;
struct chensai_control_attribute {
struct attribute attr;
ssize_t (*show)(struct kobject *kobj, struct kobj_attribute *attr, char *buf);
ssize_t (*store)(struct kobject *kobj, struct kobj_attribute *attr, const char *buf, size_t n);
};
static struct chensai_control_attribute chensai_attribute[] = {
__ATTR(gpio0, 0777, gpio0_show, NULL),
__ATTR(gpio1, 0777, gpio1_show, NULL),
__ATTR(gpio2, 0777, gpio2_show, NULL),
__ATTR(gpio3, 0777, gpio3_show, NULL),
__ATTR(gpio4, 0777, gpio4_show, NULL),
};
static void my_work_func(struct work_struct *work)
{
struct test_work *p_work;
int i;
p_work = container_of(work, struct test_work, mhr_work);
if(p_work->val) {
DBG("%s : container_of ok\n", __func__);
chensai_i2c_read(chensai_client, register_addr, key_status, chensai_iic_addr);
key_status_val = key_status[0];
for( i=0; i < 5; i++)
{
if(key_status_val & (1 << i))
{
keys_val[i] = 1;
}else{
keys_val[i] = 0;
}
}
}
}
static irqreturn_t chensai_irq_handler(int irq, void *dev_id)
{
DBG("%s :enter\n", __func__);
disable_irq_nosync(chensai_irq_num);
//irq_status = 1;
if (schedule_work(&my_work.mhr_work) == 0) {
DBG("schedule work fail\n");
}
enable_irq(chensai_irq_num);
return IRQ_HANDLED;
}
static int chensai_probe(struct i2c_client *client, const struct i2c_device_id *id)
{
int error;
struct device_node *np = client->dev.of_node;
enum of_gpio_flags flags;
int i;
DBG("%s : chensai_probe\n", __func__);
for(i=0; i < 5; i++)
{
error = sysfs_create_file(chensai_kobj, &chensai_attribute[i].attr);
}
gpio_num =of_get_named_gpio_flags(np, "irq-gpio", 0, &flags);
DBG("%s gpio_num=%d\n", __func__, gpio_num);
if (!gpio_is_valid(gpio_num)){
DBG("%s gpio_is_unvalid \n", __func__);
}
if (gpio_request(gpio_num, "irq-gpio")) {
DBG("%s failed to request irq-gpio, gpio_num =%d\n", __func__, gpio_num);
}
gpio_direction_input(gpio_num);
chensai_irq_num = gpio_to_irq(gpio_num); //将gpio转换成对应的中断号
DBG("%s chensai_irq_num=%d\n", __func__, chensai_irq_num);
error = request_irq(chensai_irq_num, chensai_irq_handler, IRQ_TYPE_EDGE_BOTH, "chensai_irq", NULL);
if (error) {
printk("request_irq error\n");
}
//队列
INIT_WORK(&my_work.mhr_work, my_work_func);
my_work.val = 1;
chensai_client = client;
return 0;
}
static const struct i2c_device_id chensai_id[] = {
{"chensai_keyboard", 0},
{}
};
MODULE_DEVICE_TABLE(i2c, chensai_id);
static struct i2c_driver chensai_drv = {
.driver = {
.name = "chensai",
.owner = THIS_MODULE,
},
.probe = chensai_probe,
.id_table = chensai_id,
};
static int chensai_init(void)
{
chensai_kobj = kobject_create_and_add("chensai", NULL);
i2c_add_driver(&chensai_drv);
return 0;
}
static void chensai_exit(void)
{
i2c_del_driver(&chensai_drv);
free_irq(chensai_irq_num, chensai_irq_handler);
//kthread_stop(task); //发信号给task,通知其可以退出了
}
module_init(chensai_init);
module_exit(chensai_exit);
MODULE_LICENSE("GPL");
实验2:重新创建队列
实验二之模板:
#include <linux/init.h>
#include <linux/kernel.h>
#include <linux/module.h>
MODULE_AUTHOR("Mike Feng");
/*测试数据结构*/
struct my_data
{
structwork_struct my_work;
intvalue;
};
struct workqueue_struct *wq=NULL;
struct work_struct work_queue;
/*初始化我们的测试数据*/
struct my_data* init_data(structmy_data *md)
{
md=(structmy_data*)kmalloc(sizeof(struct my_data),GFP_KERNEL);
md->value=1;
md->my_work=work_queue;
returnmd;
}
/*工作队列函数*/
static void work_func(struct work_struct *work)
{
structmy_data *md=container_of(work,structmy_data,my_work);
printk("<2>""Thevalue of my data is:%d\n",md->value);
}
static __init intwork_init(void)
{
structmy_data *md=NULL;
structmy_data *md2=NULL;
md2=init_data(md2);
md=init_data(md);
md2->value=20;
md->value=10;
/*第一种方式:使用统默认的workqueue队列——keventd_wq,直接调度*/
INIT_WORK(&md->my_work,work_func);
schedule_work(&md->my_work);
/*第二种方式:创建自己的工作队列,加入工作到工作队列(加入内核就对其调度执行)*/
wq=create_workqueue("test");
INIT_WORK(&md2->my_work,work_func);
queue_work(wq,&md2->my_work);
return0;
}
static void work_exit(void)
{
/*工作队列销毁*/
destroy_workqueue(wq);
}
module_init(work_init);
module_exit(work_exit);
实验二之实践:
待续。。。