メッセージ


メッセージは以下の3つの構造体をベースに管理されています。struct msg_queueはその大元の構造体です。メッセージを作成すると、ipc下の空きスロットにこの構造体が設定され、対応するIDが返されてきます。この処理は共有メモリの処理と全く同じで、ipc下の空きスロットのポイント先が、struct msg_queu/の違いでしかありません。

struct msg_queueで管理されるメッセージはstruct list_head q_messagesに設定されます。メッセージそのものはstruct msg_msgでパックされ、そのアドレスをstruct list_head q_messagesにリストしていくことになります。そして次のメッセージはstruct msg_msgのstruct list_head m_listにリストされていきます。

1メッセージが1ページを超える場合(正しくは、4096-struct msg_msgのサイズあるいは4096-struct msg_msgsegのサイズを超える場合)、超えた分はstruct msg_msgsegに蓄えられ、struct msg_msgのtruct msg_msgseg* nextないし、struct msg_msgsegのstruct msg_msgseg* nextへとリストされていきます。

struct list_head q_receiversはメッセージを受信したものの、そのメッセージが無い場合、このプロセスをウエイトする必要がある場合、このメンバーにリストされていきます。

struct list_head q_sendersは、メッセージを送信したものの、メッセージが所定の数を超えていた場合、このプロセスをリストするためのメンバーです。
struct msg_queue {
       struct kern_ipc_perm q_perm;
       time_t q_stime;                 /* last msgsnd time */
       time_t q_rtime;                 /* last msgrcv time */
       time_t q_ctime;                 /* last change time */
       unsigned long q_cbytes;         /* current number of bytes on queue */
       unsigned long q_qnum;           /* number of messages in queue */
       unsigned long q_qbytes;         /* max number of bytes on queue */
       pid_t q_lspid;                  /* pid of last msgsnd */
       pid_t q_lrpid;                  /* last receive pid */

       struct list_head q_messages;
       struct list_head q_receivers;
       struct list_head q_senders;
}

struct msg_msg {
       struct list_head m_list; 
       long   m_type;          
       int    m_ts;           /* message text size */
       struct msg_msgseg* next;
       void   *security;
}

struct msg_msgseg {
       struct msg_msgseg* next;
}
システムコールmsgsndの実処理はdo_msgsnd()です。まず引数のチェックを行った後、load_msg()でユーザからのメッセージをstruct msg_msg *msgに設定します。なおstruct msg_msgに、メッセージ文を設定するメンバーが無いのに?と言うことですが、kmalloc()する時に、この構造体サイズ+size_t msgszとしてメモリを取得しています。そしてstruct msg_msg *msgにメッセージタイプ/メッセージサイズを設定した後、msg_lock_check()でメッセージIDmsqidに対応するstruct msg_queue *msqを取得します。

forループの中が実際のメッセージ処理となります。まずセキュリティ絡みのチェックをしています。そしてメッセージサイズ/メッセージ数(?注)をチェックして、メッセージが送信可能ならforループを抜けます。以降はメッセージが送信できなかった場合の処理となります。この場合ss_add()でカレントプロセス(メッセージ送信プロセス)をstruct list_head q_sendersにリストした後、schedule()をコールする事で、他のプロセスに実行権を譲ります。そしてこのプロセスに実行権が戻ってきた時、先にstruct list_head q_sendersにリストしたプロセスを、ss_del()で削除し、再度forループを繰り返すことで、メッセージ送信処理を試みるといった具合です。

forループを抜けた処理はメッセージが送信できた処理となります。pipelined_send()でこのメッセージを受信しているプロセスがあるかどうかチェックを行い、待ちプロセスが有しているなら、そのプロセスを起床させています。もし、待ちプロセスが無いなら、送信メッセージをmsq->q_messagesにリストし、msq->q_cbytes += msgszで、ウエイトリストしたメッセージのバイト数を、msq->q_qnum++でメッセージ数を更新します。またネームスペース下のメンバーにも全メッセージのバイト数およびメッセージ数を更新しています。
long do_msgsnd(int msqid, long mtype, void __user *mtext,
               size_t msgsz, int msgflg)
{
       struct msg_queue *msq;
       struct msg_msg *msg;
       int err;
       struct ipc_namespace *ns;

       ns = current->nsproxy->ipc_ns;

       if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)
               return -EINVAL;
       if (mtype < 1)
               return -EINVAL;

       msg = load_msg(mtext, msgsz);
       if (IS_ERR(msg))
               return PTR_ERR(msg);

       msg->m_type = mtype;
       msg->m_ts = msgsz;

       msq = msg_lock_check(ns, msqid);
       if (IS_ERR(msq)) {
               err = PTR_ERR(msq);
               goto out_free;
       }

       for (;;) {
               struct msg_sender s;

               err = -EACCES;
               if (ipcperms(&msq->q_perm, S_IWUGO))
                       goto out_unlock_free;

               err = security_msg_queue_msgsnd(msq, msg, msgflg);
               if (err)
                       goto out_unlock_free;

               if (msgsz + msq->q_cbytes <= msq->q_qbytes &&
                               1 + msq->q_qnum <= msq->q_qbytes) {
                       break;
               }

               /* queue full, wait: */
               if (msgflg & IPC_NOWAIT) {
                       err = -EAGAIN;
                       goto out_unlock_free;
               }
               ss_add(msq, &s);
               ipc_rcu_getref(msq);
               msg_unlock(msq);
               schedule();

               ipc_lock_by_ptr(&msq->q_perm);
               ipc_rcu_putref(msq);
               if (msq->q_perm.deleted) {
                       err = -EIDRM;
                       goto out_unlock_free;
               }
               ss_del(&s);

               if (signal_pending(current)) {
                       err = -ERESTARTNOHAND;
                       goto out_unlock_free;
               }
       }

       msq->q_lspid = task_tgid_vnr(current);
       msq->q_stime = get_seconds();

       if (!pipelined_send(msq, msg)) {
               /* noone is waiting for this message, enqueue it */
               list_add_tail(&msg->m_list, &msq->q_messages);
               msq->q_cbytes += msgsz;
               msq->q_qnum++;
               atomic_add(msgsz, &ns->msg_bytes);
               atomic_inc(&ns->msg_hdrs);
       }

       err = 0;
       msg = NULL;

out_unlock_free:
       msg_unlock(msq);
out_free:
       if (msg != NULL)
               free_msg(msg);
       return err;
}

補足

メッセージ送信可能かどうかのチェックとして、メッセージキュー内のメッセージバイト数およびメッセージ数のチェックを行っていました。メッセージを追加する毎に、msq->q_messagesにリストし、msq->q_qnum++としていました。すなわちmsq->q_qnumがメッセージ数と言うわけです。
で・・・、メッセージ数のチェックと思われるforループの最初のif文で、1 + msq->q_qnum <= msq->q_qbytesのmsq->q_qbytesと比較するのは? msq->q_qbytesは全メッセージのバイト数なわけですから。

load_msg()でメッセージをstruct msg_msgをヘッダーとする領域にメッセージを複写します。ここで複写したstruct msg_msgはstruct msg_queueにリスト登録されることになります。

まず、メッセージサイズのチェックです。DATALEN_MSGは、ページサイズからstruct msg_msgのサイズ分を引いた値となります。そのサイズとstruct msg_msgのサイズ分で、kmalloc()でstruct msg_msg *msgを取得します。そして、このstruct msg_msg *msgをヘッダとして、その次の領域にメッセージをcopy_from_user()で複写しています。(この第1引数がmsg + 1となっている。)

whileループは、メッセージがDATALEN_MSGより大きかった場合の処理になります。この場合、struct msg_msgsegに残りのメッセージを設定する事になります。

kmalloc()でstruct msg_msgsegサイズ分と、残りのメッセージサイズ分とでstruct msg_msgseg *segを取得します。そして同じようにstruct msg_msgsegをヘッダとして、その次の領域に残りのメッセージを複写しています。複写し終えたら、struct msg_msgのnextにリストしていきます。(whileループの前にpseg = &msg->nextとしている。)

まだメッセージが大きくて、複写し終えないようだと、同じ処理を繰り返します。なおpseg = &seg->nextとしているように、次のstruct msg_msgseg **psegは前のstruct msg_msgsegのnextにリストされる事になります。

#define DATALEN_MSG     (PAGE_SIZE-sizeof(struct msg_msg))
#define DATALEN_SEG     (PAGE_SIZE-sizeof(struct msg_msgseg))

struct msg_msg *load_msg(const void __user *src, int len)
{
       struct msg_msg *msg;
       struct msg_msgseg **pseg;
       int err;
       int alen;

       alen = len;
       if (alen > DATALEN_MSG)
               alen = DATALEN_MSG;

       msg = kmalloc(sizeof(*msg) + alen, GFP_KERNEL);
       if (msg == NULL)
               return ERR_PTR(-ENOMEM);

       msg->next = NULL;
       msg->security = NULL;

       if (copy_from_user(msg + 1, src, alen)) {
               err = -EFAULT;
               goto out_err;
       }

       len -= alen;
       src = ((char __user *)src) + alen;
       pseg = &msg->next;
       while (len > 0) {
               struct msg_msgseg *seg;
               alen = len;
               if (alen > DATALEN_SEG)
                       alen = DATALEN_SEG;
               seg = kmalloc(sizeof(*seg) + alen,
                                                GFP_KERNEL);
               if (seg == NULL) {
                       err = -ENOMEM;
                       goto out_err;
               }
               *pseg = seg;
               seg->next = NULL;
               if (copy_from_user(seg + 1, src, alen)) {
                       err = -EFAULT;
                       goto out_err;
               }
               pseg = &seg->next;
               len -= alen;
               src = ((char __user *)src) + alen;
       }

       err = security_msg_msg_alloc(msg);
       if (err)
               goto out_err;

       return msg;

out_err:
       free_msg(msg);
       return ERR_PTR(err);
}

pipelined_send()でメッセージを受信しているプロセスの処理を行います。メッセージキューのq_receiversにリストされているプロセスです。そのプロセス1つづつをtestmsg()で受信プロセスかチェックします。受信プロセスなら、メッセージキューのq_receiversにリストされている、受信待ちリストからそのプロセスを削除し、wake_up_process()でそのプロセスを起床させています。
static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
{
       struct list_head *tmp;

       tmp = msq->q_receivers.next;
       while (tmp != &msq->q_receivers) {
               struct msg_receiver *msr;

               msr = list_entry(tmp, struct msg_receiver, r_list);
               tmp = tmp->next;
               if (testmsg(msg, msr->r_msgtype, msr->r_mode) &&
                   !security_msg_queue_msgrcv(msq, msg, msr->r_tsk,
                                              msr->r_msgtype, msr->r_mode)) {

                       list_del(&msr->r_list);
                       if (msr->r_maxsize < msg->m_ts) {
                               msr->r_msg = NULL;
                               wake_up_process(msr->r_tsk);
                               smp_mb();
                               msr->r_msg = ERR_PTR(-E2BIG);
                       } else {
                               msr->r_msg = NULL;
                               msq->q_lrpid = task_pid_vnr(msr->r_tsk);
                               msq->q_rtime = get_seconds();
                               wake_up_process(msr->r_tsk);
                               smp_mb();
                               msr->r_msg = msg;

                               return 1;
                       }
               }
       }
       return 0;
}
testms()は、type/modeのより受信するかどうかのチェックしています。なお、引数modeは、受信システムコールmsgrcv()のlong msgtyp, int msgflgから、convert_mode()をコールすることで、SEARCH_ANY/SEARCH_EQUA/SEARCH_NOTEQUAL/SEARCH_LESSEQUAのいづれかが設定されるようになっています。
#define SEARCH_ANY              1
#define SEARCH_EQUAL            2
#define SEARCH_NOTEQUAL         3
#define SEARCH_LESSEQUAL        4

static int testmsg(struct msg_msg *msg, long type, int mode)
{
       switch(mode)
       {
               case SEARCH_ANY:
                       return 1;
               case SEARCH_LESSEQUAL:
                       if (msg->m_type <=type)
                               return 1;
                       break;
               case SEARCH_EQUAL:
                       if (msg->m_type == type)
                               return 1;
                       break;
               case SEARCH_NOTEQUAL:
                       if (msg->m_type != type)
                               return 1;
                       break;
       }
       return 0;
}

static inline int convert_mode(long *msgtyp, int msgflg)
{
       /*
        *  find message of correct type.
        *  msgtyp = 0 => get first.
        *  msgtyp > 0 => get first message of matching type.
        *  msgtyp < 0 => get message with least type must be < abs(msgtype).
        */
       if (*msgtyp == 0)
               return SEARCH_ANY;
       if (*msgtyp < 0) {
               *msgtyp = -*msgtyp;
               return SEARCH_LESSEQUAL;
       }
       if (msgflg & MSG_EXCEPT)
               return SEARCH_NOTEQUAL;
       return SEARCH_EQUAL;
}

補足

convert_mode()のコメントにあるget first.云々についてですが、ここでは送信処理のアクションに伴う受信プロセスで、対象となるメッセージは送信されたメッセージとなります。get first.云々は、受信システムコールでの処理での内容となります。

最終更新 2011/09/05 07:33:52 - north
(2011/09/04 17:25:35 作成)


検索

アクセス数
3712743
最近のコメント
コアダンプファイル - sakaia
list_head構造体 - yocto_no_yomikata
勧告ロックと強制ロック - wataash
LKMからのファイル出力 - 重松 宏昌
kprobe - ななし
ksetの実装 - スーパーコピー
カーネルスレッドとは - ノース
カーネルスレッドとは - nbyst
asmlinkageってなに? - ノース
asmlinkageってなに? - よろしく
Adsense
広告情報が設定されていません。