rcu(READ COPY UPDATE)
Rev.3を表示中。最新版はこちら。
びーさんのLinuxのRCU実装の後追いです。struct rcu_data { long qsctr; /* User-mode/idle loop etc. */ long last_qsctr; /* value of qsctr at beginning */ /* of rcu grace period */ long batch; /* Batch # for current RCU batch */ struct list_head nxtlist; struct list_head curlist; }; struct rcu_ctrlblk { spinlock_t mutex; /* Guard this struct */ long curbatch; /* Current batch number. */ long maxbatch; /* Max requested batch number. */ cpumask_t rcu_cpu_mask; /* CPUs that need to switch in order */ /* for current batch to proceed. */ };カーネルが起動すると、rcu_init()でRCUの設定を行う。
asmlinkage void __init start_kernel(void) { : trap_init(); rcu_init(); init_IRQ(); pidhash_init(); sched_init(); softirq_init(); time_init(); : }
void __init rcu_init(void) { rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE, (void *)(long)smp_processor_id()); register_cpu_notifier(&rcu_nb); } static int __devinit rcu_cpu_notify(struct notifier_block *self, unsigned long action, void *hcpu) { long cpu = (long)hcpu; switch (action) { case CPU_UP_PREPARE: rcu_online_cpu(cpu); break; /* Space reserved for CPU_OFFLINE :) */ default: break; } return NOTIFY_OK; }tasklet_init()で、タスクレットがコールする関数がrcu_process_callbacks()となるように
、タスクレットの設定を行い、ローカルCPUのstruct rcu_dataのnxtlist/curlistを初期化する。
static void __devinit rcu_online_cpu(int cpu) { memset(&per_cpu(rcu_data, cpu), 0, sizeof(struct rcu_data)); tasklet_init(&RCU_tasklet(cpu), rcu_process_callbacks, 0UL); INIT_LIST_HEAD(&RCU_nxtlist(cpu)); INIT_LIST_HEAD(&RCU_curlist(cpu)); }本関数がタスクレットとして実行されるRCUのメイン処理である。ここではrcu_start_batch()でリストされている一連のコールバックを一気に行うための、リスト処理を行う。call_rcu()では、rcu_data->nxtlistにリストされていくが、実際に処理されるのはrcu_data->curlistとなる。そうする事で、リスト化と、そのリストの処理を平行して行うことができる。
rcu_data->curlistが空でないなら、そのリストをローカルのlistに繋ぎ変え、rcu_data->curlistを空にする。(なおこの時、rcu_ctrlblk.curbatch>rcu_data->batchでなければならない。(rcu_ctrlblkの変数の取り扱いが???。たぶんこの変数がRCUのキモでないかと・・・)
で、今度はrcu_data->nxtlistをrcu_data->curlistに繋ぎ変えて、rcu_do_batch()をコールすることで、コールバック関数が実行される。
static void rcu_process_callbacks(unsigned long unused) { int cpu = smp_processor_id(); LIST_HEAD(list); if (!list_empty(&RCU_curlist(cpu)) && rcu_batch_after(rcu_ctrlblk.curbatch, RCU_batch(cpu))) { list_splice(&RCU_curdlist(cpu), &list); INIT_LIST_HEAD(&RCU_curlist(cpu)); } local_irq_disable(); if (!list_empty(&RCU_nxtlist(cpu)) && list_empty(&RCU_curlist(cpu))) { list_splice(&RCU_nxtlist(cpu), &RCU_curlist(cpu)); INIT_LIST_HEAD(&RCU_nxtlist(cpu)); local_irq_enable(); /* * start the next batch of callbacks */ spin_lock(&rcu_ctrlblk.mutex); RCU_batch(cpu) = rcu_ctrlblk.curbatch + 1; rcu_start_batch(RCU_batch(cpu)); spin_unlock(&rcu_ctrlblk.mutex); } else { local_irq_enable(); } rcu_check_quiescent_state(); if (!list_empty(&list)) rcu_do_batch(&list); }call_rcu()でリストされたstruct rcu_headのコールバック関数をコールする。
static void rcu_do_batch(struct list_head *list) { struct list_head *entry; struct rcu_head *head; while (!list_empty(list)) { entry = list->next; list_del(entry); head = list_entry(entry, struct rcu_head, list); head->func(head->arg); } }ちなみにcall_rcu()は以下の通りで、ローカルCPU変数のrcu_data->nxtlistに、コールバック関数およびその引数を設定したheadをリストするだけです。
void call_rcu(struct rcu_head *head, void (*func)(void *arg), void *arg) { int cpu; unsigned long flags; head->func = func; head->arg = arg; local_irq_save(flags); cpu = smp_processor_id(); list_add_tail(&head->list, &RCU_nxtlist(cpu)); local_irq_restore(flags); }
補足
RCU(read copy update)と言うことで、リスト構造のデータの更新処理にかかる処理というイメージですが、更新処理と言うだけでなく、カーネル内では、単にリストから削除する。と言ったケースにもロックフリー処理として使われています。新しいノードに繋ぎ返るなり、そのノード自身を削除するなり、はcall_rcu()で設定されるコールバック関数の内容となるわけで、RCUの内部実装とは関係ないことです。プリエンプティブを不可にすると、そのタスクが動作しているCPU下で、ソフト割込みは発生しません。ただし、割込みは発生します。従ってタスクレットも動作されず、てプリエンプティブを可にするまでは、RCUの古いノードは保障されます。しかし他のCPUでは、ソフト割込みが発生します。ソフト割込みは複数のCPUで、並列動作が可能だからです。
びーさんの疑問で、他のCPUがrcu_read_lock()を呼び出していないということをどうやって保証しているのでしょうか。については私も疑問です。なんかrcu_nbが絡んでいるような気がしているのですが・・・。