無料Wikiサービス | デモページ
検索

アクセス数
最近のコメント
kprobe - ななし
ksetの実装 - スーパーコピー
カーネルスレッドとは - ノース
カーネルスレッドとは - nbyst
asmlinkageってなに? - ノース
asmlinkageってなに? - よろしく
はじめ - ノース
はじめ - ノース
はじめ - 楽打連動ユーザー
はじめ - 楽打連動ユーザー
Adsense
広告情報が設定されていません。

msgrcv()とrecv_msg()の違い


コンドの日本人仲間と楽しむために、カラオケサーバを作成した。当初フラッシュで選曲/再生ともクライアントで、そいつをTVにつないで楽しむ。という按配で、複数の仲間と楽しむといったケースで操作性に問題あり。そこで、再生はサーバ側とすべき、mplayerをバックエンドで動作させるシステムを構築した。

上記を実装する上で、mplayer自身にいろいろオプションがあって(在りすぎて良くわからん。と言うのが正直な所。ソースを調べてまでの思いも無し。)、実現する上で、よりスマートな方法があるんだろうが、
、プレーア・プロセスに依存しない形態との思いで、クライアントから送られてくるメッセージ(曲)
を、CGIとしてIPCメッセージに蓄えて、デーモンとして起動している再生プロセスが、そのメッセージを受信して、mplayerにて再生するものである。

mplayerは、idleオプションを指定しなければ、再生が終わると終了する。mplayerは子プロセスとして起動するようにして、親プロセスはwaitで待機しており、mplayerの終了シグナルが送られてくる毎に、再度メッセージを受信し、子プロセスとしてmplayerを起動しつずけるというものだ。

そこで、クライアントが選曲したら、サーバとしてその曲をメッセージに蓄えるわけだが、クライアントへのリスポンスとして、現在の再生待ちのリストを返したい。

IPCメッセージには、PEEKするような機能が見当たらない。調べてみるとrecv_msg()と言うのがあって、これには、MS_PEEKオプションを指定することで、メッセージのPEEKが可能と言うことだ。recv_msg()はソケットでメッセージをやり取りするのだが、たぶんglibで実装されていて、実態はIPCメッセージだと。そうであるなら、msgrcvシステムコールでPEEKする事は可能では。と思い立ったのが事の始まり。

recv_msgはソケットインターフェースのコールバック関数として定義されていて、メッセージは、このソケット下で管理されていて、結論として、msgrcvとrecv_msgはまったく別物だった。そしてmsgrcvには、PEEKの処理は実装されていない。
(現行カラオケシステムでは、すべてメッセージ全て受信して、再度送信させている。)

msgrcvシステムコールの実装

msgrcvシステムコールはdo_msgrcv()をコールする。まず引数のチェックを行い、プロセスディスクリプタのipcネームスペースから、msg_lock_check()でipcメッセージヘッダをロックして取得する。後のforループがメッセージ取得処理となる。

ipcperms()でメッセージが読み込み可能かチェックした後、tmp = msq->q_messages.nextとする。tmpはメッセージリストの最初のメッセージとなり、while (tmp != &msq->q_messages)で、すべてのメッセージを捜査する。ここでmsgtyp 引き数に応じた、メッセージを取得することになる。

メッセージが取得できれば、msgにメッセージが、そうでないならmsgにはERR_PTR(-EAGAIN)でループを抜けることになる。

if (!IS_ERR(msg))はメッセージが取得できた時の処理で、メッセージキューのメッセージ数/受信時刻等の更新をしているが、同時に、無条件にlist_del()でそのメッセージをそのものを削除している。
( ss_wakeup()は、メッセージバッファーが一杯で、メッセージバッファー待ちでウエイトしている送信側プロセスを起床するものである。)

とりあえず、msgrcvシステムコールの実装では、メッセージをピークすることはできない。
long do_msgrcv(int msqid, long *pmtype, void __user *mtext,
               size_t msgsz, long msgtyp, int msgflg)
{
       struct msg_queue *msq;
       struct msg_msg *msg;
       int mode;
       struct ipc_namespace *ns;

       if (msqid < 0 || (long) msgsz < 0)
               return -EINVAL;
       mode = convert_mode(&msgtyp, msgflg);
       ns = current->nsproxy->ipc_ns;

       msq = msg_lock_check(ns, msqid);
       if (IS_ERR(msq))
               return PTR_ERR(msq);

       for (;;) {
               struct msg_receiver msr_d;
               struct list_head *tmp;

               msg = ERR_PTR(-EACCES);
               if (ipcperms(ns, &msq->q_perm, S_IRUGO))
                       goto out_unlock;

               msg = ERR_PTR(-EAGAIN);
               tmp = msq->q_messages.next;
               while (tmp != &msq->q_messages) {
                       struct msg_msg *walk_msg;

                       walk_msg = list_entry(tmp, struct msg_msg, m_list);
                       if (testmsg(walk_msg, msgtyp, mode) &&
                           !security_msg_queue_msgrcv(msq, walk_msg, current,
                                                      msgtyp, mode)) {

                               msg = walk_msg;
                               if (mode == SEARCH_LESSEQUAL &&
                                               walk_msg->m_type != 1) {
                                       msg = walk_msg;
                                       msgtyp = walk_msg->m_type - 1;
                               } else {
                                       msg = walk_msg;
                                       break;
                               }
                       }
                       tmp = tmp->next;
               }
               if (!IS_ERR(msg)) {
                       if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
                               msg = ERR_PTR(-E2BIG);
                               goto out_unlock;
                       }
                       list_del(&msg->m_list);
                       msq->q_qnum--;
                       msq->q_rtime = get_seconds();
                       msq->q_lrpid = task_tgid_vnr(current);
                       msq->q_cbytes -= msg->m_ts;
                       atomic_sub(msg->m_ts, &ns->msg_bytes);
                       atomic_dec(&ns->msg_hdrs);
                       ss_wakeup(&msq->q_senders, 0);
                       msg_unlock(msq);
                       break;
               }
               if (msgflg & IPC_NOWAIT) {
                       msg = ERR_PTR(-ENOMSG);
                       goto out_unlock;
               }
               list_add_tail(&msr_d.r_list, &msq->q_receivers);
               msr_d.r_tsk = current;
               msr_d.r_msgtype = msgtyp;
               msr_d.r_mode = mode;
               if (msgflg & MSG_NOERROR)
                       msr_d.r_maxsize = INT_MAX;
               else
                       msr_d.r_maxsize = msgsz;
               msr_d.r_msg = ERR_PTR(-EAGAIN);
               current->state = TASK_INTERRUPTIBLE;
               msg_unlock(msq);

               schedule();

               rcu_read_lock();

               msg = (struct msg_msg*)msr_d.r_msg;
               while (msg == NULL) {
                       cpu_relax();
                       msg = (struct msg_msg *)msr_d.r_msg;
               }

               if (msg != ERR_PTR(-EAGAIN)) {
                       rcu_read_unlock();
                       break;
               }
               ipc_lock_by_ptr(&msq->q_perm);
               rcu_read_unlock();

               msg = (struct msg_msg*)msr_d.r_msg;
               if (msg != ERR_PTR(-EAGAIN))
                       goto out_unlock;

               list_del(&msr_d.r_list);
               if (signal_pending(current)) {
                       msg = ERR_PTR(-ERESTARTNOHAND);
out_unlock:
                       msg_unlock(msq);
                       break;
               }
       }
       if (IS_ERR(msg))
               return PTR_ERR(msg);

       msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
       *pmtype = msg->m_type;
       if (store_msg(mtext, msg, msgsz))
               msgsz = -EFAULT;

       free_msg(msg);

       return msgsz;
}

recv_msgコールバック関数の実装

tipc_create()でソケットを作成すると、struct socket *sock->opsに、コールバック関数群のstruct proto_ops packet_opsが設定される。そのメンバー.recvmsg=recv_msgが設定されている。
static const struct proto_ops packet_ops = {
       .owner          = THIS_MODULE,
       .family         = AF_TIPC,
       .release        = release,
       .bind           = bind,
       .connect        = connect,
       .socketpair     = sock_no_socketpair,
       .accept         = accept,
       .getname        = get_name,
       .poll           = poll,
       .ioctl          = sock_no_ioctl,
       .listen         = listen,
       .shutdown       = shutdown,
       .setsockopt     = setsockopt,
       .getsockopt     = getsockopt,
       .sendmsg        = send_packet,
       .recvmsg        = recv_msg,
       .mmap           = sock_no_mmap,
       .sendpage       = sock_no_sendpage
};
recv_msg()のメッセージ取得は、ソケットのsk->sk_receive_queue下のリストで管理しており、IPCメッセージとは別物である事が判断できる。
static int recv_msg(struct kiocb *iocb, struct socket *sock,
                   struct msghdr *m, size_t buf_len, int flags)
{
       struct sock *sk = sock->sk;
       struct tipc_port *tport = tipc_sk_port(sk);
       struct sk_buff *buf;
       struct tipc_msg *msg;
       long timeout;
       unsigned int sz;
       u32 err;
       int res;

       if (unlikely(!buf_len))
               return -EINVAL;

       lock_sock(sk);

       if (unlikely(sock->state == SS_UNCONNECTED)) {
               res = -ENOTCONN;
               goto exit;
       }

       timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
restart:

       while (skb_queue_empty(&sk->sk_receive_queue)) {
               if (sock->state == SS_DISCONNECTING) {
                       res = -ENOTCONN;
                       goto exit;
               }
               if (timeout <= 0L) {
                       res = timeout ? timeout : -EWOULDBLOCK;
                       goto exit;
               }
               release_sock(sk);
               timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
                                                          tipc_rx_ready(sock),
                                                          timeout);
               lock_sock(sk);
       }
       buf = skb_peek(&sk->sk_receive_queue);
       msg = buf_msg(buf);
       sz = msg_data_sz(msg);
   :
   :
}

static inline int skb_queue_empty(const struct sk_buff_head *list)
{
       return list->next == (struct sk_buff *)list;
}

static inline struct sk_buff *skb_peek(const struct sk_buff_head *list_)
{
       struct sk_buff *list = ((const struct sk_buff *)list_)->next;
       if (list == (struct sk_buff *)list_)
               list = NULL;
       return list;
}

最終更新 2012/10/04 15:07:11 - north
(2012/10/04 15:06:15 作成)