求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
  
 
 
     
   
分享到
Linux 的并发可管理工作队列机制探讨
 

作者:赵军 , 发布于2011-06-29 , IBM

 

简介: 本文简要分析比较了内核中的各种延迟机制,着重于工作队列的使用、实现以及限制,随后分析了自版本 2.6.36 出现的并发可管理工作队列的实现以及将来的应用前景。

并发可管理工作队列的出现

在内核代码中,经常希望延缓部分工作到将来某个时间执行,这样做的原因很多,比如:在持有锁时做大量(或者说费时的)工作不合适;或希望将工作聚集以获取批处理的性能;或调用了一个可能导致睡眠的函数使得在此时执行新调度非常不合适等。
内核中提供了许多机制来提供延迟执行,如中断的下半部处理可延迟中断上下文中的部分工作;定时器可指定延迟一定时间后执行某工作;工作队列则允许在进程上下文环境下延迟执行等。除此之外,内核中还曾短暂出现过慢工作机制 (slow work mechanism),还有异步函数调用 (asynchronous function calls) 以及各种私有实现的线程池等。在上面列出的如此多的内核基础组件中,使用最多则是工作队列。

工作队列 (workqueues)

在讨论之前,先定义几个内核中使用工作队列时用到的术语方便后面描述。

  • workqueues:所有工作项被 ( 需要被执行的工作 ) 排列于该队列,因此称作工作队列 (workqueues) 。
  • worker thread:工作者线程 (worker thread) 是一个用于执行工作队列中各个工作项的内核线程,当工作队列中没有工作项时,该线程将变为 idle 状态。
  • single threaded(ST)::工作者线程的表现形式之一,在系统范围内,只有一个工作者线程为工作队列服务
  • multi threaded(MT):工作者线程的表现形式之一,在多 CPU 系统上每个 CPU 上都有一个工作者线程为工作队列服务

工作队列之所以成为使用最多的延迟执行机制,得益于它的实现中的一些有意思的地方:

  • 使用的接口简单明了

对于使用者,基本上只需要做 3 件事情,依次为:

  • 创建工作队列 ( 如果使用内核默认的工作队列,连这一步都可以省略掉 )
  • 创建工作项
  • 向工作队列中提交工作项
  • 执行在进程上下文中,这样使得它可以睡眠,被调度及被抢占

执行在进程上下文中是一个非常大的优势,其他的下半部工作机制,基本上都运行于中断上下文中,我们知道在中断上下文里,不能睡眠

不能阻塞;原因是中断上下文并不与任何进程关联,如在中断上下文睡眠,调度器将不能将其唤醒,所以在中断上下文中不能有导致内核进入睡眠的行为,如持有信号量,执行非原子的内存分配等。工作队列运行于进程上下文中 ( 他们通过内核线程执行 ),因此它完全可以睡眠,可以被调度,也可以被其他进程所抢占。

  • 在多核环境下的使用也非常友好

与 tasklet 机制相较而言,工作队列可以在不同 CPU 上同时运行是个优势。这使得该接口在多核情况下也非常适合,内核邮件列表中就曾经有过用软中断和工作队列来替换不支持多 CPU 执行的 tasklet 的讨论。

总体说来,工作队列和定时器函数的处理有点类似,都是延迟执行相关的回调函数,但和定时器处理函数不同的是定时器回调函数只执行一次 ( 当然可以在执行时再次注册以反复调用,但这需要显示的再次注册 ), 且执行定时器回调函数时在时钟中断环境 , 限制较多,因此回调函数不能太复杂;而工作队列是通过内核线程实现,一直有效,可重复执行,执行时可以休眠,因此工作队列非常适合处理那些不是很紧急的任务,如垃圾回收处理等。

工作队列的使用和一些缺陷

之前简单讨论了工作队列使用上的便利性,依据工作队列的使用步骤,在下面列出了在 2.6.36 之前提供的接口,并描述了使用时的一些选择。由于工作队列的实现中,已有默认的共享工作队列,因此在选择接口时,就出现了 2 种选择:要么使用内核已经提供的共享工作队列,要么自己创建工作队列。

如选择使用共享的工作队列,基本的步骤为:

1. 创建工作项

创建工作项的接口分为静态和动态方式,接口分别是:

清单 1. 静态创建工作项

typedef void (*work_func_t)(struct work_struct *work);
DECLARE_WORK(name, func);
DECLARE_DELAYED_WORK(name, func);

该系列宏静态创建一个以 name 命名的工作项,并设置了回调函数 func

清单 2. 动态创建工作项

INIT_WORK(struct work_struct work, work_func_t func);
PREPARE_WORK(struct work_struct work, work_func_t func);
INIT_DELAYED_WORK(struct delayed_work work, work_func_t func);
PREPARE_DELAYED_WORK(struct delayed_work work, work_func_t func);

该系列宏在运行时初始化工作项 work,并设置了回调函数 func

2. 调度工作项

清单 3. 调度工作项

int schedule_work(struct work_struct *work);
int schedule_delayed_work(struct delayed_work *work, unsigned long delay);

上面两个函数将工作项添加到共享的工作队列,工作项随后在某个合适时机将被执行。

如果因为某些原因,如需要执行的是个阻塞性质的任务而不愿或不能使用内核提供的共享工作队列,这时需要自己创建工作队列,则上述步骤和使用的接口则略有改变:

3. 创建工作队列

在 2.6.36 之前,内核中的每个工作队列都有一个专用的内核线程来为它服务,创建工作队列时,有 2 个选择,可选择系统范围内的 ST,也可选择每 CPU 一个内核线程的 MT,其接口如下:

清单 4. 创建工作队列

create_singlethread_workqueue(name)
create_workqueue(name)

相对于 create_singlethread_workqueue,create_workqueue 同样会分配一个 wq 的工作队列。不同之处在于,对于多 CPU 系统而言,对每一个 active 的 CPU,都会为之创建一个 per-CPU 的 cwq 结构,对应每一个 cwq,都会生成一个新的 worker_thread。

4. 创建工作项

创建工作项的接口和使用内核默认的共享工作队列时是一样的。

向工作队列提交工作项

清单 5. 向工作队列中提交工作项

int queue_work(workqueue_t *queue, work_t *work);
int queue_delayed_work(workqueue_t *queue, work_t *work, unsigned long delay);

它们都会将工作项 work 提交到工作队列 queue,但第二个函数确保最少延迟 delay jiffies 之后该工作才会被执行。对于 MT 的情况,当用 queue_work 向 cwq 上提交工作项节点时, 是哪个 active CPU 正在调用该函数,那么便向该 CPU 对应的 cwq 上的 worklist 上增加工作项节点。

假如你需要取消一个挂起的工作队列中的工作项 , 你可以调用:

清单 6. 取消工作队列中挂起的工作项

int cancel_delayed_work(struct work_struct *work);

如果这个工作项在它开始执行前被取消,返回值是非零。内核保证给定工作项的执行不会在调用 cancel_delay_work 成功后被执行。 如果 cancel_delay_work 返回 0,则这个工作项可能已经运行在一个不同的处理器,并且仍然可能在调用 cancel_delayed_work 之后被执行。要绝对确保工作函数没有在 cancel_delayed_work 返回 0 后在任何地方运行,你必须跟随这个调用之后接着调用 flush_workqueue。在 flush_workqueue 返回后。任何在改调用之前提交的工作函数都不会在系统任何地方运行。

当你结束对一个工作队列的使用后,你可以使用下面的函数释放相关资源:

清单 7. 释放工作队列

void destroy_workqueue(struct workqueue_struct *queue);

前面比较了工作队列与其他基于中断上下文的延迟机制之间的优势,但工作队列并非没有缺点。首先是公共的共享工作队列不能提供更多的好处,因为如果其中的任一工作项阻塞,则其他工作项将不能被执行,因此在实际的使用中,使用者多会自己创建工作队列,而这又导致下面的一些问题:

MT 的工作队列导致了内核的线程数增加得非常的快,这样带来一些问题:一个是占用了 pid 数目,这对于服务器可不是一个好消息,因为 pid 实际上是一种全局资源;而大量的工作线程对于资源的竞争也导致了无效的调度,而这些调度其实是不需要的,对调度器也带来了压力。
现有的工作队列机制某些情况下有导致死锁的倾向,特别是在两个工作项之间存在依赖时。如果你曾经调试过这种偶尔出现的死锁,会知道这种问题让人非常的沮丧。

并发可管理工作队列 (Concurrency-managed workqueues)

在 2.6.36 之前的工作队列,其核心是每个工作队列都有专有的内核线程为其服务——系统范围内的 ST 或每个 CPU 都有一个内核线程的 MT。新的 cmwq 在实现上摒弃了这一点,不再有专有的线程与每个工作队列关联,事实上,现在变成了 Online CPU number + 1 个线程池来为工作队列服务,这样将线程的管理权实际上从工作队列的使用者交还给了内核。当一个工作项被创建以及排队,将在合适的时机被传递给其中一个线程,而 cmwq 最有意思的改变是:被提交到相同工作队列,相同 CPU 的工作项可能并发执行,这也是命名为并发可管理工作队列的原因。

cmwq 的实现遵循了以下几个原则:

  • 与原有的工作队列接口保持兼容,cmwq 只是更改了创建工作队列的接口,很容易移植到新的接口。
  • 工作队列共享 per-CPU 的线程池,提供灵活的并发级别而不再浪费大量的资源。
  • 自动平衡工作者线程池和并发级别,这样工作队列的用户不再需要关注如此多的细节。

在工作队列的用户眼中,cmwq 与之前的工作队列相比,创建工作队列的接口实现的后端有所改变,现在的新接口为:

清单 8. cmwq 中创建工作队列的后端接口

struct workqueue_struct
*alloc_workqueue(char *name, unsigned int flags, int max_active);

其中:

name:为工作队列的名字,而不像 2.6.36 之前实际是为工作队列服务的内核线程的名字。

flag 指明工作队列的属性,可以设定的标记如下:

  • WQ_NON_REENTRANT:默认情况下,工作队列只是确保在同一 CPU 上不可重入,即工作项不能在同一 CPU 上被多个工作者线程并发执行,但容许在多个 CPU 上并发执行。但该标志标明在多个 CPU 上也是不可重入的,工作项将在一个不可重入工作队列中排队,并确保至多在一个系统范围内的工作者线程被执行。
  • WQ_UNBOUND:工作项被放入一个由特定 gcwq 服务的未限定工作队列,该客户工作者线程没有被限定到特定的 CPU,这样,未限定工作者队列就像简单的执行上下文一般,没有并发管理。未限定的 gcwq 试图尽可能快的执行工作项。
  • WQ_FREEZEABLE:可冻结 wq 参与系统的暂停操作。该工作队列的工作项将被暂停,除非被唤醒,否者没有新的工作项被执行。
  • WQ_MEM_RECLAIM:所有的工作队列可能在内存回收路径上被使用。使用该标志则保证至少有一个执行上下文而不管在任何内存压力之下。
  • WQ_HIGHPRI:高优先级的工作项将被排练在队列头上,并且执行时不考虑并发级别;换句话说,只要资源可用,高优先级的工作项将尽可能快的执行。高优先工作项之间依据提交的顺序被执行。
  • WQ_CPU_INTENSIVE:CPU 密集的工作项对并发级别并无贡献,换句话说,可运行的 CPU 密集型工作项将不阻止其它工作项。这对于限定得工作项非常有用,因为它期望更多的 CPU 时钟周期,所以将它们的执行调度交给系统调度器。

ax_active:决定了一个 wq 在 per-CPU 上能执行的最大工作项。比如 max_active 设置为 16 表示一个工作队列上最多 16 个工作项能同时在 per-CPU 上同时执行。当前实行中,对所有限定工作队列,max_active 的最大值是 512,而设定为 0 时表示是 256;而对于未限定工作队列,该最大值为:MAX[512,4 * num_possible_cpus() ],除非有特别的理由需要限流或者其它原因,一般设定为 0 就可以了。

cmwq 本质上是提供了一个公共的内核线程池的实现,其接口基本上和以前保持了兼容,只是更改了创建工作队列的函数的后端,它实际上是将工作队列和内核线程的一一绑定关系改为由内核来管理内核线程的创建,因此在 cmwq 中创建工作队列并不意味着一定会创建内核线程。
而之前的接口的则改为基于 alloc_workqueue 来实现。

清单 9. 基于新后端接口的实现

#define create_workqueue(name) \
alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
#define create_freezeable_workqueue(name) \
alloc_workqueue((name), WQ_FREEZEABLE | WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
#define create_singlethread_workqueue(name) \
alloc_workqueue((name), WQ_UNBOUND | WQ_MEM_RECLAIM, 1)

并发可管理工作队列的实现

调度器中的 hook 函数

为了知道工作者线程何时将睡眠或被唤醒,在内核中增加了一个 PF_WQ_WORKER 类型的标记,表明是工作者线程,并且添加了 2 个 hook 函数到当前的调度器中。

清单 10. 调度器中的 hook 函数

void wq_worker_waking_up(struct task_struct *task, unsigned int cpu);
struct task_struct *wq_worker_sleeping(struct task_struct *task, unsigned int cpu);

其中 wq_worker_waking_up 在一个工作者线程被唤醒时在 try_to_wake_up/try_to_wake_up_local 中被调用。而 wq_worker_sleeping 则在 schedule () 中被调用,表明该工作者线程将会睡眠,返回值是一个 task,它可在相同的 CPU 上被 try_to_wake_up_local 用来唤醒。现在 2 个 hook 函数都是硬编码在内核的调度器中,后续可能会以其它形式改变其实现方式。

并发可管理工作队列的后端 gcwq

在 cmwq 的实现中,最重要的是其后端 gcwq:

清单 11. gcwq

/*
* Global per-cpu workqueue. There's one and only one for each cpu
* and all works are queued and processed here regardless of their
* target workqueues.
*/
struct global_cwq {
spinlock_t lock; /* the gcwq lock */
struct list_head worklist; /* L: list of pending works */
unsigned int cpu; /* I: the associated cpu */
unsigned int flags; /* L: GCWQ_* flags */ int nr_workers; /* L: total number of workers */
int nr_idle; /* L: currently idle ones */ /* workers are chained either in the idle_list or busy_hash */
struct list_head idle_list; /* X: list of idle workers */
struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
/* L: hash of busy workers */ struct timer_list idle_timer; /* L: worker idle timeout */
struct timer_list mayday_timer; /* L: SOS timer for dworkers */ struct ida worker_ida; /* L: for worker IDs */ // 为了实现 CPU 热插拔时候的委托机制
struct task_struct *trustee; /* L: for gcwq shutdown */
unsigned int trustee_state; /* L: trustee state */
wait_queue_head_t trustee_wait; /* trustee wait */
struct worker *first_idle; /* L: first idle worker */
} ____cacheline_aligned_in_smp;

它用来管理线程池,其数量为每个 CPU 一个 gcwq,还有一个特定的 gcwq 为未限定 (unbound) 工作队列的工作项服务。需要注意的是在 cmwq 中只有 Number of online CPU + 1 (unbound) 个线程池。由于计数从 0 开始,所以可能的线程池的数目最大为 NR_CPUS。由于涉及到 CPU 的热插拔问题,因此只有 online 的 CPU 上才有线程池与之绑定。

该结构体中的一些重要字段如下:

worklist:所有未决的工作项被链接在该链表中

cpu:表明该线程池和哪个 CPU 绑定,实现中有一个未绑定到任何 CPU 的 gcwq,其标记为 WORK_CPU_UNBOUND,在代码中,将这个未绑定到特定 CPU 的 gcwq 和绑定到 CPU 的 gcwq 一起处理,应此定义 WORK_CPU_UNBOUND = NR_CPUS,这也是代码中的一个小小的技巧

nr_workers:总的工作者线程数

nr_idle:当前的空闲工作者线程数

idle_list:空闲的工作者线程链接成该链表

busy_hash[BUSY_WORKER_HASH_SIZE] :正执行工作项任务的工作者线程放入该哈希表中

有了前面基础,我们可以开始看看 cmwq 的实现,根据以往的经验,从初始化部分开始:

清单 12. cmwq 的初始化

static int __init init_workqueues(void)
{
unsigned int cpu;
int i;
// 注册 CPU 事件的通知链,主要用于处理 CPU 热插拔时候,将该 CPU 上的工作队列迁移到 online 的 CPU
// 在 cmwq 中,将这种机制叫做 trustee
cpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE); // 初始化 CPU 数目 +1 个 gcwq
for_each_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
…… .
} // 初始化 online CPU 数目 +1 个工作者线程池
// 创建的线程命名方式如下 :
// 对于与 CPU 绑定的线程,以 ps 命令看到的为:[kworker/cup_id:thread_id],cup_id 为 CPU 的编号
// thread_id 为创建的工作者线程 id,对于未绑定的 CPU 的线程池中的线程,则显示为
//[kworker/u:thread_id]
for_each_online_gcwq_cpu(cpu) {
…… .
worker = create_worker(gcwq, true);
…… .
start_worker(worker);
…… .
} // 创建 4 个全局的工作队列
system_wq = alloc_workqueue("events", 0, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
WQ_UNBOUND_MAX_ACTIVE);
……
return 0;
}

工作者线程池的管理

为了实现工作者线程池,针对每个工作者线程,封装了一个结构体 worker 用于工作者线程的管理,如下:

清单 13. 工作者的管理结构体

struct worker {
// 与工作者线程的状态有关系,如果工作者线程处于 idle 状态,则使用 entry;如果处于 busy 状态,
// 则使用哈希节点 hentry,参考 gcwq 中的 idle_list 和 busy_hash 字段
union {
struct list_head entry; /* L: while idle */
struct hlist_node hentry; /* L: while busy */
};
……
// 被调度的工作项 list,注意只有进入到该列表,工作项才真正被工作队列处理
struct list_head scheduled; /* L: scheduled works */
// 被内核调度的实体,工作者线程在内核调度器看来只是一个 task 而已
struct task_struct *task; /* I: worker task */
struct global_cwq *gcwq; /* I: the associated gcwq */
/* 64 bytes boundary on 64bit, 32 on 32bit */
// 记录上次 active 的时间,用于判定该工作者线程是否可以被 destory 时使用
unsigned long last_active; /* L: last active timestamp */
unsigned int flags; /* X: flags */
// 工作者线程的 id,用 ps 命令在用户空间可以看到具体的值
int id; /* I: worker id */
struct work_struct rebind_work; /* L: rebind worker to cpu */

工作者线程池的主体执行是 worker_thread,其执行流程如下:

清单 14. 工作者线程的管理

static int worker_thread(void *__worker)
{
struct worker *worker = __worker;
struct global_cwq *gcwq = worker->gcwq;
// 告诉调度器这是一个工作者线程
worker->task->flags |= PF_WQ_WORKER;
woke_up:
spin_lock_irq(&gcwq->lock);
……
// 让工作者从 idle 状态离开,因为新创建的工作者线程处于 idle 状态,在让该工作者线程工作时,需要从
// idle 状态离开以执行相关的动作
worker_leave_idle(worker);
recheck:
// 检查是否需要更多的工作者线程
// 检查的依据是如果有高优先级的工作,如果工作队列中有工作要做然而该 cpu 的全局队列中却已
// 经没有空闲处理内核线程,那就有必要处理了
if (!need_more_worker(gcwq))
goto sleep;
// may_start_working 检查 gcwq 中是否有 idle 的工作者线程
// manage_workers 在后面详述
if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
goto recheck;
// 确保工作者线程的被调度 list 为空
BUG_ON(!list_empty(&worker->scheduled));
// 设置标记表明工作者线程即将处理相关的工作项,类似于一个 busy 标记
worker_clr_flags(worker, WORKER_PREP);
// 基本流程为,先将工作项合并到工作者线程的被调度 list,然后依次处理被调度 list 的工作
do {
struct work_struct *work =
list_first_entry(&gcwq->worklist,
struct work_struct, entry);
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
// 注意这里只是代码路径上的显示的优化,本质上并不需要该路径,else 的部分才是代码
// 逻辑的的所在,应此可以忽略这部分
process_one_work(worker, work);
if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
} else {
// 将 gcwq 的工作项移到工作线程的被调度列表,随后工作者线程将依序处理被调度 list
// 处理单项工作项时使用的是 process_one_work
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
} while (keep_working(gcwq));
worker_set_flags(worker, WORKER_PREP, false);
// 如果没有工作项需要处理,让工作者线程进入睡眠状态
sleep:
……
}

manage_workers 中处理需要被 destroy 的工作者线程,也决定是否需要创建新的工作者线程:在 maybe_destroy_workers 中去判定当工作线程数目是否被认定太多 ( 认定工作者线程过多的本质是个策略问题,实现者认为如果 idle 的工作者多余 1/4 个 busy 工作者就表示工作者线程过多 ),且该工作线程已经进入 idle 状态 5 分钟,则认定该工作者线程可以被 destroy;而 maybe_create_worker 决定是否需要创建新的工作者线程来为工作队列服务,判定的条件为 如果有高优先级的工作,或工作队列中有工作要做但该 CPU 的全局队列中却已经没有空闲处理内核线程,那就有必要去创建新的工作者线程了。

并发可管理工作队列的前景

并发可管理工作队列进入 mainline 的时间并不长,但已经快速替换了老的工作队列接口以及慢工作机制 (slow work mechanism),但这并不是它的唯一目标,它的长期目标则是希望在内核中提供一个通用的线程池机制,这样,工作队列的适用范围将更为普遍。

参考资料

学习

 
分享到
 
 
     


基于模型的整车电子电气架构设计
嵌入式设备上的 Linux 系统开发
Linux 的并发可管理工作队列
ARM嵌入式系统的问题总结分析
嵌入式系统设计与实例开发
WinCE6.0的EBOOT概要
更多...   


开发人员最需要具备的素质
好资料-Android开发指南中文版


开发技术讨论


UML +RoseRealtime+嵌入式
C++嵌入式系统开发
嵌入式白盒测试
手机软件测试
嵌入式软件测试
嵌入式操作系统VxWorks


中国航空 嵌入式C高质量编程
使用EA和UML进行嵌入式系统分析设计
基于SysML和EA的嵌入式系统建模
上海汽车 嵌入式软件架构设计
北京 嵌入式C高质量编程
北京 高质高效嵌入式开发
Nagra linux内核与设备驱动原理
更多...