ワークキュー
Rev.1を表示中。最新版はこちら。
ワークキューはタスクレットと並んで遅延処理を行う手法です。タスクレットはそれをタスクとして起動するksoftirqdがすべてのタスクレット(他のソフト割り込みも)を担っていました。従ってどれか一つのタスクレットで遅延が発生すると、すべてに影響してしまうということです。それを回避する手段としてワークキュが誕生してということでしょうか。(たぶん)IBMのワークキューの使い方のサンプルです。どのようになっているかと言うと、ワークキューを使用するにあたって、まずユーザが定義したworkqueue_structでcreate_workqueue関数でワークキューを作成します。そして遅延したい処理をwork_struct構造体にまとめてワークキューに登録していくと言う具合です。タスクレットではワークキューに相当するものが無く、いきなりタスクレットのリストに登録しています。ここが大きな違いで、実はワーキュキューというのはそのワークをリスト管理していると同時に、それを処理するスレッドも独自に有しているのです。しかもCPU毎に複数そのスレッドを持たせることも可能なのです。
create_workqueue(name)は__create_workqueue_key(name,0,0,&key,name)で呼ばれます。(たぶん)まずwqのworkqueue_struct構造体のkzalloc関数で確保し、wq->cpu_wqにCPU分のcpu_workqueue_struct構造体のインデックスを設定するための領域をalloc_percpu関数で割り当てています。そしてworkqueue_struct構造体でなく、このcpu_workqueue_struct構造体に実際の情報が設定されていきます。
singlethread==0の時、init_cpu_workqueue関数でのcpu_workqueue_struct構造体を初期化し、create_workqueue_thread関数でこのワークキューの処理を受け持つカーネルスレッドworker_threadを作成し、そのスレッドをcpu_workqueue_struct.threadに設定します。そしてstart_workqueue_thread関数でworker_threadを起動させています。
singlethread!=0の時、for_each_possible_cpu(cpu)で上の処理をCPU毎に行っています。CPU毎に独自のworker_threadが起動するということです。
kernel/workqueue.c struct workqueue_struct *__create_workqueue_key(const char *name, int singlethread, int freezeable, struct lock_class_key *key, const char *lock_name) { struct workqueue_struct *wq; struct cpu_workqueue_struct *cwq; int err = 0, cpu; wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return NULL; wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); if (!wq->cpu_wq) { kfree(wq); return NULL; } wq->name = name; lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); wq->singlethread = singlethread; wq->freezeable = freezeable; INIT_LIST_HEAD(&wq->list); if (singlethread) { cwq = init_cpu_workqueue(wq, singlethread_cpu); err = create_workqueue_thread(cwq, singlethread_cpu); start_workqueue_thread(cwq, -1); } else { cpu_maps_update_begin(); spin_lock(&workqueue_lock); list_add(&wq->list, &workqueues); spin_unlock(&workqueue_lock); for_each_possible_cpu(cpu) { cwq = init_cpu_workqueue(wq, cpu); if (err || !cpu_online(cpu)) continue; err = create_workqueue_thread(cwq, cpu); start_workqueue_thread(cwq, cpu); } cpu_maps_update_done(); } if (err) { destroy_workqueue(wq); wq = NULL; } return wq; }そして作成されてワークキューに遅延処理情報をまとめてwork_struct構造体を、queue_work(struct workqueue_struct *wq, struct work_struct *work)で登録することでワークが遅延処理として起動されます。 queue_work関数はそのワーキがペンディングされているかとかのチェックを行って、最終的に insert_work(cwq, work, &cwq->worklist);が呼ばれ、そこでcpu_workqueue_struct.headをルートとするリストのエンドのワークの実態をリスト追加しています。
kernel/workqueue.c static void insert_work(struct cpu_workqueue_struct *cwq, struct work_struct *work, struct list_head *head) { set_wq_data(work, cwq); smp_wmb(); list_add_tail(&work->entry, head); wake_up(&cwq->more_work); }ワークはworker_threadの延長線上から関数コールされるわけでが。(タスクレットと同じ。)その処理を行うのがrun_workqueue関数です。cpu_workqueue_struct.run_depth>3なら、なにやらスタックをダンプしています。あんまし遅延があって、他のCPUから起動されてりして、3回以上再エントリーしたらちょっと問題じゃない。ってユーザに警告してくれているんでしょうね?。色々とチェックとかしていますが、要はlist_empty(&cwq->worklist)で、ワークのリストが無くなるまで、work_func_t f = work->funcで設定したワーク処理を、f(work)でコールすることでワーク処理が行われるようです。
kernel/workqueue.c static void run_workqueue(struct cpu_workqueue_struct *cwq) { spin_lock_irq(&cwq->lock); cwq->run_depth++; if (cwq->run_depth > 3) { /* morton gets to eat his hat */ printk("%s: recursion depth exceeded: %d\n", __func__, cwq->run_depth); dump_stack(); } while (!list_empty(&cwq->worklist)) { struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry); work_func_t f = work->func; cwq->current_work = work; list_del_init(cwq->worklist.next); spin_unlock_irq(&cwq->lock); BUG_ON(get_wq_data(work) != cwq); work_clear_pending(work); lock_map_acquire(&cwq->wq->lockdep_map); lock_map_acquire(&lockdep_map); f(work); lock_map_release(&lockdep_map); lock_map_release(&cwq->wq->lockdep_map); if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " "%s/0x%08x/%d\n", current->comm, preempt_count(), task_pid_nr(current)); printk(KERN_ERR " last function: "); print_symbol("%s\n", (unsigned long)f); debug_show_held_locks(current); dump_stack(); } spin_lock_irq(&cwq->lock); cwq->current_work = NULL; } cwq->run_depth--; spin_unlock_irq(&cwq->lock); }