unixドメインソケット送信


・DGRAM型
データサイズがsk->sk_sndbuf - 32を超えるとエラーです。sock_alloc_send_skb()で送信データサイズのバッファ等を設定したskbを取得し、skb->tailに送信データを複写し、 skb_queue_tail()で送信先ソケットのother->sk_receive_queueにリストします。other->sk_data_ready()は、受信待機しているプロセスを起床させother->sk_receive_queueにリストしたskbを取得し受信します。

other->sk_receive_queueにリストされている数が、other->sk_max_ack_backlogを超えてるなら、unix_wait_for_peer()でother->peer_waitを起床させながら、other->sk_receive_queueにリストできるまでgoto restartでループします。
static int unix_dgram_sendmsg(struct kiocb *kiocb, struct socket *sock,
                             struct msghdr *msg, int len)
{
       struct sock_iocb *siocb = kiocb_to_siocb(kiocb);
       struct sock *sk = sock->sk;
       struct unix_sock *u = unix_sk(sk);
       struct sockaddr_un *sunaddr=msg->msg_name;
       struct sock *other = NULL;
       int namelen = 0; /* fake GCC */
       int err;
       unsigned hash;
       struct sk_buff *skb;
       long timeo;
       struct scm_cookie tmp_scm;

       if (NULL == siocb->scm)
               siocb->scm = &tmp_scm;
       err = scm_send(sock, msg, siocb->scm);
       if (err < 0)
               return err;

       err = -EOPNOTSUPP;
       if (msg->msg_flags&MSG_OOB)
               goto out;

       if (msg->msg_namelen) {
               err = unix_mkname(sunaddr, msg->msg_namelen, &hash);
               if (err < 0)
                       goto out;
               namelen = err;
       } else {
               sunaddr = NULL;
               err = -ENOTCONN;
               other = unix_peer_get(sk);
               if (!other)
                       goto out;
       }

       if (sock->passcred && !u->addr && (err = unix_autobind(sock)) != 0)
               goto out;

       err = -EMSGSIZE;
       if ((unsigned)len > sk->sk_sndbuf - 32)
               goto out;

       skb = sock_alloc_send_skb(sk, len, msg->msg_flags&MSG_DONTWAIT, &err);
       if (skb==NULL)
               goto out;

       memcpy(UNIXCREDS(skb), &siocb->scm->creds, sizeof(struct ucred));
       if (siocb->scm->fp)
               unix_attach_fds(siocb->scm, skb);

       skb->h.raw = skb->data;
       err = memcpy_fromiovec(skb_put(skb,len), msg->msg_iov, len);
       if (err)
               goto out_free;

       timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);

restart:
       if (!other) {
               err = -ECONNRESET;
               if (sunaddr == NULL)
                       goto out_free;

               other = unix_find_other(sunaddr, namelen, sk->sk_type,
                                       hash, &err);
               if (other==NULL)
                       goto out_free;
       }

       unix_state_rlock(other);
       err = -EPERM;
       if (!unix_may_send(sk, other))
               goto out_unlock;

       if (sock_flag(other, SOCK_DEAD)) {
               unix_state_runlock(other);
               sock_put(other);

               err = 0;
               unix_state_wlock(sk);
               if (unix_peer(sk) == other) {
                       unix_peer(sk)=NULL;
                       unix_state_wunlock(sk);

                       unix_dgram_disconnected(sk, other);
                       sock_put(other);
                       err = -ECONNREFUSED;
               } else {
                       unix_state_wunlock(sk);
               }

               other = NULL;
               if (err)
                       goto out_free;
               goto restart;
       }

       err = -EPIPE;
       if (other->sk_shutdown & RCV_SHUTDOWN)
               goto out_unlock;

       err = security_unix_may_send(sk->sk_socket, other->sk_socket);
       if (err)
               goto out_unlock;

       if (unix_peer(other) != sk &&
           (skb_queue_len(&other->sk_receive_queue) >
            other->sk_max_ack_backlog)) {
               if (!timeo) {
                       err = -EAGAIN;
                       goto out_unlock;
               }

               timeo = unix_wait_for_peer(other, timeo);

               err = sock_intr_errno(timeo);
               if (signal_pending(current))
                       goto out_free;

               goto restart;
       }

       skb_queue_tail(&other->sk_receive_queue, skb);
       unix_state_runlock(other);
       other->sk_data_ready(other, len);
       sock_put(other);
       scm_destroy(siocb->scm);
       return len;

out_unlock:
       unix_state_runlock(other);
out_free:
       kfree_skb(skb);
out:
       if (other)
               sock_put(other);
       scm_destroy(siocb->scm);
       return err;
}
・STREAM型
DGRAM型と異なり、送信データのサイズ制限はありませんが、送信ブロックをソケットのsk->sk_sndbuf/2単位でskbに設定し、送信先のother->sk_receive_queueにリストします。1対1のやり取り故、分割送信での受信側の問題はなく、分割送信する事でタスクスケジュール機能が向上します。なお、other->sk_receive_queueにリストできる制限もありません。
static int unix_stream_sendmsg(struct kiocb *kiocb, struct socket *sock,
                              struct msghdr *msg, int len)
{
       struct sock_iocb *siocb = kiocb_to_siocb(kiocb);
       struct sock *sk = sock->sk;
       struct sock *other = NULL;
       struct sockaddr_un *sunaddr=msg->msg_name;
       int err,size;
       struct sk_buff *skb;
       int sent=0;
       struct scm_cookie tmp_scm;

       if (NULL == siocb->scm)
               siocb->scm = &tmp_scm;
       err = scm_send(sock, msg, siocb->scm);
       if (err < 0)
               return err;

       err = -EOPNOTSUPP;
       if (msg->msg_flags&MSG_OOB)
               goto out_err;

       if (msg->msg_namelen) {
               err = sk->sk_state == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
               goto out_err;
       } else {
               sunaddr = NULL;
               err = -ENOTCONN;
               other = unix_peer_get(sk);
               if (!other)
                       goto out_err;
       }

       if (sk->sk_shutdown & SEND_SHUTDOWN)
               goto pipe_err;

       while(sent < len)
       {

               size=len-sent;

               if (size > sk->sk_sndbuf / 2 - 64)
                       size = sk->sk_sndbuf / 2 - 64;

               if (size > SKB_MAX_ALLOC)
                       size = SKB_MAX_ALLOC;
                
               skb=sock_alloc_send_skb(sk,size,msg->msg_flags&MSG_DONTWAIT, &err);

               if (skb==NULL)
                       goto out_err;


               /*
                *      If you pass two values to the sock_alloc_send_skb
                *      it tries to grab the large buffer with GFP_NOFS
                *      (which can fail easily), and if it fails grab the
                *      fallback size buffer which is under a page and will
                *      succeed. [Alan]
                */
               size = min_t(int, size, skb_tailroom(skb));

               memcpy(UNIXCREDS(skb), &siocb->scm->creds, sizeof(struct ucred));
               if (siocb->scm->fp)
                       unix_attach_fds(siocb->scm, skb);

               if ((err = memcpy_fromiovec(skb_put(skb,size), msg->msg_iov, size)) != 0) {
                       kfree_skb(skb);
                       goto out_err;
               }

               unix_state_rlock(other);

               if (sock_flag(other, SOCK_DEAD) ||
                   (other->sk_shutdown & RCV_SHUTDOWN))
                       goto pipe_err_free;

               skb_queue_tail(&other->sk_receive_queue, skb);
               unix_state_runlock(other);
               other->sk_data_ready(other, size);
               sent+=size;
       }
       sock_put(other);

       scm_destroy(siocb->scm);
       siocb->scm = NULL;

       return sent;

pipe_err_free:
       unix_state_runlock(other);
       kfree_skb(skb);
pipe_err:
       if (sent==0 && !(msg->msg_flags&MSG_NOSIGNAL))
               send_sig(SIGPIPE,current,0);
       err = -EPIPE;
out_err:
       if (other)
               sock_put(other);
       scm_destroy(siocb->scm);
       siocb->scm = NULL;
       return sent ? : err;
}
・SEQPACKET型
connect/acceptは、accept/connectを介する1対1のやり取りのSTREAMの実装としながら、送信はDGRAMの実装となります。
static const struct proto_ops unix_seqpacket_ops = {
       .family =       PF_UNIX,
       .owner =        THIS_MODULE,
       .release =      unix_release,
       .bind =         unix_bind,
       .connect =      unix_stream_connect,
       .socketpair =   unix_socketpair,
       .accept =       unix_accept,
       .getname =      unix_getname,
       .poll =         unix_dgram_poll,
       .ioctl =        unix_ioctl,
       .listen =       unix_listen,
       .shutdown =     unix_shutdown,
       .setsockopt =   sock_no_setsockopt,
       .getsockopt =   sock_no_getsockopt,
       .sendmsg =      unix_seqpacket_sendmsg,
       .recvmsg =      unix_seqpacket_recvmsg,
       .mmap =         sock_no_mmap,
       .sendpage =     sock_no_sendpage,
};

static int unix_seqpacket_sendmsg(struct kiocb *kiocb, struct socket *sock,
                                 struct msghdr *msg, size_t len)
{
       int err;
       struct sock *sk = sock->sk;

       err = sock_error(sk);
       if (err)
               return err;

       if (sk->sk_state != TCP_ESTABLISHED)
               return -ENOTCONN;

       if (msg->msg_namelen)
               msg->msg_namelen = 0;

       return unix_dgram_sendmsg(kiocb, sock, msg, len);
}

バッファサイズはアライメントしたサイズに、struct skb_shared_infoの領域を付加したサイズで取得し、そのサイズ値をskb->tail = data/skb->end = data + sizeとします。struct skb_shared_infoはpage単位で取得した領域のアドレスが設定され、struct skb_shared_infoをすべてのsdnに設定することで、STREAMで送信が分割されるケースでも、受信では共有するデータを受信できるような事が可能となります。なおUNIXドメインソケットでは利用されていません。
struct sk_buff *alloc_skb(unsigned int size, int gfp_mask)
{
       struct sk_buff *skb;
       u8 *data;

       skb = kmem_cache_alloc(skbuff_head_cache,
                              gfp_mask & ~__GFP_DMA);
       if (!skb)
               goto out;

       size = SKB_DATA_ALIGN(size);
       data = kmalloc(size + sizeof(struct skb_shared_info), gfp_mask);
       if (!data)
               goto nodata;

       memset(skb, 0, offsetof(struct sk_buff, truesize));
       skb->truesize = size + sizeof(struct sk_buff);
       atomic_set(&skb->users, 1);
       skb->head = data;
       skb->data = data;
       skb->tail = data;
       skb->end  = data + size;

       atomic_set(&(skb_shinfo(skb)->dataref), 1);
       skb_shinfo(skb)->nr_frags  = 0;
       skb_shinfo(skb)->tso_size = 0;
       skb_shinfo(skb)->tso_segs = 0;
       skb_shinfo(skb)->frag_list = NULL;
out:
       return skb;
nodata:
       kmem_cache_free(skbuff_head_cache, skb);
       skb = NULL;
       goto out;
}

補足

ソケット作成時のデフォルトsk->sk_sndbufは、/proc/sys/net/core/wmem_defaultで、setsockoptシステムコールで更新できます。
[root@localhost north]# cat /proc/sys/net/core/wmem_default
163840

疑問

if ((unsigned)len > sk->sk_sndbuf - 32)の32は、struct skb_shared_infoを考慮しての事でしょうか?

size = min_t(int, size, skb_tailroom(skb))で、アライメントとstruct skb_shared_infoの領域で、size < skb_tailroom(skb)ではありますが、ソースコメントの説明だと、メモリ獲得時に待機する場合で、しかも分割して送信するケース故・・・、でsize>skb_tailroom(skb)の発生するのって、そのケースが全く分からず、min_t(int, size, skb_tailroom(skb))の必要性で悩んでいます。

skb_is_nonlinear(skb)は、struct skb_shared_infoのバッファのpage数で、UNIXドメインソケットは0です。
static inline int skb_tailroom(const struct sk_buff *skb)
{
       return skb_is_nonlinear(skb) ? 0 : skb->end - skb->tail;
}

static inline int skb_is_nonlinear(const struct sk_buff *skb)
{
       return skb->data_len;
}

最終更新 2015/12/25 17:08:36 - north
(2015/12/25 17:08:36 作成)


検索

アクセス数
3713062
最近のコメント
コアダンプファイル - sakaia
list_head構造体 - yocto_no_yomikata
勧告ロックと強制ロック - wataash
LKMからのファイル出力 - 重松 宏昌
kprobe - ななし
ksetの実装 - スーパーコピー
カーネルスレッドとは - ノース
カーネルスレッドとは - nbyst
asmlinkageってなに? - ノース
asmlinkageってなに? - よろしく
Adsense
広告情報が設定されていません。