メッセージ
Rev.1を表示中。最新版はこちら。
メッセージは以下の3つの構造体をベースに管理されています。struct msg_queueはその大元の構造体です。メッセージを作成すると、ipc下の空きスロットにこの構造体が設定され、対応するIDが返されてきます。この処理は共有メモリの処理と全く同じで、ipc下の空きスロットのポイント先が、struct msg_queu/の違いでしかありません。struct msg_queueで管理されるメッセージはstruct list_head q_messagesに設定されます。メッセージそのものはstruct msg_msgでパックされ、そのアドレスをstruct list_head q_messagesにリストしていくことになります。そして次のメッセージはstruct msg_msgのstruct list_head m_listにリストされていきます。
1メッセージが1ページを超える場合(正しくは、4096-struct msg_msgのサイズあるいは4096-struct msg_msgsegのサイズを超える場合)、超えた分はstruct msg_msgsegに蓄えられ、struct msg_msgのtruct msg_msgseg* nextないし、struct msg_msgsegのstruct msg_msgseg* nextへとリストされていきます。
struct list_head q_receiversはメッセージを受信したものの、そのメッセージが無い場合、このプロセスをウエイトする必要がある場合、このメンバーにリストされていきます。
struct list_head q_sendersは、メッセージを送信したものの、メッセージが所定の数を超えていた場合、このプロセスをリストするためのメンバーです。
struct msg_queue { struct kern_ipc_perm q_perm; time_t q_stime; /* last msgsnd time */ time_t q_rtime; /* last msgrcv time */ time_t q_ctime; /* last change time */ unsigned long q_cbytes; /* current number of bytes on queue */ unsigned long q_qnum; /* number of messages in queue */ unsigned long q_qbytes; /* max number of bytes on queue */ pid_t q_lspid; /* pid of last msgsnd */ pid_t q_lrpid; /* last receive pid */ struct list_head q_messages; struct list_head q_receivers; struct list_head q_senders; } struct msg_msg { struct list_head m_list; long m_type; int m_ts; /* message text size */ struct msg_msgseg* next; void *security; } struct msg_msgseg { struct msg_msgseg* next; }システムコールmsgsndの実処理はdo_msgsnd()です。まず引数のチェックを行った後、load_msg()でユーザからのメッセージをstruct msg_msg *msgに設定します。なおstruct msg_msgに、メッセージ文を設定するメンバーが無いのに?と言うことですが、kmalloc()する時に、この構造体サイズ+size_t msgszとしてメモリを取得しています。そしてstruct msg_msg *msgにメッセージタイプ/メッセージサイズを設定した後、msg_lock_check()でメッセージIDmsqidに対応するstruct msg_queue *msqを取得します。
forループの中が実際のメッセージ処理となります。まずセキュリティ絡みのチェックをしています。そしてメッセージサイズ/メッセージ数(?注)をチェックして、メッセージが送信可能ならforループを抜けます。以降はメッセージが送信できなかった場合の処理となります。この場合ss_add()でカレントプロセス(メッセージ送信プロセス)をstruct list_head q_sendersにリストした後、schedule()をコールする事で、他のプロセスに実行権を譲ります。そしてこのプロセスに実行権が戻ってきた時、先にstruct list_head q_sendersにリストしたプロセスを、ss_del()で削除し、再度forループを繰り返すことで、メッセージ送信処理を試みるといった具合です。
forループを抜けた処理はメッセージが送信できた処理となります。pipelined_send()でこのメッセージを受信しているプロセスがあるかどうかチェックを行い、待ちプロセスが有しているなら、そのプロセスを起床させています。もし、待ちプロセスが無いなら、送信メッセージをmsq->q_messagesにリストし、msq->q_cbytes += msgszで、ウエイトリストしたメッセージのバイト数を、msq->q_qnum++でメッセージ数を更新します。またネームスペース下のメンバーにも全メッセージのバイト数およびメッセージ数を更新しています。
long do_msgsnd(int msqid, long mtype, void __user *mtext, size_t msgsz, int msgflg) { struct msg_queue *msq; struct msg_msg *msg; int err; struct ipc_namespace *ns; ns = current->nsproxy->ipc_ns; if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0) return -EINVAL; if (mtype < 1) return -EINVAL; msg = load_msg(mtext, msgsz); if (IS_ERR(msg)) return PTR_ERR(msg); msg->m_type = mtype; msg->m_ts = msgsz; msq = msg_lock_check(ns, msqid); if (IS_ERR(msq)) { err = PTR_ERR(msq); goto out_free; } for (;;) { struct msg_sender s; err = -EACCES; if (ipcperms(&msq->q_perm, S_IWUGO)) goto out_unlock_free; err = security_msg_queue_msgsnd(msq, msg, msgflg); if (err) goto out_unlock_free; if (msgsz + msq->q_cbytes <= msq->q_qbytes && 1 + msq->q_qnum <= msq->q_qbytes) { break; } /* queue full, wait: */ if (msgflg & IPC_NOWAIT) { err = -EAGAIN; goto out_unlock_free; } ss_add(msq, &s); ipc_rcu_getref(msq); msg_unlock(msq); schedule(); ipc_lock_by_ptr(&msq->q_perm); ipc_rcu_putref(msq); if (msq->q_perm.deleted) { err = -EIDRM; goto out_unlock_free; } ss_del(&s); if (signal_pending(current)) { err = -ERESTARTNOHAND; goto out_unlock_free; } } msq->q_lspid = task_tgid_vnr(current); msq->q_stime = get_seconds(); if (!pipelined_send(msq, msg)) { /* noone is waiting for this message, enqueue it */ list_add_tail(&msg->m_list, &msq->q_messages); msq->q_cbytes += msgsz; msq->q_qnum++; atomic_add(msgsz, &ns->msg_bytes); atomic_inc(&ns->msg_hdrs); } err = 0; msg = NULL; out_unlock_free: msg_unlock(msq); out_free: if (msg != NULL) free_msg(msg); return err; }
補足
メッセージ送信可能かどうかのチェックとして、メッセージキュー内のメッセージバイト数およびメッセージ数のチェックを行っていました。メッセージを追加する毎に、msq->q_messagesにリストし、msq->q_qnum++としていました。すなわちmsq->q_qnumがメッセージ数と言うわけです。で・・・、メッセージ数のチェックと思われるforループの最初のif文で、1 + msq->q_qnum <= msq->q_qbytesのmsq->q_qbytesと比較するのは? msq->q_qbytesは全メッセージのバイト数なわけですから。
pipelined_send()でメッセージを受信しているプロセスの処理を行います。メッセージキューのq_receiversにリストされているプロセスです。そのプロセス1つづつをtestmsg()で受信プロセスかチェックします。受信プロセスなら、メッセージキューのq_receiversにリストされている、受信待ちリストからそのプロセスを削除し、wake_up_process()でそのプロセスを起床させています。
static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) { struct list_head *tmp; tmp = msq->q_receivers.next; while (tmp != &msq->q_receivers) { struct msg_receiver *msr; msr = list_entry(tmp, struct msg_receiver, r_list); tmp = tmp->next; if (testmsg(msg, msr->r_msgtype, msr->r_mode) && !security_msg_queue_msgrcv(msq, msg, msr->r_tsk, msr->r_msgtype, msr->r_mode)) { list_del(&msr->r_list); if (msr->r_maxsize < msg->m_ts) { msr->r_msg = NULL; wake_up_process(msr->r_tsk); smp_mb(); msr->r_msg = ERR_PTR(-E2BIG); } else { msr->r_msg = NULL; msq->q_lrpid = task_pid_vnr(msr->r_tsk); msq->q_rtime = get_seconds(); wake_up_process(msr->r_tsk); smp_mb(); msr->r_msg = msg; return 1; } } } return 0; }testms()は、type/modeのより受信するかどうかのチェックしています。なお、引数modeは、受信システムコールmsgrcv()のlong msgtyp, int msgflgから、convert_mode()をコールすることで、SEARCH_ANY/SEARCH_EQUA/SEARCH_NOTEQUAL/SEARCH_LESSEQUAのいづれかが設定されるようになっています。
#define SEARCH_ANY 1 #define SEARCH_EQUAL 2 #define SEARCH_NOTEQUAL 3 #define SEARCH_LESSEQUAL 4 static int testmsg(struct msg_msg *msg, long type, int mode) { switch(mode) { case SEARCH_ANY: return 1; case SEARCH_LESSEQUAL: if (msg->m_type <=type) return 1; break; case SEARCH_EQUAL: if (msg->m_type == type) return 1; break; case SEARCH_NOTEQUAL: if (msg->m_type != type) return 1; break; } return 0; } static inline int convert_mode(long *msgtyp, int msgflg) { /* * find message of correct type. * msgtyp = 0 => get first. * msgtyp > 0 => get first message of matching type. * msgtyp < 0 => get message with least type must be < abs(msgtype). */ if (*msgtyp == 0) return SEARCH_ANY; if (*msgtyp < 0) { *msgtyp = -*msgtyp; return SEARCH_LESSEQUAL; } if (msgflg & MSG_EXCEPT) return SEARCH_NOTEQUAL; return SEARCH_EQUAL; }