http://lwn.net/Articles/403891/
A workqueue is another mechanism for deferring work. A workqueue hands the deferred work to a kernel thread, which means the bottom half runs in process context. Code executed through a workqueue therefore enjoys all the advantages of process context; most importantly, it is allowed to be rescheduled and even to sleep. So when should you use a workqueue and when a tasklet? If the deferred task needs to sleep, choose a workqueue; if it does not, choose a tasklet. Likewise, if you need a schedulable entity to run your bottom half, use a workqueue: it is the only bottom-half mechanism that runs in process context, and therefore the only one that can sleep. That makes it very useful when you need to allocate large amounts of memory, take a semaphore, or perform blocking I/O. If you do not need a kernel thread to defer the work, consider a tasklet instead.
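To make the difference concrete, here is a minimal sketch (the names my_work, my_work_fn, my_irq_bottom_half_kick and the msleep(100) delay are made up for illustration, not taken from the text above): a work handler may block, which a tasklet handler must never do.

#include <linux/workqueue.h>
#include <linux/delay.h>

static void my_work_fn(struct work_struct *work)
{
    /* Runs in process context (a kworker thread), so sleeping is allowed. */
    msleep(100);    /* blocking is fine here, but illegal in a tasklet */
}

static DECLARE_WORK(my_work, my_work_fn);

/* called from interrupt or other atomic context */
void my_irq_bottom_half_kick(void)
{
    schedule_work(&my_work);    /* queue onto the system "events" workqueue */
}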
Let's start with the worker_pool data structure. Each CPU manages two worker_pool structures, stored in the per-cpu variable cpu_worker_pools. For a given CPU, the two pools' worker_pool->attrs->nice values are 0 and -20 respectively.
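As a rough sketch of how the 3.10-era kernel/workqueue.c declares these (approximate, not verbatim; the exact placement of std_nice inside init_workqueues() is an assumption):

/* kernel/workqueue.c, roughly as in the 3.10-era source */
enum {
    NR_STD_WORKER_POOLS = 2,       /* one normal and one highpri pool per cpu */
    HIGHPRI_NICE_LEVEL  = -20,     /* nice value of the highpri pool's workers */
};

/* the two standard worker pools of every cpu */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
                                     cpu_worker_pools);

/* in init_workqueues(): the first pool gets nice 0, the second gets nice -20 */
int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };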
struct worker_pool {
    int                 cpu;          /* I: the associated cpu */
    struct list_head    worklist;     /* L: list of pending works */
                                      /* holds all runnable work items; schedule_work() adds work to this list */
    int                 nr_workers;   /* L: total number of workers */
                                      /* see start_worker(): worker->flags |= WORKER_STARTED, then nr_workers++ */
    int                 nr_idle;      /* L: currently idle ones */
                                      /* incremented in worker_enter_idle() */
    struct list_head    idle_list;    /* X: list of idle workers */
                                      /* worker_enter_idle(): list_add(&worker->entry, &pool->idle_list) */
    struct timer_list   idle_timer;   /* L: worker idle timeout */
    struct timer_list   mayday_timer; /* L: SOS timer for workers */
    atomic_t            nr_running ____cacheline_aligned_in_smp;
};

In init_workqueues()->create_and_start_worker(), a worker is created via create_worker(pool). create_worker() allocates a worker structure and associates it with its worker_pool:
    worker->pool = pool;    /* associate the worker with its worker_pool */
    /* worker->task holds the newly created task */
    worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                                          "kworker/%s", id_buf);

Then, in start_worker():
    worker->flags |= WORKER_STARTED;
    worker->flags |= WORKER_IDLE;
    pool->nr_idle++;                             /* worker_pool->nr_idle++ */
    worker->pool->nr_workers++;                  /* worker_pool->nr_workers++ */
    worker->last_active = jiffies;               /* record when the worker went idle */
    list_add(&worker->entry, &pool->idle_list);  /* link worker->entry into pool->idle_list */
    wake_up_process(worker->task);               /* start the worker's task (kworker/0:0, kworker/0:0H, ...) */

For CPU 0, a worker is created for each of the CPU's two worker_pools; two tasks are created according to the pools' nice values, and each worker is linked to its worker_pool.
In create_worker(), called from create_and_start_worker():

1) worker = alloc_worker();
2) id = idr_alloc(&pool->worker_idr, NULL, 0, 0, GFP_NOWAIT);
   worker->id = id;
3) worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                                         "kworker/%s", id_buf);
   set_user_nice(worker->task, pool->attrs->nice);
   set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);

In start_worker(), called from create_and_start_worker():

1) worker->flags |= WORKER_STARTED;
2) worker->pool->nr_workers++;
3) in worker_enter_idle():
   worker->flags |= WORKER_IDLE;
   pool->nr_idle++;
   worker->last_active = jiffies;
   list_add(&worker->entry, &pool->idle_list);

The following log shows the two tasks created for CPU 0:

<6>[0.009321] [0:swapper/0:1] create_worker pool->cpu = 0, pool->attrs->nice = 0
<6>[0.009512] [0:swapper/0:1] create_worker id_buf = 0:0     // kworker/0:0
<6>[0.009781] [0:swapper/0:1] for_each_cpu_worker_pool 1 cpu = 0
<6>[0.009803] [0:swapper/0:1] create_worker pool->cpu = 0, pool->attrs->nice = -20
<6>[0.009864] [0:swapper/0:1] create_worker id_buf = 0:0H    // kworker/0:0H

At CPU_UP_PREPARE time, kworkers for the remaining CPUs are created the same way as for CPU 0, based on the CPU number and each pool's pool->attrs->nice value. Every CPU gets two kworkers with nice values 0 and -20, producing kworker/1:0 and kworker/1:0H, kworker/2:0 and kworker/2:0H, and so on:
<6>[0.046226] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =1 , pool->nr_workers = 0
<6>[0.046253] [0:swapper/0:1] create_worker pool->cpu = 1, pool->attrs->nice = 0
<6>[0.046326] [0:swapper/0:1] create_worker id_buf = 1:0
<6>[0.046629] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =1 , pool->nr_workers = 0
<6>[0.046653] [0:swapper/0:1] create_worker pool->cpu = 1, pool->attrs->nice = -20
<6>[0.046720] [0:swapper/0:1] create_worker id_buf = 1:0H
<6>[0.055634] [1:swapper/1:0] CPU1: thread -1, cpu 1, socket 0, mpidr 80000001
<6>[0.056937] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =2 , pool->nr_workers = 0
<6>[0.056970] [0:swapper/0:1] create_worker pool->cpu = 2, pool->attrs->nice = 0
<6>[0.057041] [0:swapper/0:1] create_worker id_buf = 2:0
<6>[0.057232] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =2 , pool->nr_workers = 0
<6>[0.057257] [0:swapper/0:1] create_worker pool->cpu = 2, pool->attrs->nice = -20
<6>[0.057402] [0:swapper/0:1] create_worker id_buf = 2:0H
<6>[0.066358] [2:swapper/2:0] CPU2: thread -1, cpu 2, socket 0, mpidr 80000002
<6>[0.067545] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =3 , pool->nr_workers = 0
<6>[0.067578] [0:swapper/0:1] create_worker pool->cpu = 3, pool->attrs->nice = 0
<6>[0.067652] [0:swapper/0:1] create_worker id_buf = 3:0
<6>[0.067838] [0:swapper/0:1] workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =3 , pool->nr_workers = 0
<6>[0.067862] [0:swapper/0:1] create_worker pool->cpu = 3, pool->attrs->nice = -20
<6>[0.067930] [0:swapper/0:1] create_worker id_buf = 3:0H
<6>[0.076797] [3:swapper/3:0] CPU3: thread -1, cpu 3, socket 0, mpidr 80000003

static int __cpuinit workqueue_cpu_up_callback(struct notifier_block *nfb,
                                               unsigned long action, void *hcpu)
{
    ...
    switch (action & ~CPU_TASKS_FROZEN) {
    case CPU_UP_PREPARE:
        for_each_cpu_worker_pool(pool, cpu) {
            pr_info("workqueue_cpu_up_callback CPU_UP_PREPARE pool->cpu =%d , pool->nr_workers = %d\n",
                    pool->cpu, pool->nr_workers);
            if (pool->nr_workers)
                continue;
            if (create_and_start_worker(pool) < 0)
                return NOTIFY_BAD;
        }
        break;
    ...
}

Under exactly what circumstances get_unbound_pool() is called still needs to be confirmed. The following log shows the unbound pools being set up:
<6>[0.010539] [0:swapper/0:1] get_unbound_pool !!!!
<6>[0.010562] [0:swapper/0:1] create_worker pool->cpu = -1, pool->attrs->nice = 0
<6>[0.010623] [0:swapper/0:1] create_worker id_buf = u8:0
....
<6>[0.537739] [1:kworker/u8:0:6] create_worker pool->cpu = -1, pool->attrs->nice = 0
<6>[0.537784] [1:kworker/u8:0:6] create_worker id_buf = u8:1
....
<6>[0.539426] [0:swapper/0:1] get_unbound_pool !!!!
<6>[0.539450] [0:swapper/0:1] create_worker pool->cpu = -1, pool->attrs->nice = -20
<6>[0.539520] [0:swapper/0:1] create_worker id_buf = u9:0
<6>[0.540053] [1:kworker/u9:0: 42] create_worker pool->cpu = -1, pool->attrs->nice = -20
<6>[0.540095] [1:kworker/u9:0: 42] create_worker id_buf = u9:1

While each kworker runs, i.e. inside worker_thread(), a few conditions are checked to decide whether a new worker needs to be created; the helper predicates involved are sketched below.
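The relevant predicates, roughly as they appear in the 3.10-era kernel/workqueue.c (treat this as a sketch rather than a verbatim copy):

/* a worker should be woken or created when there is pending work and nobody is running */
static bool __need_more_worker(struct worker_pool *pool)
{
    return !atomic_read(&pool->nr_running);
}

static bool need_more_worker(struct worker_pool *pool)
{
    return !list_empty(&pool->worklist) && __need_more_worker(pool);
}

/* work can start only if at least one idle worker exists to pick it up */
static bool may_start_working(struct worker_pool *pool)
{
    return pool->nr_idle;
}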
static int worker_thread(void *__worker)
{
    ...
woke_up:
    ...
    /* remove this worker from pool->idle_list and decrement pool->nr_idle */
    worker_leave_idle(worker);
recheck:
    /* sleep if the worklist is empty or somebody is already running (nr_running != 0) */
    if (!need_more_worker(pool))
        goto sleep;

    /* here pool->worklist is non-empty and pool->nr_running == 0;
     * may_start_working() checks whether pool->nr_idle is non-zero,
     * and if nr_idle == 0 manage_workers() is called to create an idle worker */
    /* do we need to manage? */
    if (unlikely(!may_start_working(pool)) && manage_workers(worker))
        goto recheck;

    /* ... the work-processing loop (process_one_work() etc.) is omitted here ... */
sleep:
    /* we reach sleep when the worklist is empty or nr_running != 0;
     * manage_workers() is called from here when the worklist is non-empty,
     * nr_running == 0 and nr_idle == 0 (see need_to_create_worker() below) */
    if (unlikely(need_to_manage_workers(pool)) && manage_workers(worker))
        goto recheck;

    /*
     * static bool need_to_manage_workers(struct worker_pool *pool)
     * {
     *     return need_to_create_worker(pool) ||
     *            (pool->flags & POOL_MANAGE_WORKERS);
     * }
     *
     * static bool need_to_create_worker(struct worker_pool *pool)
     * {
     *     return need_more_worker(pool) && !may_start_working(pool);
     * }
     *
     * need_more_worker() returns true when the worklist is non-empty and nr_running == 0;
     * may_start_working() is false when pool->nr_idle == 0;
     * so need_to_create_worker() returns true when the worklist is non-empty,
     * nr_running == 0 and nr_idle == 0.
     */

    worker_enter_idle(worker);
    __set_current_state(TASK_INTERRUPTIBLE);
    spin_unlock_irq(&pool->lock);
    schedule();
    goto woke_up;
}

Based on create_worker() and the log lines added to it, create_and_start_worker() does the following:
1) For each CPU's worker_pool, create a worker via create_worker() and store the pool, the id returned by idr_alloc(&pool->worker_idr, NULL, 0, 0, GFP_NOWAIT), and so on in the worker structure:
    worker = alloc_worker();    /* allocate the worker */
    worker->pool = pool;

    /* allocate an id and store it in worker->id */
    id = idr_alloc(&pool->worker_idr, NULL, 0, 0, GFP_NOWAIT);
    worker->id = id;

    /* create the task, save it, and set its nice value and cpumask */
    worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                                          "kworker/%s", id_buf);
    set_user_nice(worker->task, pool->attrs->nice);
    set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
    worker->task->flags |= PF_NO_SETAFFINITY;
    if (pool->flags & POOL_DISASSOCIATED)
        worker->flags |= WORKER_UNBOUND;

2) With the worker created above, call start_worker() to start it:
    worker->flags |= WORKER_STARTED;
    worker->pool->nr_workers++;
    worker->flags |= WORKER_IDLE;
    /* bump the count of idle workers in the pool */
    pool->nr_idle++;
    /* record when the worker went idle */
    worker->last_active = jiffies;
    /* link the worker into pool->idle_list */
    list_add(&worker->entry, &pool->idle_list);

Allocating a workqueue_struct with alloc_workqueue() is relatively expensive, so at the end of init_workqueues() several workqueues are allocated up front for general use:
    system_wq = alloc_workqueue("events", 0, 0);
    system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
    system_long_wq = alloc_workqueue("events_long", 0, 0);
    system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
                                        WQ_UNBOUND_MAX_ACTIVE);
    system_freezable_wq = alloc_workqueue("events_freezable", WQ_FREEZABLE, 0);

The most commonly used of these is the "events" workqueue, system_wq = alloc_workqueue("events", 0, 0); this is the queue that schedule_work() submits to.
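Drivers are not limited to these system workqueues; a driver can allocate its own queue and submit work to it with queue_work(). A minimal sketch (my_wq, my_work, my_work_fn and my_init are hypothetical names used only for illustration):

#include <linux/workqueue.h>

static struct workqueue_struct *my_wq;
static struct work_struct my_work;

static void my_work_fn(struct work_struct *work)
{
    /* deferred processing runs here, in a kworker, in process context */
}

static int my_init(void)
{
    my_wq = alloc_workqueue("my_wq", 0, 0);    /* a normal, per-cpu bound workqueue */
    if (!my_wq)
        return -ENOMEM;
    INIT_WORK(&my_work, my_work_fn);
    queue_work(my_wq, &my_work);               /* instead of schedule_work(&my_work) */
    return 0;
}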
Now let's look at what alloc_workqueue() itself does.

1. Allocate a workqueue_struct, initialize its lists, set its name, and so on:
    wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);

    va_start(args, lock_name);
    vsnprintf(wq->name, sizeof(wq->name), fmt, args);
    va_end(args);

    max_active = max_active ?: WQ_DFL_ACTIVE;
    max_active = wq_clamp_max_active(max_active, flags, wq->name);

    /* init wq */
    wq->flags = flags;
    wq->saved_max_active = max_active;
    mutex_init(&wq->mutex);
    atomic_set(&wq->nr_pwqs_to_flush, 0);
    INIT_LIST_HEAD(&wq->pwqs);
    INIT_LIST_HEAD(&wq->flusher_queue);
    INIT_LIST_HEAD(&wq->flusher_overflow);
    INIT_LIST_HEAD(&wq->maydays);

    lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
    INIT_LIST_HEAD(&wq->list);

2. Call alloc_and_link_pwqs(). Following the flags == 0 path:

1) Allocate wq->cpu_pwqs:
    /* cpu_pwqs is a per-cpu pool_workqueue */
    wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);

2) Associate each CPU's pool_workqueue with that CPU's worker_pool[prio]:
    /* depending on the priority (i.e. the nice value), the matching CPU's
     * worker_pool is assigned to pool_workqueue->pool */
    pool_workqueue->pool = worker_pool[highpri ? 1 : 0];
    /* every CPU's pool_workqueue can reach this workqueue_struct */
    pool_workqueue->wq = workqueue_struct;

Since workqueue_struct->cpu_pwqs holds the per-cpu pool_workqueue, the worker_pool can be reached starting from the workqueue_struct.
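In other words, for a bound (non-WQ_UNBOUND) workqueue the chain can be walked like this (a short sketch; wq and cpu stand for any workqueue_struct pointer and CPU number):

    struct pool_workqueue *pwq  = per_cpu_ptr(wq->cpu_pwqs, cpu);
    struct worker_pool    *pool = pwq->pool;    /* the nice 0 or nice -20 pool of that cpu */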
3) Call link_pwq(), which does list_add_rcu(&pwq->pwqs_node, &wq->pwqs):
    /* in link_pwq(), the pool_workqueue is linked into workqueue_struct->pwqs */
    list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
    /* workqueue_struct->pwqs holds the pool_workqueues */

static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
    bool highpri = wq->flags & WQ_HIGHPRI;
    int cpu, ret;

    if (!(wq->flags & WQ_UNBOUND)) {
        wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
        if (!wq->cpu_pwqs)
            return -ENOMEM;

        for_each_possible_cpu(cpu) {
            struct pool_workqueue *pwq =
                per_cpu_ptr(wq->cpu_pwqs, cpu);
            /* this CPU's pool_workqueue and this CPU's cpu_worker_pools */
            struct worker_pool *cpu_pools =
                per_cpu(cpu_worker_pools, cpu);

            /* connect the workqueue_struct and the cpu_worker_pools through the
             * pool_workqueue: pool_workqueue->pool gets the worker_pool,
             * pool_workqueue->wq gets the workqueue_struct */
            init_pwq(pwq, wq, &cpu_pools[highpri]);

            mutex_lock(&wq->mutex);
            /* then link the pool_workqueue into workqueue_struct->pwqs
             * via pool_workqueue->pwqs_node */
            link_pwq(pwq);
            mutex_unlock(&wq->mutex);
        }
        return 0;
    } else if (wq->flags & __WQ_ORDERED) {
        ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
        /* there should only be single pwq for ordering guarantee */
        WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
                      wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
             "ordering guarantee broken for workqueue %s\n", wq->name);
        return ret;
    } else {
        return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
    }
}

So far we have covered how workers are initialized, but each worker is just a process that keeps running in a loop; the work items are the actual jobs that need to be done. Next, let's look at how a work item is initialized and how a worker is chosen and handed the work. Taking idletimer_tg_work() as an example, here is how a work item is initialized:
    INIT_WORK(&info->timer->work, idletimer_tg_work);

#define INIT_WORK(_work, _func)                                 \
    do {                                                        \
        __INIT_WORK((_work), (_func), 0);                       \
    } while (0)

#define __INIT_WORK(_work, _func, _onstack)                     \
    do {                                                        \
        __init_work((_work), _onstack);                         \
        (_work)->data = (atomic_long_t) WORK_DATA_INIT();       \
        INIT_LIST_HEAD(&(_work)->entry);                        \
        PREPARE_WORK((_work), (_func));                         \
    } while (0)

/* the work_struct structure */
struct work_struct {
    atomic_long_t data;
    struct list_head entry;
    work_func_t func;
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};

Whenever this work needs to run, schedule_work(&timer->work) is called to add the work to the workqueue_struct.
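The path from schedule_work() down to __queue_work() is short. Roughly, as in the 3.10-era headers and workqueue.c (a sketch, not verbatim):

static inline bool schedule_work(struct work_struct *work)
{
    /* schedule_work() always targets the system "events" workqueue */
    return queue_work(system_wq, work);
}

static inline bool queue_work(struct workqueue_struct *wq,
                              struct work_struct *work)
{
    /* WORK_CPU_UNBOUND here means "no CPU preference", not WQ_UNBOUND */
    return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}

bool queue_work_on(int cpu, struct workqueue_struct *wq,
                   struct work_struct *work)
{
    ...
    /* only queue the work if it is not already pending */
    if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
        __queue_work(cpu, wq, work);
        ...
    }
    ...
}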
schedule_work() eventually reaches the function below. It fetches the matching pool_workqueue from wq->cpu_pwqs, goes through a few intermediate steps, then checks pool_workqueue->nr_active and finally calls insert_work() to add the work either to pool_workqueue->pool->worklist or to pool_workqueue->delayed_works.

static void __queue_work(int cpu, struct workqueue_struct *wq,
                         struct work_struct *work)
{
    ...
    if (!(wq->flags & WQ_UNBOUND))
        pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
    else
        pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

    last_pool = get_work_pool(work);
    if (last_pool && last_pool != pwq->pool) {
        struct worker *worker;

        spin_lock(&last_pool->lock);
        worker = find_worker_executing_work(last_pool, work);
        if (worker && worker->current_pwq->wq == wq) {
            pwq = worker->current_pwq;
        } else {
            /* meh... not running there, queue here */
            spin_unlock(&last_pool->lock);
            spin_lock(&pwq->pool->lock);
        }
    } else {
        spin_lock(&pwq->pool->lock);
    }

    /* look at the pool_workqueue's nr_active to choose which list to add to */
    if (likely(pwq->nr_active < pwq->max_active)) {
        trace_workqueue_activate_work(work);
        pwq->nr_active++;
        worklist = &pwq->pool->worklist;
    } else {
        work_flags |= WORK_STRUCT_DELAYED;
        worklist = &pwq->delayed_works;
    }

    /* link this work into the chosen list */
    insert_work(pwq, work, worklist, work_flags);

    spin_unlock(&pwq->pool->lock);
}

Finally, in worker_thread(), the function that the created kworker processes run, a work item is picked off pool->worklist and executed: struct work_struct *work = list_first_entry(&pool->worklist, struct work_struct, entry);
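To close the loop, the dispatch inside worker_thread() looks roughly like this in the 3.10-era source (a sketch; process_one_work() is where work->func, saved into worker->current_func, is actually invoked):

    do {
        struct work_struct *work =
            list_first_entry(&pool->worklist,
                             struct work_struct, entry);

        if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
            /* the common case: run the handler directly */
            process_one_work(worker, work);
            if (unlikely(!list_empty(&worker->scheduled)))
                process_scheduled_works(worker);
        } else {
            /* linked works have to be executed in order */
            move_linked_works(work, &worker->scheduled, NULL);
            process_scheduled_works(worker);
        }
    } while (keep_working(pool));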