msgrcv()とrecv_msg()の違い
コンドの日本人仲間と楽しむために、カラオケサーバを作成した。当初フラッシュで選曲/再生ともクライアントで、そいつをTVにつないで楽しむ。という按配で、複数の仲間と楽しむといったケースで操作性に問題あり。そこで、再生はサーバ側とすべき、mplayerをバックエンドで動作させるシステムを構築した。
上記を実装する上で、mplayer自身にいろいろオプションがあって(在りすぎて良くわからん。と言うのが正直な所。ソースを調べてまでの思いも無し。)、実現する上で、よりスマートな方法があるんだろうが、
、プレーア・プロセスに依存しない形態との思いで、クライアントから送られてくるメッセージ(曲)
を、CGIとしてIPCメッセージに蓄えて、デーモンとして起動している再生プロセスが、そのメッセージを受信して、mplayerにて再生するものである。
mplayerは、idleオプションを指定しなければ、再生が終わると終了する。mplayerは子プロセスとして起動するようにして、親プロセスはwaitで待機しており、mplayerの終了シグナルが送られてくる毎に、再度メッセージを受信し、子プロセスとしてmplayerを起動しつずけるというものだ。
そこで、クライアントが選曲したら、サーバとしてその曲をメッセージに蓄えるわけだが、クライアントへのリスポンスとして、現在の再生待ちのリストを返したい。
IPCメッセージには、PEEKするような機能が見当たらない。調べてみるとrecv_msg()と言うのがあって、これには、MS_PEEKオプションを指定することで、メッセージのPEEKが可能と言うことだ。recv_msg()はソケットでメッセージをやり取りするのだが、たぶんglibで実装されていて、実態はIPCメッセージだと。そうであるなら、msgrcvシステムコールでPEEKする事は可能では。と思い立ったのが事の始まり。
recv_msgはソケットインターフェースのコールバック関数として定義されていて、メッセージは、このソケット下で管理されていて、結論として、msgrcvとrecv_msgはまったく別物だった。そしてmsgrcvには、PEEKの処理は実装されていない。
(現行カラオケシステムでは、すべてメッセージ全て受信して、再度送信させている。)
ipcperms()でメッセージが読み込み可能かチェックした後、tmp = msq->q_messages.nextとする。tmpはメッセージリストの最初のメッセージとなり、while (tmp != &msq->q_messages)で、すべてのメッセージを捜査する。ここでmsgtyp 引き数に応じた、メッセージを取得することになる。
メッセージが取得できれば、msgにメッセージが、そうでないならmsgにはERR_PTR(-EAGAIN)でループを抜けることになる。
if (!IS_ERR(msg))はメッセージが取得できた時の処理で、メッセージキューのメッセージ数/受信時刻等の更新をしているが、同時に、無条件にlist_del()でそのメッセージをそのものを削除している。
( ss_wakeup()は、メッセージバッファーが一杯で、メッセージバッファー待ちでウエイトしている送信側プロセスを起床するものである。)
とりあえず、msgrcvシステムコールの実装では、メッセージをピークすることはできない。
上記を実装する上で、mplayer自身にいろいろオプションがあって(在りすぎて良くわからん。と言うのが正直な所。ソースを調べてまでの思いも無し。)、実現する上で、よりスマートな方法があるんだろうが、
、プレーア・プロセスに依存しない形態との思いで、クライアントから送られてくるメッセージ(曲)
を、CGIとしてIPCメッセージに蓄えて、デーモンとして起動している再生プロセスが、そのメッセージを受信して、mplayerにて再生するものである。
mplayerは、idleオプションを指定しなければ、再生が終わると終了する。mplayerは子プロセスとして起動するようにして、親プロセスはwaitで待機しており、mplayerの終了シグナルが送られてくる毎に、再度メッセージを受信し、子プロセスとしてmplayerを起動しつずけるというものだ。
そこで、クライアントが選曲したら、サーバとしてその曲をメッセージに蓄えるわけだが、クライアントへのリスポンスとして、現在の再生待ちのリストを返したい。
IPCメッセージには、PEEKするような機能が見当たらない。調べてみるとrecv_msg()と言うのがあって、これには、MS_PEEKオプションを指定することで、メッセージのPEEKが可能と言うことだ。recv_msg()はソケットでメッセージをやり取りするのだが、たぶんglibで実装されていて、実態はIPCメッセージだと。そうであるなら、msgrcvシステムコールでPEEKする事は可能では。と思い立ったのが事の始まり。
recv_msgはソケットインターフェースのコールバック関数として定義されていて、メッセージは、このソケット下で管理されていて、結論として、msgrcvとrecv_msgはまったく別物だった。そしてmsgrcvには、PEEKの処理は実装されていない。
(現行カラオケシステムでは、すべてメッセージ全て受信して、再度送信させている。)
msgrcvシステムコールの実装
msgrcvシステムコールはdo_msgrcv()をコールする。まず引数のチェックを行い、プロセスディスクリプタのipcネームスペースから、msg_lock_check()でipcメッセージヘッダをロックして取得する。後のforループがメッセージ取得処理となる。ipcperms()でメッセージが読み込み可能かチェックした後、tmp = msq->q_messages.nextとする。tmpはメッセージリストの最初のメッセージとなり、while (tmp != &msq->q_messages)で、すべてのメッセージを捜査する。ここでmsgtyp 引き数に応じた、メッセージを取得することになる。
メッセージが取得できれば、msgにメッセージが、そうでないならmsgにはERR_PTR(-EAGAIN)でループを抜けることになる。
if (!IS_ERR(msg))はメッセージが取得できた時の処理で、メッセージキューのメッセージ数/受信時刻等の更新をしているが、同時に、無条件にlist_del()でそのメッセージをそのものを削除している。
( ss_wakeup()は、メッセージバッファーが一杯で、メッセージバッファー待ちでウエイトしている送信側プロセスを起床するものである。)
とりあえず、msgrcvシステムコールの実装では、メッセージをピークすることはできない。
long do_msgrcv(int msqid, long *pmtype, void __user *mtext, size_t msgsz, long msgtyp, int msgflg) { struct msg_queue *msq; struct msg_msg *msg; int mode; struct ipc_namespace *ns; if (msqid < 0 || (long) msgsz < 0) return -EINVAL; mode = convert_mode(&msgtyp, msgflg); ns = current->nsproxy->ipc_ns; msq = msg_lock_check(ns, msqid); if (IS_ERR(msq)) return PTR_ERR(msq); for (;;) { struct msg_receiver msr_d; struct list_head *tmp; msg = ERR_PTR(-EACCES); if (ipcperms(ns, &msq->q_perm, S_IRUGO)) goto out_unlock; msg = ERR_PTR(-EAGAIN); tmp = msq->q_messages.next; while (tmp != &msq->q_messages) { struct msg_msg *walk_msg; walk_msg = list_entry(tmp, struct msg_msg, m_list); if (testmsg(walk_msg, msgtyp, mode) && !security_msg_queue_msgrcv(msq, walk_msg, current, msgtyp, mode)) { msg = walk_msg; if (mode == SEARCH_LESSEQUAL && walk_msg->m_type != 1) { msg = walk_msg; msgtyp = walk_msg->m_type - 1; } else { msg = walk_msg; break; } } tmp = tmp->next; } if (!IS_ERR(msg)) { if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) { msg = ERR_PTR(-E2BIG); goto out_unlock; } list_del(&msg->m_list); msq->q_qnum--; msq->q_rtime = get_seconds(); msq->q_lrpid = task_tgid_vnr(current); msq->q_cbytes -= msg->m_ts; atomic_sub(msg->m_ts, &ns->msg_bytes); atomic_dec(&ns->msg_hdrs); ss_wakeup(&msq->q_senders, 0); msg_unlock(msq); break; } if (msgflg & IPC_NOWAIT) { msg = ERR_PTR(-ENOMSG); goto out_unlock; } list_add_tail(&msr_d.r_list, &msq->q_receivers); msr_d.r_tsk = current; msr_d.r_msgtype = msgtyp; msr_d.r_mode = mode; if (msgflg & MSG_NOERROR) msr_d.r_maxsize = INT_MAX; else msr_d.r_maxsize = msgsz; msr_d.r_msg = ERR_PTR(-EAGAIN); current->state = TASK_INTERRUPTIBLE; msg_unlock(msq); schedule(); rcu_read_lock(); msg = (struct msg_msg*)msr_d.r_msg; while (msg == NULL) { cpu_relax(); msg = (struct msg_msg *)msr_d.r_msg; } if (msg != ERR_PTR(-EAGAIN)) { rcu_read_unlock(); break; } ipc_lock_by_ptr(&msq->q_perm); rcu_read_unlock(); msg = (struct msg_msg*)msr_d.r_msg; if (msg != ERR_PTR(-EAGAIN)) goto out_unlock; list_del(&msr_d.r_list); if (signal_pending(current)) { msg = ERR_PTR(-ERESTARTNOHAND); out_unlock: msg_unlock(msq); break; } } if (IS_ERR(msg)) return PTR_ERR(msg); msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz; *pmtype = msg->m_type; if (store_msg(mtext, msg, msgsz)) msgsz = -EFAULT; free_msg(msg); return msgsz; }
recv_msgコールバック関数の実装
tipc_create()でソケットを作成すると、struct socket *sock->opsに、コールバック関数群のstruct proto_ops packet_opsが設定される。そのメンバー.recvmsg=recv_msgが設定されている。static const struct proto_ops packet_ops = { .owner = THIS_MODULE, .family = AF_TIPC, .release = release, .bind = bind, .connect = connect, .socketpair = sock_no_socketpair, .accept = accept, .getname = get_name, .poll = poll, .ioctl = sock_no_ioctl, .listen = listen, .shutdown = shutdown, .setsockopt = setsockopt, .getsockopt = getsockopt, .sendmsg = send_packet, .recvmsg = recv_msg, .mmap = sock_no_mmap, .sendpage = sock_no_sendpage };recv_msg()のメッセージ取得は、ソケットのsk->sk_receive_queue下のリストで管理しており、IPCメッセージとは別物である事が判断できる。
static int recv_msg(struct kiocb *iocb, struct socket *sock, struct msghdr *m, size_t buf_len, int flags) { struct sock *sk = sock->sk; struct tipc_port *tport = tipc_sk_port(sk); struct sk_buff *buf; struct tipc_msg *msg; long timeout; unsigned int sz; u32 err; int res; if (unlikely(!buf_len)) return -EINVAL; lock_sock(sk); if (unlikely(sock->state == SS_UNCONNECTED)) { res = -ENOTCONN; goto exit; } timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); restart: while (skb_queue_empty(&sk->sk_receive_queue)) { if (sock->state == SS_DISCONNECTING) { res = -ENOTCONN; goto exit; } if (timeout <= 0L) { res = timeout ? timeout : -EWOULDBLOCK; goto exit; } release_sock(sk); timeout = wait_event_interruptible_timeout(*sk_sleep(sk), tipc_rx_ready(sock), timeout); lock_sock(sk); } buf = skb_peek(&sk->sk_receive_queue); msg = buf_msg(buf); sz = msg_data_sz(msg); : : } static inline int skb_queue_empty(const struct sk_buff_head *list) { return list->next == (struct sk_buff *)list; } static inline struct sk_buff *skb_peek(const struct sk_buff_head *list_) { struct sk_buff *list = ((const struct sk_buff *)list_)->next; if (list == (struct sk_buff *)list_) list = NULL; return list; }