Wait during STREAM send
sk->sk_sndtimeo is honored on STREAM sends in much the same way as it is in connect. The STREAM send implementation treats sk->sk_sndbuf as the upper bound and transmits the data in chunks of at most sk->sk_sndbuf/2 - 64 bytes (presumably so that other processes, such as the receiver, get a chance to be scheduled). Each chunk is charged to sk->sk_wmem_alloc when it is queued and credited back when the receiver consumes it, and if sk->sk_wmem_alloc has reached sk->sk_sndbuf the sender waits, for at most sk->sk_sndtimeo, before allocating the next chunk. Consequently, as long as the receiving side keeps reading, there is no limit on how much data can be sent.
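As a rough illustration of that last point, the following companion program (a hypothetical sketch, not part of the verification sample below) has a child process continuously drain the peer socket; because sk->sk_wmem_alloc keeps being decremented, a write far larger than the minimum send buffer completes without any timeout.

#include <stdio.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>

int main(void)
{
        int sk[2], zero = 0;
        static char buf[64 * 1024];              /* far larger than sk->sk_sndbuf */
        ssize_t sent;

        if (socketpair(AF_UNIX, SOCK_STREAM, 0, sk) < 0)
                return 1;
        /* request the minimum send buffer, as the verification sample below does */
        setsockopt(sk[0], SOL_SOCKET, SO_SNDBUF, &zero, sizeof(zero));

        if (fork() == 0) {                       /* child: keep draining the peer end */
                char tmp[4096];
                close(sk[0]);
                while (read(sk[1], tmp, sizeof(tmp)) > 0)
                        ;
                _exit(0);
        }
        close(sk[1]);

        /* no SO_SNDTIMEO: each chunk blocks until the child frees buffer space */
        sent = write(sk[0], buf, sizeof(buf));
        printf("written: %zd of %zu\n", sent, sizeof(buf));

        close(sk[0]);                            /* EOF terminates the child's loop */
        wait(NULL);
        return 0;
}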
Verification sample
[root@localhost c]# cat sk_sendtime.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <sys/socket.h>
#include <sys/time.h>

char buf[2000];

int set_buffsize(int socket, int optname, int buf_size)
{
        int val = buf_size;
        socklen_t len = sizeof(val);

        setsockopt(socket, SOL_SOCKET, optname, (char *)&val, len);
        getsockopt(socket, SOL_SOCKET, optname, &val, &len);
        printf(" sk->sk_sndbuf:%d\n\n", val);
        return val;
}

void sd_time(int sock, int time)
{
        struct timeval send_tv;

        send_tv.tv_sec = time;
        send_tv.tv_usec = 0;
        setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &send_tv, sizeof(send_tv));
}

int main(int argc, char *argv[])
{
        int sk[2], wtime, ret, bufsize;
        time_t time1, time2;

        socketpair(AF_UNIX, SOCK_STREAM, 0, sk);
        sd_time(sk[0], atoi(argv[2]));
        bufsize = set_buffsize(sk[0], SO_SNDBUF, 0);

        time(&time1);
        ret = write(sk[0], buf, atoi(argv[1]));
        time(&time2);
        wtime = (int)difftime(time2, time1);

        printf("per write size:%4d\n", (bufsize >> 1) - 64);
        printf("       written:%4d(waited:%d)\n", ret, wtime);
        return 0;
}
Results

[root@localhost c]# ./sk_sendtime.o 960 0    <---- at or below sk->sk_sndbuf/2 - 64 = 2048/2 - 64 = 960 bytes, the send completes in a single chunk
 sk->sk_sndbuf:2048

per write size: 960
       written: 960(waited:0)
[root@localhost c]# ./sk_sendtime.o 961 0    <---- above 960 bytes two chunks are needed, and the second buffer allocation waits (a timeout of 0 means block indefinitely)
 sk->sk_sndbuf:2048
^C
[root@localhost c]# ./sk_sendtime.o 961 1    <---- the second buffer allocation waits 1 second, then the call returns without having sent all of the data
 sk->sk_sndbuf:2048

per write size: 960
       written: 960(waited:1)
[root@localhost c]# ./sk_sendtime.o 961 -1   <---- the second buffer allocation does not wait, and the call returns without having sent all of the data
 sk->sk_sndbuf:2048

per write size: 960
       written: 960(waited:0)
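A practical consequence of the runs above: when SO_SNDTIMEO fires after part of the data has been queued, write() reports the partial count instead of an error, so a caller that needs the whole message has to retry the remainder itself. A minimal sketch of that pattern (hypothetical helper, assuming fd already has the timeout set):

#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Returns the number of bytes queued, or -1 if nothing could be sent. */
ssize_t send_all(int fd, const char *buf, size_t len)
{
        size_t done = 0;

        while (done < len) {
                ssize_t n = write(fd, buf + done, len - done);
                if (n > 0)
                        done += n;               /* partial write: retry the rest */
                else if (n < 0 && errno == EINTR)
                        continue;                /* interrupted by a signal */
                else
                        break;                   /* timeout (EAGAIN) or a real error */
        }
        return done ? (ssize_t)done : -1;
}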
Implementation

static int unix_stream_sendmsg(struct kiocb *kiocb, struct socket *sock,
                               struct msghdr *msg, size_t len)
{
        struct sock_iocb *siocb = kiocb_to_siocb(kiocb);
        struct sock *sk = sock->sk;
        struct sock *other = NULL;
        int err, size;
        struct sk_buff *skb;
        int sent = 0;
        struct scm_cookie tmp_scm;
        bool fds_sent = false;
        int max_level;

        if (NULL == siocb->scm)
                siocb->scm = &tmp_scm;
        wait_for_unix_gc();
        err = scm_send(sock, msg, siocb->scm);
        if (err < 0)
                return err;

        err = -EOPNOTSUPP;
        if (msg->msg_flags&MSG_OOB)
                goto out_err;

        if (msg->msg_namelen) {
                err = sk->sk_state == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
                goto out_err;
        } else {
                err = -ENOTCONN;
                other = unix_peer(sk);
                if (!other)
                        goto out_err;
        }

        if (sk->sk_shutdown & SEND_SHUTDOWN)
                goto pipe_err;

        while (sent < len) {
                size = len-sent;

                if (size > ((sk->sk_sndbuf >> 1) - 64))
                        size = (sk->sk_sndbuf >> 1) - 64;

                if (size > SKB_MAX_ALLOC)
                        size = SKB_MAX_ALLOC;

                skb = sock_alloc_send_skb(sk, size, msg->msg_flags&MSG_DONTWAIT, &err);

                if (skb == NULL)
                        goto out_err;

                size = min_t(int, size, skb_tailroom(skb));

                err = unix_scm_to_skb(siocb->scm, skb, !fds_sent);
                if (err < 0) {
                        kfree_skb(skb);
                        goto out_err;
                }
                max_level = err + 1;
                fds_sent = true;

                err = memcpy_fromiovec(skb_put(skb, size), msg->msg_iov, size);
                if (err) {
                        kfree_skb(skb);
                        goto out_err;
                }

                unix_state_lock(other);

                if (sock_flag(other, SOCK_DEAD) ||
                    (other->sk_shutdown & RCV_SHUTDOWN))
                        goto pipe_err_free;

                maybe_add_creds(skb, sock, other);
                skb_queue_tail(&other->sk_receive_queue, skb);
                if (max_level > unix_sk(other)->recursion_level)
                        unix_sk(other)->recursion_level = max_level;
                unix_state_unlock(other);
                other->sk_data_ready(other, size);
                sent += size;
        }

        scm_destroy(siocb->scm);
        siocb->scm = NULL;

        return sent;

pipe_err_free:
        unix_state_unlock(other);
        kfree_skb(skb);
pipe_err:
        if (sent == 0 && !(msg->msg_flags&MSG_NOSIGNAL))
                send_sig(SIGPIPE, current, 0);
        err = -EPIPE;
out_err:
        scm_destroy(siocb->scm);
        siocb->scm = NULL;
        return sent ? : err;
}

struct sk_buff *sock_alloc_send_skb(struct sock *sk, unsigned long size,
                                    int noblock, int *errcode)
{
        return sock_alloc_send_pskb(sk, size, 0, noblock, errcode);
}

struct sk_buff *sock_alloc_send_pskb(struct sock *sk, unsigned long header_len,
                                     unsigned long data_len, int noblock,
                                     int *errcode)
{
        struct sk_buff *skb;
        gfp_t gfp_mask;
        long timeo;
        int err;

        gfp_mask = sk->sk_allocation;
        if (gfp_mask & __GFP_WAIT)
                gfp_mask |= __GFP_REPEAT;

        timeo = sock_sndtimeo(sk, noblock);
        while (1) {
                err = sock_error(sk);
                if (err != 0)
                        goto failure;

                err = -EPIPE;
                if (sk->sk_shutdown & SEND_SHUTDOWN)
                        goto failure;

                if (atomic_read(&sk->sk_wmem_alloc) < sk->sk_sndbuf) {
                        skb = alloc_skb(header_len, gfp_mask);
                        if (skb) {
                                int npages;
                                int i;

                                if (!data_len)
                                        break;

                                npages = (data_len + (PAGE_SIZE - 1)) >> PAGE_SHIFT;
                                skb->truesize += data_len;
                                skb_shinfo(skb)->nr_frags = npages;
                                for (i = 0; i < npages; i++) {
                                        struct page *page;

                                        page = alloc_pages(sk->sk_allocation, 0);
                                        if (!page) {
                                                err = -ENOBUFS;
                                                skb_shinfo(skb)->nr_frags = i;
                                                kfree_skb(skb);
                                                goto failure;
                                        }

                                        __skb_fill_page_desc(skb, i,
                                                        page, 0,
                                                        (data_len >= PAGE_SIZE ?
                                                         PAGE_SIZE :
                                                         data_len));
                                        data_len -= PAGE_SIZE;
                                }
                                break;
                        }
                        err = -ENOBUFS;
                        goto failure;
                }
                set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                err = -EAGAIN;
                if (!timeo)
                        goto failure;
                if (signal_pending(current))
                        goto interrupted;
                timeo = sock_wait_for_wmem(sk, timeo);
        }

        skb_set_owner_w(skb, sk);
        return skb;

interrupted:
        err = sock_intr_errno(timeo);
failure:
        *errcode = err;
        return NULL;
}
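The same wait logic explains the MSG_DONTWAIT behaviour: sock_sndtimeo(sk, noblock) returns 0 when the flag is set, so the second chunk's buffer allocation fails immediately with EAGAIN instead of sleeping in sock_wait_for_wmem, and unix_stream_sendmsg returns the bytes already queued (or -1/EAGAIN only when nothing was queued, per the final "return sent ? : err;"). A small sketch of that from user space (hypothetical test; the expected values in the comments are those of the kernel examined above and may differ elsewhere):

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>

int main(void)
{
        int sk[2], zero = 0;
        char buf[2000] = {0};
        ssize_t n;

        if (socketpair(AF_UNIX, SOCK_STREAM, 0, sk) < 0)
                return 1;
        setsockopt(sk[0], SOL_SOCKET, SO_SNDBUF, &zero, sizeof(zero)); /* minimum sndbuf */

        n = send(sk[0], buf, 961, MSG_DONTWAIT);
        printf("first send : %zd\n", n);         /* expected 960: one chunk queued, then EAGAIN */

        n = send(sk[0], buf, 961, MSG_DONTWAIT);
        if (n < 0)
                printf("second send: -1 (%s)\n", strerror(errno));  /* nothing queued: EAGAIN */
        else
                printf("second send: %zd\n", n);
        return 0;
}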