ワークキュー
Rev.1を表示中。最新版はこちら。
ワークキューはタスクレットと並んで遅延処理を行う手法です。タスクレットはそれをタスクとして起動するksoftirqdがすべてのタスクレット(他のソフト割り込みも)を担っていました。従ってどれか一つのタスクレットで遅延が発生すると、すべてに影響してしまうということです。それを回避する手段としてワークキュが誕生してということでしょうか。(たぶん)IBMのワークキューの使い方のサンプルです。どのようになっているかと言うと、ワークキューを使用するにあたって、まずユーザが定義したworkqueue_structでcreate_workqueue関数でワークキューを作成します。そして遅延したい処理をwork_struct構造体にまとめてワークキューに登録していくと言う具合です。タスクレットではワークキューに相当するものが無く、いきなりタスクレットのリストに登録しています。ここが大きな違いで、実はワーキュキューというのはそのワークをリスト管理していると同時に、それを処理するスレッドも独自に有しているのです。しかもCPU毎に複数そのスレッドを持たせることも可能なのです。
create_workqueue(name)は__create_workqueue_key(name,0,0,&key,name)で呼ばれます。(たぶん)まずwqのworkqueue_struct構造体のkzalloc関数で確保し、wq->cpu_wqにCPU分のcpu_workqueue_struct構造体のインデックスを設定するための領域をalloc_percpu関数で割り当てています。そしてworkqueue_struct構造体でなく、このcpu_workqueue_struct構造体に実際の情報が設定されていきます。
singlethread==0の時、init_cpu_workqueue関数でのcpu_workqueue_struct構造体を初期化し、create_workqueue_thread関数でこのワークキューの処理を受け持つカーネルスレッドworker_threadを作成し、そのスレッドをcpu_workqueue_struct.threadに設定します。そしてstart_workqueue_thread関数でworker_threadを起動させています。
singlethread!=0の時、for_each_possible_cpu(cpu)で上の処理をCPU毎に行っています。CPU毎に独自のworker_threadが起動するということです。
kernel/workqueue.c
struct workqueue_struct *__create_workqueue_key(const char *name,
int singlethread,
int freezeable,
struct lock_class_key *key,
const char *lock_name)
{
struct workqueue_struct *wq;
struct cpu_workqueue_struct *cwq;
int err = 0, cpu;
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
return NULL;
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
if (!wq->cpu_wq) {
kfree(wq);
return NULL;
}
wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
wq->singlethread = singlethread;
wq->freezeable = freezeable;
INIT_LIST_HEAD(&wq->list);
if (singlethread) {
cwq = init_cpu_workqueue(wq, singlethread_cpu);
err = create_workqueue_thread(cwq, singlethread_cpu);
start_workqueue_thread(cwq, -1);
} else {
cpu_maps_update_begin();
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
spin_unlock(&workqueue_lock);
for_each_possible_cpu(cpu) {
cwq = init_cpu_workqueue(wq, cpu);
if (err || !cpu_online(cpu))
continue;
err = create_workqueue_thread(cwq, cpu);
start_workqueue_thread(cwq, cpu);
}
cpu_maps_update_done();
}
if (err) {
destroy_workqueue(wq);
wq = NULL;
}
return wq;
}
そして作成されてワークキューに遅延処理情報をまとめてwork_struct構造体を、queue_work(struct workqueue_struct *wq, struct work_struct *work)で登録することでワークが遅延処理として起動されます。 queue_work関数はそのワーキがペンディングされているかとかのチェックを行って、最終的に insert_work(cwq, work, &cwq->worklist);が呼ばれ、そこでcpu_workqueue_struct.headをルートとするリストのエンドのワークの実態をリスト追加しています。
kernel/workqueue.c
static void insert_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work, struct list_head *head)
{
set_wq_data(work, cwq);
smp_wmb();
list_add_tail(&work->entry, head);
wake_up(&cwq->more_work);
}
ワークはworker_threadの延長線上から関数コールされるわけでが。(タスクレットと同じ。)その処理を行うのがrun_workqueue関数です。cpu_workqueue_struct.run_depth>3なら、なにやらスタックをダンプしています。あんまし遅延があって、他のCPUから起動されてりして、3回以上再エントリーしたらちょっと問題じゃない。ってユーザに警告してくれているんでしょうね?。色々とチェックとかしていますが、要はlist_empty(&cwq->worklist)で、ワークのリストが無くなるまで、work_func_t f = work->funcで設定したワーク処理を、f(work)でコールすることでワーク処理が行われるようです。
kernel/workqueue.c
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
spin_lock_irq(&cwq->lock);
cwq->run_depth++;
if (cwq->run_depth > 3) {
/* morton gets to eat his hat */
printk("%s: recursion depth exceeded: %d\n",
__func__, cwq->run_depth);
dump_stack();
}
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
work_func_t f = work->func;
cwq->current_work = work;
list_del_init(cwq->worklist.next);
spin_unlock_irq(&cwq->lock);
BUG_ON(get_wq_data(work) != cwq);
work_clear_pending(work);
lock_map_acquire(&cwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
f(work);
lock_map_release(&lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);
if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
"%s/0x%08x/%d\n",
current->comm, preempt_count(),
task_pid_nr(current));
printk(KERN_ERR " last function: ");
print_symbol("%s\n", (unsigned long)f);
debug_show_held_locks(current);
dump_stack();
}
spin_lock_irq(&cwq->lock);
cwq->current_work = NULL;
}
cwq->run_depth--;
spin_unlock_irq(&cwq->lock);
}






