メッセージ
メッセージは以下の3つの構造体をベースに管理されています。struct msg_queueはその大元の構造体です。メッセージを作成すると、ipc下の空きスロットにこの構造体が設定され、対応するIDが返されてきます。この処理は共有メモリの処理と全く同じで、ipc下の空きスロットのポイント先が、struct msg_queu/の違いでしかありません。
struct msg_queueで管理されるメッセージはstruct list_head q_messagesに設定されます。メッセージそのものはstruct msg_msgでパックされ、そのアドレスをstruct list_head q_messagesにリストしていくことになります。そして次のメッセージはstruct msg_msgのstruct list_head m_listにリストされていきます。
1メッセージが1ページを超える場合(正しくは、4096-struct msg_msgのサイズあるいは4096-struct msg_msgsegのサイズを超える場合)、超えた分はstruct msg_msgsegに蓄えられ、struct msg_msgのtruct msg_msgseg* nextないし、struct msg_msgsegのstruct msg_msgseg* nextへとリストされていきます。
struct list_head q_receiversはメッセージを受信したものの、そのメッセージが無い場合、このプロセスをウエイトする必要がある場合、このメンバーにリストされていきます。
struct list_head q_sendersは、メッセージを送信したものの、メッセージが所定の数を超えていた場合、このプロセスをリストするためのメンバーです。
forループの中が実際のメッセージ処理となります。まずセキュリティ絡みのチェックをしています。そしてメッセージサイズ/メッセージ数(?注)をチェックして、メッセージが送信可能ならforループを抜けます。以降はメッセージが送信できなかった場合の処理となります。この場合ss_add()でカレントプロセス(メッセージ送信プロセス)をstruct list_head q_sendersにリストした後、schedule()をコールする事で、他のプロセスに実行権を譲ります。そしてこのプロセスに実行権が戻ってきた時、先にstruct list_head q_sendersにリストしたプロセスを、ss_del()で削除し、再度forループを繰り返すことで、メッセージ送信処理を試みるといった具合です。
forループを抜けた処理はメッセージが送信できた処理となります。pipelined_send()でこのメッセージを受信しているプロセスがあるかどうかチェックを行い、待ちプロセスが有しているなら、そのプロセスを起床させています。もし、待ちプロセスが無いなら、送信メッセージをmsq->q_messagesにリストし、msq->q_cbytes += msgszで、ウエイトリストしたメッセージのバイト数を、msq->q_qnum++でメッセージ数を更新します。またネームスペース下のメンバーにも全メッセージのバイト数およびメッセージ数を更新しています。
で・・・、メッセージ数のチェックと思われるforループの最初のif文で、1 + msq->q_qnum <= msq->q_qbytesのmsq->q_qbytesと比較するのは? msq->q_qbytesは全メッセージのバイト数なわけですから。
load_msg()でメッセージをstruct msg_msgをヘッダーとする領域にメッセージを複写します。ここで複写したstruct msg_msgはstruct msg_queueにリスト登録されることになります。
まず、メッセージサイズのチェックです。DATALEN_MSGは、ページサイズからstruct msg_msgのサイズ分を引いた値となります。そのサイズとstruct msg_msgのサイズ分で、kmalloc()でstruct msg_msg *msgを取得します。そして、このstruct msg_msg *msgをヘッダとして、その次の領域にメッセージをcopy_from_user()で複写しています。(この第1引数がmsg + 1となっている。)
whileループは、メッセージがDATALEN_MSGより大きかった場合の処理になります。この場合、struct msg_msgsegに残りのメッセージを設定する事になります。
kmalloc()でstruct msg_msgsegサイズ分と、残りのメッセージサイズ分とでstruct msg_msgseg *segを取得します。そして同じようにstruct msg_msgsegをヘッダとして、その次の領域に残りのメッセージを複写しています。複写し終えたら、struct msg_msgのnextにリストしていきます。(whileループの前にpseg = &msg->nextとしている。)
まだメッセージが大きくて、複写し終えないようだと、同じ処理を繰り返します。なおpseg = &seg->nextとしているように、次のstruct msg_msgseg **psegは前のstruct msg_msgsegのnextにリストされる事になります。
pipelined_send()でメッセージを受信しているプロセスの処理を行います。メッセージキューのq_receiversにリストされているプロセスです。そのプロセス1つづつをtestmsg()で受信プロセスかチェックします。受信プロセスなら、メッセージキューのq_receiversにリストされている、受信待ちリストからそのプロセスを削除し、wake_up_process()でそのプロセスを起床させています。
struct msg_queueで管理されるメッセージはstruct list_head q_messagesに設定されます。メッセージそのものはstruct msg_msgでパックされ、そのアドレスをstruct list_head q_messagesにリストしていくことになります。そして次のメッセージはstruct msg_msgのstruct list_head m_listにリストされていきます。
1メッセージが1ページを超える場合(正しくは、4096-struct msg_msgのサイズあるいは4096-struct msg_msgsegのサイズを超える場合)、超えた分はstruct msg_msgsegに蓄えられ、struct msg_msgのtruct msg_msgseg* nextないし、struct msg_msgsegのstruct msg_msgseg* nextへとリストされていきます。
struct list_head q_receiversはメッセージを受信したものの、そのメッセージが無い場合、このプロセスをウエイトする必要がある場合、このメンバーにリストされていきます。
struct list_head q_sendersは、メッセージを送信したものの、メッセージが所定の数を超えていた場合、このプロセスをリストするためのメンバーです。
struct msg_queue {
struct kern_ipc_perm q_perm;
time_t q_stime; /* last msgsnd time */
time_t q_rtime; /* last msgrcv time */
time_t q_ctime; /* last change time */
unsigned long q_cbytes; /* current number of bytes on queue */
unsigned long q_qnum; /* number of messages in queue */
unsigned long q_qbytes; /* max number of bytes on queue */
pid_t q_lspid; /* pid of last msgsnd */
pid_t q_lrpid; /* last receive pid */
struct list_head q_messages;
struct list_head q_receivers;
struct list_head q_senders;
}
struct msg_msg {
struct list_head m_list;
long m_type;
int m_ts; /* message text size */
struct msg_msgseg* next;
void *security;
}
struct msg_msgseg {
struct msg_msgseg* next;
}
システムコールmsgsndの実処理はdo_msgsnd()です。まず引数のチェックを行った後、load_msg()でユーザからのメッセージをstruct msg_msg *msgに設定します。なおstruct msg_msgに、メッセージ文を設定するメンバーが無いのに?と言うことですが、kmalloc()する時に、この構造体サイズ+size_t msgszとしてメモリを取得しています。そしてstruct msg_msg *msgにメッセージタイプ/メッセージサイズを設定した後、msg_lock_check()でメッセージIDmsqidに対応するstruct msg_queue *msqを取得します。forループの中が実際のメッセージ処理となります。まずセキュリティ絡みのチェックをしています。そしてメッセージサイズ/メッセージ数(?注)をチェックして、メッセージが送信可能ならforループを抜けます。以降はメッセージが送信できなかった場合の処理となります。この場合ss_add()でカレントプロセス(メッセージ送信プロセス)をstruct list_head q_sendersにリストした後、schedule()をコールする事で、他のプロセスに実行権を譲ります。そしてこのプロセスに実行権が戻ってきた時、先にstruct list_head q_sendersにリストしたプロセスを、ss_del()で削除し、再度forループを繰り返すことで、メッセージ送信処理を試みるといった具合です。
forループを抜けた処理はメッセージが送信できた処理となります。pipelined_send()でこのメッセージを受信しているプロセスがあるかどうかチェックを行い、待ちプロセスが有しているなら、そのプロセスを起床させています。もし、待ちプロセスが無いなら、送信メッセージをmsq->q_messagesにリストし、msq->q_cbytes += msgszで、ウエイトリストしたメッセージのバイト数を、msq->q_qnum++でメッセージ数を更新します。またネームスペース下のメンバーにも全メッセージのバイト数およびメッセージ数を更新しています。
long do_msgsnd(int msqid, long mtype, void __user *mtext,
size_t msgsz, int msgflg)
{
struct msg_queue *msq;
struct msg_msg *msg;
int err;
struct ipc_namespace *ns;
ns = current->nsproxy->ipc_ns;
if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)
return -EINVAL;
if (mtype < 1)
return -EINVAL;
msg = load_msg(mtext, msgsz);
if (IS_ERR(msg))
return PTR_ERR(msg);
msg->m_type = mtype;
msg->m_ts = msgsz;
msq = msg_lock_check(ns, msqid);
if (IS_ERR(msq)) {
err = PTR_ERR(msq);
goto out_free;
}
for (;;) {
struct msg_sender s;
err = -EACCES;
if (ipcperms(&msq->q_perm, S_IWUGO))
goto out_unlock_free;
err = security_msg_queue_msgsnd(msq, msg, msgflg);
if (err)
goto out_unlock_free;
if (msgsz + msq->q_cbytes <= msq->q_qbytes &&
1 + msq->q_qnum <= msq->q_qbytes) {
break;
}
/* queue full, wait: */
if (msgflg & IPC_NOWAIT) {
err = -EAGAIN;
goto out_unlock_free;
}
ss_add(msq, &s);
ipc_rcu_getref(msq);
msg_unlock(msq);
schedule();
ipc_lock_by_ptr(&msq->q_perm);
ipc_rcu_putref(msq);
if (msq->q_perm.deleted) {
err = -EIDRM;
goto out_unlock_free;
}
ss_del(&s);
if (signal_pending(current)) {
err = -ERESTARTNOHAND;
goto out_unlock_free;
}
}
msq->q_lspid = task_tgid_vnr(current);
msq->q_stime = get_seconds();
if (!pipelined_send(msq, msg)) {
/* noone is waiting for this message, enqueue it */
list_add_tail(&msg->m_list, &msq->q_messages);
msq->q_cbytes += msgsz;
msq->q_qnum++;
atomic_add(msgsz, &ns->msg_bytes);
atomic_inc(&ns->msg_hdrs);
}
err = 0;
msg = NULL;
out_unlock_free:
msg_unlock(msq);
out_free:
if (msg != NULL)
free_msg(msg);
return err;
}
補足
メッセージ送信可能かどうかのチェックとして、メッセージキュー内のメッセージバイト数およびメッセージ数のチェックを行っていました。メッセージを追加する毎に、msq->q_messagesにリストし、msq->q_qnum++としていました。すなわちmsq->q_qnumがメッセージ数と言うわけです。で・・・、メッセージ数のチェックと思われるforループの最初のif文で、1 + msq->q_qnum <= msq->q_qbytesのmsq->q_qbytesと比較するのは? msq->q_qbytesは全メッセージのバイト数なわけですから。
load_msg()でメッセージをstruct msg_msgをヘッダーとする領域にメッセージを複写します。ここで複写したstruct msg_msgはstruct msg_queueにリスト登録されることになります。
まず、メッセージサイズのチェックです。DATALEN_MSGは、ページサイズからstruct msg_msgのサイズ分を引いた値となります。そのサイズとstruct msg_msgのサイズ分で、kmalloc()でstruct msg_msg *msgを取得します。そして、このstruct msg_msg *msgをヘッダとして、その次の領域にメッセージをcopy_from_user()で複写しています。(この第1引数がmsg + 1となっている。)
whileループは、メッセージがDATALEN_MSGより大きかった場合の処理になります。この場合、struct msg_msgsegに残りのメッセージを設定する事になります。
kmalloc()でstruct msg_msgsegサイズ分と、残りのメッセージサイズ分とでstruct msg_msgseg *segを取得します。そして同じようにstruct msg_msgsegをヘッダとして、その次の領域に残りのメッセージを複写しています。複写し終えたら、struct msg_msgのnextにリストしていきます。(whileループの前にpseg = &msg->nextとしている。)
まだメッセージが大きくて、複写し終えないようだと、同じ処理を繰り返します。なおpseg = &seg->nextとしているように、次のstruct msg_msgseg **psegは前のstruct msg_msgsegのnextにリストされる事になります。
#define DATALEN_MSG (PAGE_SIZE-sizeof(struct msg_msg))
#define DATALEN_SEG (PAGE_SIZE-sizeof(struct msg_msgseg))
struct msg_msg *load_msg(const void __user *src, int len)
{
struct msg_msg *msg;
struct msg_msgseg **pseg;
int err;
int alen;
alen = len;
if (alen > DATALEN_MSG)
alen = DATALEN_MSG;
msg = kmalloc(sizeof(*msg) + alen, GFP_KERNEL);
if (msg == NULL)
return ERR_PTR(-ENOMEM);
msg->next = NULL;
msg->security = NULL;
if (copy_from_user(msg + 1, src, alen)) {
err = -EFAULT;
goto out_err;
}
len -= alen;
src = ((char __user *)src) + alen;
pseg = &msg->next;
while (len > 0) {
struct msg_msgseg *seg;
alen = len;
if (alen > DATALEN_SEG)
alen = DATALEN_SEG;
seg = kmalloc(sizeof(*seg) + alen,
GFP_KERNEL);
if (seg == NULL) {
err = -ENOMEM;
goto out_err;
}
*pseg = seg;
seg->next = NULL;
if (copy_from_user(seg + 1, src, alen)) {
err = -EFAULT;
goto out_err;
}
pseg = &seg->next;
len -= alen;
src = ((char __user *)src) + alen;
}
err = security_msg_msg_alloc(msg);
if (err)
goto out_err;
return msg;
out_err:
free_msg(msg);
return ERR_PTR(err);
}
pipelined_send()でメッセージを受信しているプロセスの処理を行います。メッセージキューのq_receiversにリストされているプロセスです。そのプロセス1つづつをtestmsg()で受信プロセスかチェックします。受信プロセスなら、メッセージキューのq_receiversにリストされている、受信待ちリストからそのプロセスを削除し、wake_up_process()でそのプロセスを起床させています。
static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
{
struct list_head *tmp;
tmp = msq->q_receivers.next;
while (tmp != &msq->q_receivers) {
struct msg_receiver *msr;
msr = list_entry(tmp, struct msg_receiver, r_list);
tmp = tmp->next;
if (testmsg(msg, msr->r_msgtype, msr->r_mode) &&
!security_msg_queue_msgrcv(msq, msg, msr->r_tsk,
msr->r_msgtype, msr->r_mode)) {
list_del(&msr->r_list);
if (msr->r_maxsize < msg->m_ts) {
msr->r_msg = NULL;
wake_up_process(msr->r_tsk);
smp_mb();
msr->r_msg = ERR_PTR(-E2BIG);
} else {
msr->r_msg = NULL;
msq->q_lrpid = task_pid_vnr(msr->r_tsk);
msq->q_rtime = get_seconds();
wake_up_process(msr->r_tsk);
smp_mb();
msr->r_msg = msg;
return 1;
}
}
}
return 0;
}
testms()は、type/modeのより受信するかどうかのチェックしています。なお、引数modeは、受信システムコールmsgrcv()のlong msgtyp, int msgflgから、convert_mode()をコールすることで、SEARCH_ANY/SEARCH_EQUA/SEARCH_NOTEQUAL/SEARCH_LESSEQUAのいづれかが設定されるようになっています。
#define SEARCH_ANY 1
#define SEARCH_EQUAL 2
#define SEARCH_NOTEQUAL 3
#define SEARCH_LESSEQUAL 4
static int testmsg(struct msg_msg *msg, long type, int mode)
{
switch(mode)
{
case SEARCH_ANY:
return 1;
case SEARCH_LESSEQUAL:
if (msg->m_type <=type)
return 1;
break;
case SEARCH_EQUAL:
if (msg->m_type == type)
return 1;
break;
case SEARCH_NOTEQUAL:
if (msg->m_type != type)
return 1;
break;
}
return 0;
}
static inline int convert_mode(long *msgtyp, int msgflg)
{
/*
* find message of correct type.
* msgtyp = 0 => get first.
* msgtyp > 0 => get first message of matching type.
* msgtyp < 0 => get message with least type must be < abs(msgtype).
*/
if (*msgtyp == 0)
return SEARCH_ANY;
if (*msgtyp < 0) {
*msgtyp = -*msgtyp;
return SEARCH_LESSEQUAL;
}
if (msgflg & MSG_EXCEPT)
return SEARCH_NOTEQUAL;
return SEARCH_EQUAL;
}






