ワークキュー


Rev.3を表示中。最新版はこちら

ワークキューはタスクレットと並んで遅延処理を行う手法です。タスクレットはそれをタスクとして起動するksoftirqdがすべてのタスクレット(他のソフト割り込みも)を担っていました。従ってどれか一つのタスクレットで遅延が発生すると、すべてに影響してしまうということです。それを回避する手段としてワークキュが誕生してということでしょうか。(たぶん)

IBMのワークキューの使い方のサンプルです。どのようになっているかと言うと、ワークキューを使用するにあたって、まずユーザが定義したworkqueue_structでcreate_workqueue関数でワークキューを作成します。そして遅延したい処理をwork_struct構造体にまとめてワークキューに登録していくと言う具合です。タスクレットではワークキューに相当するものが無く、いきなりタスクレットのリストに登録しています。ここが大きな違いで、実はワーキュキューというのはそのワークをリスト管理していると同時に、それを処理するスレッドも独自に有しているのです。しかもCPU毎に複数そのスレッドを持たせることも可能なのです。

create_workqueue(name)は__create_workqueue_key(name,0,0,&key,name)で呼ばれます。(たぶん)まずwqのworkqueue_struct構造体のkzalloc関数で確保し、wq->cpu_wqにCPU分のcpu_workqueue_struct構造体のインデックスを設定するための領域をalloc_percpu関数で割り当てています。そしてworkqueue_struct構造体でなく、このcpu_workqueue_struct構造体に実際の情報が設定されていきます。

singlethread==0の時、init_cpu_workqueue関数でのcpu_workqueue_struct構造体を初期化し、create_workqueue_thread関数でこのワークキューの処理を受け持つカーネルスレッドworker_threadを作成し、そのスレッドをcpu_workqueue_struct.threadに設定します。そしてstart_workqueue_thread関数でworker_threadを起動させています。

singlethread!=0の時、for_each_possible_cpu(cpu)で上の処理をCPU毎に行っています。CPU毎に独自のworker_threadが起動するということです。
kernel/workqueue.c
struct workqueue_struct *__create_workqueue_key(const char *name,
                                               int singlethread,
                                               int freezeable,
                                               struct lock_class_key *key,
                                               const char *lock_name)
{
       struct workqueue_struct *wq;
       struct cpu_workqueue_struct *cwq;
       int err = 0, cpu;

       wq = kzalloc(sizeof(*wq), GFP_KERNEL);
       if (!wq)
               return NULL;

       wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
       if (!wq->cpu_wq) {
               kfree(wq);
               return NULL;
       }

       wq->name = name;
       lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
       wq->singlethread = singlethread;
       wq->freezeable = freezeable;
       INIT_LIST_HEAD(&wq->list);

       if (singlethread) {
               cwq = init_cpu_workqueue(wq, singlethread_cpu);
               err = create_workqueue_thread(cwq, singlethread_cpu);
               start_workqueue_thread(cwq, -1);
       } else {
               cpu_maps_update_begin();
               spin_lock(&workqueue_lock);
               list_add(&wq->list, &workqueues);
               spin_unlock(&workqueue_lock);
               for_each_possible_cpu(cpu) {
                       cwq = init_cpu_workqueue(wq, cpu);
                       if (err || !cpu_online(cpu))
                               continue;
                       err = create_workqueue_thread(cwq, cpu);
                       start_workqueue_thread(cwq, cpu);
               }
               cpu_maps_update_done();
       }

       if (err) {
               destroy_workqueue(wq);
               wq = NULL;
       }
       return wq;
}
そして作成されてワークキューに遅延処理情報をまとめてwork_struct構造体を、queue_work(struct workqueue_struct *wq, struct work_struct *work)で登録することでワークが遅延処理として起動されます。 queue_work関数はそのワーキがペンディングされているかとかのチェックを行って、最終的に insert_work(cwq, work, &cwq->worklist);が呼ばれ、そこでcpu_workqueue_struct.headをルートとするリストのエンドのワークの実態をリスト追加しています。
kernel/workqueue.c
static void insert_work(struct cpu_workqueue_struct *cwq,
                       struct work_struct *work, struct list_head *head)
{
       set_wq_data(work, cwq);
       smp_wmb();
       list_add_tail(&work->entry, head);
       wake_up(&cwq->more_work);
}
ワークはworker_threadの延長線上から関数コールされるわけでが。(タスクレットと同じ。)その処理を行うのがrun_workqueue関数です。cpu_workqueue_struct.run_depth>3なら、なにやらスタックをダンプしています。あんまし遅延があって、他のCPUから起動されてりして、3回以上再エントリーしたらちょっと問題じゃない。ってユーザに警告してくれているんでしょうね?。色々とチェックとかしていますが、要はlist_empty(&cwq->worklist)で、ワークのリストが無くなるまで、work_func_t f = work->funcで設定したワーク処理を、f(work)でコールすることでワーク処理が行われるようです。
kernel/workqueue.c
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
       spin_lock_irq(&cwq->lock);
       cwq->run_depth++;
       if (cwq->run_depth > 3) {
               /* morton gets to eat his hat */
               printk("%s: recursion depth exceeded: %d\n",
                       __func__, cwq->run_depth);
               dump_stack();
       }
       while (!list_empty(&cwq->worklist)) {
               struct work_struct *work = list_entry(cwq->worklist.next,
                                               struct work_struct, entry);
               work_func_t f = work->func;
               cwq->current_work = work;
               list_del_init(cwq->worklist.next);
               spin_unlock_irq(&cwq->lock);

               BUG_ON(get_wq_data(work) != cwq);
               work_clear_pending(work);
               lock_map_acquire(&cwq->wq->lockdep_map);
               lock_map_acquire(&lockdep_map);
               f(work);
               lock_map_release(&lockdep_map);
               lock_map_release(&cwq->wq->lockdep_map);

               if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
                       printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
                                       "%s/0x%08x/%d\n",
                                       current->comm, preempt_count(),
                                       task_pid_nr(current));
                       printk(KERN_ERR "    last function: ");
                       print_symbol("%s\n", (unsigned long)f);
                       debug_show_held_locks(current);
                       dump_stack();
               }

               spin_lock_irq(&cwq->lock);
               cwq->current_work = NULL;
       }
       cwq->run_depth--;
       spin_unlock_irq(&cwq->lock);
}
singlethread==0の時(create_workqueue関数でワーク作成時)そのワークキュー内においてスリープするとその影響はそのワークキューで待つすべてのワークに影響します。従ってワークキューがスリープしてもいいと言うのは、linuxとして影響はないですが、そのあたりはユーザに責任でということでしょうか。

最終更新 2010/09/23 17:57:48 - north
(2010/09/23 17:48:15 作成)


検索

アクセス数
3689644
最近のコメント
コアダンプファイル - sakaia
list_head構造体 - yocto_no_yomikata
勧告ロックと強制ロック - wataash
LKMからのファイル出力 - 重松 宏昌
kprobe - ななし
ksetの実装 - スーパーコピー
カーネルスレッドとは - ノース
カーネルスレッドとは - nbyst
asmlinkageってなに? - ノース
asmlinkageってなに? - よろしく
Adsense
広告情報が設定されていません。