STREAM送信のウエイト


connect()での実装と同様に、STREAM送信でもsk->sk_sndtimeo(SO_SNDTIMEOで設定)によるウエイトが実装されています。

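STREAM送信実装は、sk->sk_sndbufを上限とします。データは(sk->sk_sndbuf >> 1) - 64バイト(sk->sk_sndbuf/2 - 64)単位のskbに分割して送信されます(受信プロセス等の他プロセスのスケジュール効率化のためかと思います)。各skbを確保する際、sk->sk_wmem_alloc(送信済みで、まだ受信側で解放されていないskbの合計サイズ)がsk->sk_sndbuf以上であれば、sk->sk_sndtimeoの時間だけウエイトします。sk->sk_wmem_allocには送信毎にskbのサイズが加算され、受信側でskbが解放される毎に減算されます。なお、加算されるのはデータサイズそのものではなく、sk_buffやskb_shared_info等の管理領域を含むskbのtruesizeです。そのためサンプルでは、960バイトのskbを1つ送信しただけで(受信されない限り)sk->sk_wmem_allocがsk->sk_sndbuf(2048)に達し、2回目の確保でウエイトしていると考えられます。従って、受信側が読み出しているなら、送信データサイズに制限はありません。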

検証サンプル
[root@localhost c]# cat sk_sendtime.c
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

/* Zero-filled payload for the single write() in main(); its size is the
 * largest write length the sample accepts. Internal linkage: not used
 * outside this file. */
static char buf[2000];

/*
 * Set an integer SOL_SOCKET option and return the value the kernel reports
 * back for it.
 *
 * socket:   socket descriptor to configure
 * optname:  SOL_SOCKET option name (e.g. SO_SNDBUF)
 * buf_size: value to request
 *
 * The kernel may adjust the request (in this sample, SO_SNDBUF 0 reads back
 * as 2048), so the value read with getsockopt() is printed and returned.
 * The label printed is always "sk->sk_sndbu:" whatever optname is.
 * Failures are reported with perror(); if getsockopt() fails, the requested
 * value is returned unchanged.
 */
int    set_buffsize(int socket, int optname, int buf_size)
{
	int val = buf_size;
	/* The option length is the size of the int option itself (the old
	 * sizeof(sizeof(int)) was sizeof(size_t)), and getsockopt() needs a
	 * socklen_t, not an int. */
	socklen_t len = sizeof val;

	if (setsockopt(socket, SOL_SOCKET, optname, &val, len) < 0)
		perror("setsockopt");
	if (getsockopt(socket, SOL_SOCKET, optname, &val, &len) < 0)
		perror("getsockopt");
	printf("  sk->sk_sndbu:%d\n\n", val);
	return val;
}

/*
 * Set the SO_SNDTIMEO send timeout of a socket, in whole seconds.
 *
 * sock:    socket descriptor
 * seconds: timeout in seconds (tv_usec is always 0). In this sample's runs,
 *          0 makes the blocked write wait until interrupted, while a negative
 *          value makes it return without waiting.
 *
 * A setsockopt() failure is reported with perror(); the socket is then left
 * with its previous timeout.
 */
void    sd_time(int sock, int seconds)
{
	/* The parameter is not named "time", so time() is not shadowed. */
	struct timeval send_tv = { .tv_sec = seconds, .tv_usec = 0 };

	if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &send_tv, sizeof send_tv) < 0)
		perror("setsockopt(SO_SNDTIMEO)");
}

/*
 * Parse s as a base-10 long that must lie in [lo, hi].
 * Returns 0 and stores the value in *out on success, -1 otherwise
 * (empty string, trailing garbage, overflow or out of range).
 */
static int parse_long_arg(const char *s, long lo, long hi, long *out)
{
	char *end;
	long v;

	errno = 0;
	v = strtol(s, &end, 10);
	if (end == s || *end != '\0' || errno == ERANGE || v < lo || v > hi)
		return -1;
	*out = v;
	return 0;
}

/*
 * Usage: sk_sendtime <write-size> <sndtimeo-seconds>
 *
 * Creates an AF_UNIX SOCK_STREAM socketpair and configures sk[0]:
 * SO_SNDTIMEO is set to <sndtimeo-seconds>, and SO_SNDBUF 0 is requested
 * (the value the kernel reports is used). It then issues one write() of
 * <write-size> bytes from buf while nothing reads sk[1].
 *
 * It prints three things: the per-skb chunk size ((sndbuf >> 1) - 64), the
 * byte count write() returned, and the whole seconds write() took.
 *
 * <write-size> must be in 0..sizeof buf. write() reads that many bytes from
 * buf, so a larger value would read past the end of the array.
 */
int main(int argc, char *argv[])
{
	int sk[2];
	long size, timeo;
	int bufsize, wtime;
	ssize_t ret;
	time_t time1, time2;

	if (argc != 3) {
		fprintf(stderr, "usage: %s <write-size> <sndtimeo-seconds>\n",
			argc > 0 ? argv[0] : "sk_sendtime");
		return EXIT_FAILURE;
	}
	if (parse_long_arg(argv[1], 0, (long)sizeof buf, &size) != 0) {
		fprintf(stderr, "write-size must be an integer in 0..%zu\n", sizeof buf);
		return EXIT_FAILURE;
	}
	if (parse_long_arg(argv[2], INT_MIN, INT_MAX, &timeo) != 0) {
		fprintf(stderr, "sndtimeo-seconds must be an integer\n");
		return EXIT_FAILURE;
	}

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sk) < 0) {
		perror("socketpair");
		return EXIT_FAILURE;
	}

	sd_time(sk[0], (int)timeo);
	bufsize = set_buffsize(sk[0], SO_SNDBUF, 0);

	time(&time1);
	/* A partial count (e.g. 960) is returned when the send-buffer wait
	 * times out after some data was already queued. */
	ret = write(sk[0], buf, (size_t)size);
	time(&time2);
	if (ret < 0)
		perror("write");
	wtime = (int)difftime(time2, time1);

	printf("per writed size:%4d\n", ((bufsize >> 1) - 64));
	/* ret is at most sizeof buf (or -1), so the int cast is lossless. */
	printf("         writed:%4d(waited:%d)\n", (int)ret, wtime);

	close(sk[0]);
	close(sk[1]);
	return EXIT_SUCCESS;
}
結果
[root@localhost c]# ./sk_sendtime.o 960 0  <----sk->sk_sndbuf:2048/2 - 64 = 960以下なら送信は1回で完了
   sk->sk_sndbu:2048

per writed size: 960
         writed: 960(waited:0)

[root@localhost c]# ./sk_sendtime.o 961 0  <----960(= sk->sk_sndbuf:2048/2 - 64)を超えるなら2回に分けて送信され、2回目のバッファ取得でウエイト(タイムアウト0は無期限のウエイトとなるため、^Cで中断)。
   sk->sk_sndbu:2048

^C

[root@localhost c]# ./sk_sendtime.o 961 1  <----2回目のバッファ取得で1秒ウエイトした後、全データを送信できないまま完了(writeは1回目に送信した960を返す)
   sk->sk_sndbu:2048

per writed size: 960
         writed: 960(waited:1)

[root@localhost c]# ./sk_sendtime.o 961 -1  <----タイムアウトに負値を指定するとtimeoが0となり、2回目のバッファ取得はウエイトせず-EAGAINとなって、全データを送信できないまま完了(writeは960を返す)
   sk->sk_sndbu:2048

per writed size: 960
         writed: 960(waited:0)
実装
/*
 * unix_stream_sendmsg - sendmsg() for AF_UNIX SOCK_STREAM sockets.
 *
 * The len bytes described by msg are split into chunks. Each chunk is queued
 * as one skb directly on the connected peer's receive queue. A chunk is at
 * most (sk->sk_sndbuf >> 1) - 64 bytes and at most SKB_MAX_ALLOC.
 *
 * Each skb comes from sock_alloc_send_skb(), which is where the writer can
 * block waiting for send-buffer room (see sock_alloc_send_pskb()). It does
 * not wait when MSG_DONTWAIT is given.
 *
 * Returns the number of bytes queued. If a failure happens after some data
 * was already queued (e.g. the send-buffer wait times out), that partial
 * count is returned. The negative error is returned only when nothing was
 * sent.
 */
static int unix_stream_sendmsg(struct kiocb *kiocb, struct socket *sock,
                              struct msghdr *msg, size_t len)
{
       struct sock_iocb *siocb = kiocb_to_siocb(kiocb);
       struct sock *sk = sock->sk;
       struct sock *other = NULL;
       int err, size;
       struct sk_buff *skb;
       int sent = 0;
       struct scm_cookie tmp_scm;
       bool fds_sent = false;
       int max_level;

       /* No SCM cookie supplied by the caller: use one on this stack frame. */
       if (NULL == siocb->scm)
               siocb->scm = &tmp_scm;
       wait_for_unix_gc();
       /* Collect the ancillary (SCM) data carried by msg. */
       err = scm_send(sock, msg, siocb->scm);
       if (err < 0)
               return err;

       /* MSG_OOB is rejected with -EOPNOTSUPP. */
       err = -EOPNOTSUPP;
       if (msg->msg_flags&MSG_OOB)
               goto out_err;

       /* A stream socket only sends to its connected peer: an explicit
        * destination address is rejected, and no peer means -ENOTCONN. */
       if (msg->msg_namelen) {
               err = sk->sk_state == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
               goto out_err;
       } else {
               err = -ENOTCONN;
               other = unix_peer(sk);
               if (!other)
                       goto out_err;
       }

       if (sk->sk_shutdown & SEND_SHUTDOWN)
               goto pipe_err;

       while (sent < len) {
               size = len-sent;

               /* Cap each chunk at half the send buffer minus 64 bytes.
                * NOTE(review): the rationale is not visible here; presumably
                * so that more than one chunk can be outstanding. */
               if (size > ((sk->sk_sndbuf >> 1) - 64))
                       size = (sk->sk_sndbuf >> 1) - 64;

               if (size > SKB_MAX_ALLOC)
                       size = SKB_MAX_ALLOC;

               /* The writer may block here until send-buffer room appears or
                * the send timeout expires. On failure skb is NULL and err is
                * set, and any bytes already queued are still reported. */
               skb = sock_alloc_send_skb(sk, size, msg->msg_flags&MSG_DONTWAIT,
                                         &err);

               if (skb == NULL)
                       goto out_err;

               /* Never copy more than the skb's linear tail room. */
               size = min_t(int, size, skb_tailroom(skb));

               /* Attach SCM data to the skb; fds go only with the first skb
                * (!fds_sent). A non-negative result is used as a recursion
                * level below. NOTE(review): presumably the max nesting level
                * of the passed fds; confirm. */
               err = unix_scm_to_skb(siocb->scm, skb, !fds_sent);
               if (err < 0) {
                       kfree_skb(skb);
                       goto out_err;
               }
               max_level = err + 1;
               fds_sent = true;

               /* Copy the next chunk of user data into the skb. */
               err = memcpy_fromiovec(skb_put(skb, size), msg->msg_iov, size);
               if (err) {
                       kfree_skb(skb);
                       goto out_err;
               }

               unix_state_lock(other);

               /* Peer is dead or has shut down receiving: fail with -EPIPE. */
               if (sock_flag(other, SOCK_DEAD) ||
                   (other->sk_shutdown & RCV_SHUTDOWN))
                       goto pipe_err_free;

               /* Queue directly on the peer's receive queue, then notify it. */
               maybe_add_creds(skb, sock, other);
               skb_queue_tail(&other->sk_receive_queue, skb);
               if (max_level > unix_sk(other)->recursion_level)
                       unix_sk(other)->recursion_level = max_level;
               unix_state_unlock(other);
               other->sk_data_ready(other, size);
               sent += size;
       }

       scm_destroy(siocb->scm);
       siocb->scm = NULL;

       return sent;

pipe_err_free:
       unix_state_unlock(other);
       kfree_skb(skb);
pipe_err:
       /* SIGPIPE only if nothing was sent and MSG_NOSIGNAL is not set. */
       if (sent == 0 && !(msg->msg_flags&MSG_NOSIGNAL))
               send_sig(SIGPIPE, current, 0);
       err = -EPIPE;
out_err:
       scm_destroy(siocb->scm);
       siocb->scm = NULL;
       /* GNU "a ?: b": the partial byte count if non-zero, else the error. */
       return sent ? : err;
}

/*
 * sock_alloc_send_skb - allocate a purely linear skb for sending on sk.
 *
 * Shorthand for sock_alloc_send_pskb() with no paged part: all size bytes
 * live in the linear area. Waiting, write-memory accounting and error
 * reporting through *errcode are exactly those of sock_alloc_send_pskb().
 */
struct sk_buff *sock_alloc_send_skb(struct sock *sk, unsigned long size,
                                    int noblock, int *errcode)
{
       const unsigned long paged_len = 0;       /* no page fragments */

       return sock_alloc_send_pskb(sk, size, paged_len, noblock, errcode);
}

/*
 * sock_alloc_send_pskb - allocate an skb charged to sk's send buffer,
 * waiting for room when necessary.
 *
 * @header_len: size of the linear area, passed to alloc_skb()
 * @data_len:   size of an optional paged part. That many bytes are added to
 *              skb->truesize and backed by newly allocated order-0 pages.
 * @noblock:    passed to sock_sndtimeo() to choose the wait budget
 * @errcode:    receives the negative errno whenever NULL is returned
 *
 * An skb is allocated only while sk->sk_wmem_alloc < sk->sk_sndbuf. When
 * there is no room, the NOSPACE flags are set and the caller waits in
 * sock_wait_for_wmem(), whose result becomes the remaining timeout.
 * - With no time left, the call fails with -EAGAIN.
 * - A pending signal ends the wait with sock_intr_errno().
 * - Socket errors and SEND_SHUTDOWN are re-checked on every pass.
 *
 * Returns the skb after skb_set_owner_w(), or NULL.
 */
struct sk_buff *sock_alloc_send_pskb(struct sock *sk, unsigned long header_len,
                                    unsigned long data_len, int noblock,
                                    int *errcode)
{
       struct sk_buff *skb;
       gfp_t gfp_mask;
       long timeo;
       int err;

       /* If the allocation may sleep, also request __GFP_REPEAT. */
       gfp_mask = sk->sk_allocation;
       if (gfp_mask & __GFP_WAIT)
               gfp_mask |= __GFP_REPEAT;

       /* Wait budget. NOTE(review): presumably 0 when noblock, otherwise
        * sk->sk_sndtimeo; sock_sndtimeo() is not shown here. */
       timeo = sock_sndtimeo(sk, noblock);
       while (1) {
               /* Pending socket error, or send side shut down: fail. */
               err = sock_error(sk);
               if (err != 0)
                       goto failure;

               err = -EPIPE;
               if (sk->sk_shutdown & SEND_SHUTDOWN)
                       goto failure;

               /* Room is judged on what is already charged, before this skb
                * is charged by skb_set_owner_w() below, so the total may end
                * up above sk_sndbuf. */
               if (atomic_read(&sk->sk_wmem_alloc) < sk->sk_sndbuf) {
                       skb = alloc_skb(header_len, gfp_mask);
                       if (skb) {
                               int npages;
                               int i;

                               /* Purely linear skb: done. */
                               if (!data_len)
                                       break;

                               /* Paged part: data_len rounded up to whole pages. */
                               npages = (data_len + (PAGE_SIZE - 1)) >> PAGE_SHIFT;
                               skb->truesize += data_len;
                               skb_shinfo(skb)->nr_frags = npages;
                               for (i = 0; i < npages; i++) {
                                       struct page *page;

                                       page = alloc_pages(sk->sk_allocation, 0);
                                       if (!page) {
                                               /* Only the first i frags hold
                                                * pages; record that before
                                                * freeing the skb. */
                                               err = -ENOBUFS;
                                               skb_shinfo(skb)->nr_frags = i;
                                               kfree_skb(skb);
                                               goto failure;
                                       }

                                       /* Full pages; the last one gets the
                                        * remainder. */
                                       __skb_fill_page_desc(skb, i,
                                                       page, 0,
                                                       (data_len >= PAGE_SIZE ?
                                                        PAGE_SIZE :
                                                        data_len));
                                       data_len -= PAGE_SIZE;
                               }

                               break;
                       }
                       /* alloc_skb() failed. */
                       err = -ENOBUFS;
                       goto failure;
               }
               /* No room: flag the socket as short of write space, then
                * fail (no time left / signal) or wait and retry. */
               set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
               set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
               err = -EAGAIN;
               if (!timeo)
                       goto failure;
               if (signal_pending(current))
                       goto interrupted;
               timeo = sock_wait_for_wmem(sk, timeo);
       }

       /* Charge the skb to sk. NOTE(review): presumably adds skb->truesize
        * to sk->sk_wmem_alloc, to be released when the skb is freed. */
       skb_set_owner_w(skb, sk);
       return skb;

interrupted:
       err = sock_intr_errno(timeo);
failure:
       *errcode = err;
       return NULL;
}
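alloc_skb()で実際に確保されるサイズは、要求サイズをアライメントした上でsizeof(struct skb_shared_info)を加算したものとなります(そのため、skbのtruesizeは送信データサイズより大きくなります)。なお、以下に掲載しているのは上記と同一のsock_alloc_send_pskb()で、alloc_skb()自体のソースではありません。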
/*
 * NOTE(review): this listing is identical to the sock_alloc_send_pskb()
 * shown earlier in the article. The sentence just before it talks about
 * alloc_skb() sizing, but alloc_skb() itself is not reproduced. A C file
 * containing both copies would not compile (redefinition).
 *
 * sock_alloc_send_pskb - allocate an skb charged to sk's send buffer,
 * waiting for room when necessary.
 *
 * @header_len: size of the linear area, passed to alloc_skb()
 * @data_len:   size of an optional paged part. That many bytes are added to
 *              skb->truesize and backed by newly allocated order-0 pages.
 * @noblock:    passed to sock_sndtimeo() to choose the wait budget
 * @errcode:    receives the negative errno whenever NULL is returned
 *
 * An skb is allocated only while sk->sk_wmem_alloc < sk->sk_sndbuf. When
 * there is no room, the NOSPACE flags are set and the caller waits in
 * sock_wait_for_wmem(), whose result becomes the remaining timeout.
 * - With no time left, the call fails with -EAGAIN.
 * - A pending signal ends the wait with sock_intr_errno().
 *
 * Returns the skb after skb_set_owner_w(), or NULL.
 */
struct sk_buff *sock_alloc_send_pskb(struct sock *sk, unsigned long header_len,
                                    unsigned long data_len, int noblock,
                                    int *errcode)
{
       struct sk_buff *skb;
       gfp_t gfp_mask;
       long timeo;
       int err;

       /* If the allocation may sleep, also request __GFP_REPEAT. */
       gfp_mask = sk->sk_allocation;
       if (gfp_mask & __GFP_WAIT)
               gfp_mask |= __GFP_REPEAT;

       /* Wait budget. NOTE(review): presumably 0 when noblock, otherwise
        * sk->sk_sndtimeo; sock_sndtimeo() is not shown here. */
       timeo = sock_sndtimeo(sk, noblock);
       while (1) {
               /* Pending socket error, or send side shut down: fail. */
               err = sock_error(sk);
               if (err != 0)
                       goto failure;

               err = -EPIPE;
               if (sk->sk_shutdown & SEND_SHUTDOWN)
                       goto failure;

               /* Room is judged on what is already charged, before this skb
                * is charged by skb_set_owner_w() below. */
               if (atomic_read(&sk->sk_wmem_alloc) < sk->sk_sndbuf) {
                       skb = alloc_skb(header_len, gfp_mask);
                       if (skb) {
                               int npages;
                               int i;

                               /* Purely linear skb: done. */
                               if (!data_len)
                                       break;

                               /* Paged part: data_len rounded up to whole pages. */
                               npages = (data_len + (PAGE_SIZE - 1)) >> PAGE_SHIFT;
                               skb->truesize += data_len;
                               skb_shinfo(skb)->nr_frags = npages;
                               for (i = 0; i < npages; i++) {
                                       struct page *page;

                                       page = alloc_pages(sk->sk_allocation, 0);
                                       if (!page) {
                                               /* Only the first i frags hold
                                                * pages. */
                                               err = -ENOBUFS;
                                               skb_shinfo(skb)->nr_frags = i;
                                               kfree_skb(skb);
                                               goto failure;
                                       }

                                       __skb_fill_page_desc(skb, i,
                                                       page, 0,
                                                       (data_len >= PAGE_SIZE ?
                                                        PAGE_SIZE :
                                                        data_len));
                                       data_len -= PAGE_SIZE;
                               }

                               break;
                       }
                       /* alloc_skb() failed. */
                       err = -ENOBUFS;
                       goto failure;
               }
               /* No room: flag the socket, then fail or wait and retry. */
               set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
               set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
               err = -EAGAIN;
               if (!timeo)
                       goto failure;
               if (signal_pending(current))
                       goto interrupted;
               timeo = sock_wait_for_wmem(sk, timeo);
       }

       /* Charge the skb to sk's write memory (presumably via truesize). */
       skb_set_owner_w(skb, sk);
       return skb;

interrupted:
       err = sock_intr_errno(timeo);
failure:
       *errcode = err;
       return NULL;
}


最終更新 2016/10/10 17:01:33 - north
(2016/10/10 16:50:28 作成)


検索

アクセス数
3661578
最近のコメント
コアダンプファイル - sakaia
list_head構造体 - yocto_no_yomikata
勧告ロックと強制ロック - wataash
LKMからのファイル出力 - 重松 宏昌
kprobe - ななし
ksetの実装 - スーパーコピー
カーネルスレッドとは - ノース
カーネルスレッドとは - nbyst
asmlinkageってなに? - ノース
asmlinkageってなに? - よろしく
Adsense
広告情報が設定されていません。