msgrcv()とrecv_msg()の違い
コンドの日本人仲間と楽しむために、カラオケサーバを作成した。当初フラッシュで選曲/再生ともクライアントで、そいつをTVにつないで楽しむ。という按配で、複数の仲間と楽しむといったケースで操作性に問題あり。そこで、再生はサーバ側とすべき、mplayerをバックエンドで動作させるシステムを構築した。
上記を実装する上で、mplayer自身にいろいろオプションがあって(在りすぎて良くわからん。と言うのが正直な所。ソースを調べてまでの思いも無し。)、実現する上で、よりスマートな方法があるんだろうが、
、プレーア・プロセスに依存しない形態との思いで、クライアントから送られてくるメッセージ(曲)
を、CGIとしてIPCメッセージに蓄えて、デーモンとして起動している再生プロセスが、そのメッセージを受信して、mplayerにて再生するものである。
mplayerは、idleオプションを指定しなければ、再生が終わると終了する。mplayerは子プロセスとして起動するようにして、親プロセスはwaitで待機しており、mplayerの終了シグナルが送られてくる毎に、再度メッセージを受信し、子プロセスとしてmplayerを起動しつずけるというものだ。
そこで、クライアントが選曲したら、サーバとしてその曲をメッセージに蓄えるわけだが、クライアントへのリスポンスとして、現在の再生待ちのリストを返したい。
IPCメッセージには、PEEKするような機能が見当たらない。調べてみるとrecv_msg()と言うのがあって、これには、MS_PEEKオプションを指定することで、メッセージのPEEKが可能と言うことだ。recv_msg()はソケットでメッセージをやり取りするのだが、たぶんglibで実装されていて、実態はIPCメッセージだと。そうであるなら、msgrcvシステムコールでPEEKする事は可能では。と思い立ったのが事の始まり。
recv_msgはソケットインターフェースのコールバック関数として定義されていて、メッセージは、このソケット下で管理されていて、結論として、msgrcvとrecv_msgはまったく別物だった。そしてmsgrcvには、PEEKの処理は実装されていない。
(現行カラオケシステムでは、すべてメッセージ全て受信して、再度送信させている。)
ipcperms()でメッセージが読み込み可能かチェックした後、tmp = msq->q_messages.nextとする。tmpはメッセージリストの最初のメッセージとなり、while (tmp != &msq->q_messages)で、すべてのメッセージを捜査する。ここでmsgtyp 引き数に応じた、メッセージを取得することになる。
メッセージが取得できれば、msgにメッセージが、そうでないならmsgにはERR_PTR(-EAGAIN)でループを抜けることになる。
if (!IS_ERR(msg))はメッセージが取得できた時の処理で、メッセージキューのメッセージ数/受信時刻等の更新をしているが、同時に、無条件にlist_del()でそのメッセージをそのものを削除している。
( ss_wakeup()は、メッセージバッファーが一杯で、メッセージバッファー待ちでウエイトしている送信側プロセスを起床するものである。)
とりあえず、msgrcvシステムコールの実装では、メッセージをピークすることはできない。
上記を実装する上で、mplayer自身にいろいろオプションがあって(在りすぎて良くわからん。と言うのが正直な所。ソースを調べてまでの思いも無し。)、実現する上で、よりスマートな方法があるんだろうが、
、プレーア・プロセスに依存しない形態との思いで、クライアントから送られてくるメッセージ(曲)
を、CGIとしてIPCメッセージに蓄えて、デーモンとして起動している再生プロセスが、そのメッセージを受信して、mplayerにて再生するものである。
mplayerは、idleオプションを指定しなければ、再生が終わると終了する。mplayerは子プロセスとして起動するようにして、親プロセスはwaitで待機しており、mplayerの終了シグナルが送られてくる毎に、再度メッセージを受信し、子プロセスとしてmplayerを起動しつずけるというものだ。
そこで、クライアントが選曲したら、サーバとしてその曲をメッセージに蓄えるわけだが、クライアントへのリスポンスとして、現在の再生待ちのリストを返したい。
IPCメッセージには、PEEKするような機能が見当たらない。調べてみるとrecv_msg()と言うのがあって、これには、MS_PEEKオプションを指定することで、メッセージのPEEKが可能と言うことだ。recv_msg()はソケットでメッセージをやり取りするのだが、たぶんglibで実装されていて、実態はIPCメッセージだと。そうであるなら、msgrcvシステムコールでPEEKする事は可能では。と思い立ったのが事の始まり。
recv_msgはソケットインターフェースのコールバック関数として定義されていて、メッセージは、このソケット下で管理されていて、結論として、msgrcvとrecv_msgはまったく別物だった。そしてmsgrcvには、PEEKの処理は実装されていない。
(現行カラオケシステムでは、すべてメッセージ全て受信して、再度送信させている。)
msgrcvシステムコールの実装
msgrcvシステムコールはdo_msgrcv()をコールする。まず引数のチェックを行い、プロセスディスクリプタのipcネームスペースから、msg_lock_check()でipcメッセージヘッダをロックして取得する。後のforループがメッセージ取得処理となる。ipcperms()でメッセージが読み込み可能かチェックした後、tmp = msq->q_messages.nextとする。tmpはメッセージリストの最初のメッセージとなり、while (tmp != &msq->q_messages)で、すべてのメッセージを捜査する。ここでmsgtyp 引き数に応じた、メッセージを取得することになる。
メッセージが取得できれば、msgにメッセージが、そうでないならmsgにはERR_PTR(-EAGAIN)でループを抜けることになる。
if (!IS_ERR(msg))はメッセージが取得できた時の処理で、メッセージキューのメッセージ数/受信時刻等の更新をしているが、同時に、無条件にlist_del()でそのメッセージをそのものを削除している。
( ss_wakeup()は、メッセージバッファーが一杯で、メッセージバッファー待ちでウエイトしている送信側プロセスを起床するものである。)
とりあえず、msgrcvシステムコールの実装では、メッセージをピークすることはできない。
long do_msgrcv(int msqid, long *pmtype, void __user *mtext,
size_t msgsz, long msgtyp, int msgflg)
{
struct msg_queue *msq;
struct msg_msg *msg;
int mode;
struct ipc_namespace *ns;
if (msqid < 0 || (long) msgsz < 0)
return -EINVAL;
mode = convert_mode(&msgtyp, msgflg);
ns = current->nsproxy->ipc_ns;
msq = msg_lock_check(ns, msqid);
if (IS_ERR(msq))
return PTR_ERR(msq);
for (;;) {
struct msg_receiver msr_d;
struct list_head *tmp;
msg = ERR_PTR(-EACCES);
if (ipcperms(ns, &msq->q_perm, S_IRUGO))
goto out_unlock;
msg = ERR_PTR(-EAGAIN);
tmp = msq->q_messages.next;
while (tmp != &msq->q_messages) {
struct msg_msg *walk_msg;
walk_msg = list_entry(tmp, struct msg_msg, m_list);
if (testmsg(walk_msg, msgtyp, mode) &&
!security_msg_queue_msgrcv(msq, walk_msg, current,
msgtyp, mode)) {
msg = walk_msg;
if (mode == SEARCH_LESSEQUAL &&
walk_msg->m_type != 1) {
msg = walk_msg;
msgtyp = walk_msg->m_type - 1;
} else {
msg = walk_msg;
break;
}
}
tmp = tmp->next;
}
if (!IS_ERR(msg)) {
if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
msg = ERR_PTR(-E2BIG);
goto out_unlock;
}
list_del(&msg->m_list);
msq->q_qnum--;
msq->q_rtime = get_seconds();
msq->q_lrpid = task_tgid_vnr(current);
msq->q_cbytes -= msg->m_ts;
atomic_sub(msg->m_ts, &ns->msg_bytes);
atomic_dec(&ns->msg_hdrs);
ss_wakeup(&msq->q_senders, 0);
msg_unlock(msq);
break;
}
if (msgflg & IPC_NOWAIT) {
msg = ERR_PTR(-ENOMSG);
goto out_unlock;
}
list_add_tail(&msr_d.r_list, &msq->q_receivers);
msr_d.r_tsk = current;
msr_d.r_msgtype = msgtyp;
msr_d.r_mode = mode;
if (msgflg & MSG_NOERROR)
msr_d.r_maxsize = INT_MAX;
else
msr_d.r_maxsize = msgsz;
msr_d.r_msg = ERR_PTR(-EAGAIN);
current->state = TASK_INTERRUPTIBLE;
msg_unlock(msq);
schedule();
rcu_read_lock();
msg = (struct msg_msg*)msr_d.r_msg;
while (msg == NULL) {
cpu_relax();
msg = (struct msg_msg *)msr_d.r_msg;
}
if (msg != ERR_PTR(-EAGAIN)) {
rcu_read_unlock();
break;
}
ipc_lock_by_ptr(&msq->q_perm);
rcu_read_unlock();
msg = (struct msg_msg*)msr_d.r_msg;
if (msg != ERR_PTR(-EAGAIN))
goto out_unlock;
list_del(&msr_d.r_list);
if (signal_pending(current)) {
msg = ERR_PTR(-ERESTARTNOHAND);
out_unlock:
msg_unlock(msq);
break;
}
}
if (IS_ERR(msg))
return PTR_ERR(msg);
msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;
*pmtype = msg->m_type;
if (store_msg(mtext, msg, msgsz))
msgsz = -EFAULT;
free_msg(msg);
return msgsz;
}
recv_msgコールバック関数の実装
tipc_create()でソケットを作成すると、struct socket *sock->opsに、コールバック関数群のstruct proto_ops packet_opsが設定される。そのメンバー.recvmsg=recv_msgが設定されている。
static const struct proto_ops packet_ops = {
.owner = THIS_MODULE,
.family = AF_TIPC,
.release = release,
.bind = bind,
.connect = connect,
.socketpair = sock_no_socketpair,
.accept = accept,
.getname = get_name,
.poll = poll,
.ioctl = sock_no_ioctl,
.listen = listen,
.shutdown = shutdown,
.setsockopt = setsockopt,
.getsockopt = getsockopt,
.sendmsg = send_packet,
.recvmsg = recv_msg,
.mmap = sock_no_mmap,
.sendpage = sock_no_sendpage
};
recv_msg()のメッセージ取得は、ソケットのsk->sk_receive_queue下のリストで管理しており、IPCメッセージとは別物である事が判断できる。
static int recv_msg(struct kiocb *iocb, struct socket *sock,
struct msghdr *m, size_t buf_len, int flags)
{
struct sock *sk = sock->sk;
struct tipc_port *tport = tipc_sk_port(sk);
struct sk_buff *buf;
struct tipc_msg *msg;
long timeout;
unsigned int sz;
u32 err;
int res;
if (unlikely(!buf_len))
return -EINVAL;
lock_sock(sk);
if (unlikely(sock->state == SS_UNCONNECTED)) {
res = -ENOTCONN;
goto exit;
}
timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
restart:
while (skb_queue_empty(&sk->sk_receive_queue)) {
if (sock->state == SS_DISCONNECTING) {
res = -ENOTCONN;
goto exit;
}
if (timeout <= 0L) {
res = timeout ? timeout : -EWOULDBLOCK;
goto exit;
}
release_sock(sk);
timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
tipc_rx_ready(sock),
timeout);
lock_sock(sk);
}
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
:
:
}
static inline int skb_queue_empty(const struct sk_buff_head *list)
{
return list->next == (struct sk_buff *)list;
}
static inline struct sk_buff *skb_peek(const struct sk_buff_head *list_)
{
struct sk_buff *list = ((const struct sk_buff *)list_)->next;
if (list == (struct sk_buff *)list_)
list = NULL;
return list;
}







