STREAM送信のウエイト
sk->sk_sndtimeoによるウエイトは、connectでの実装と同様に、STREAM送信でも実装されています。
STREAM送信の実装は、sk->sk_sndbufを上限とし、データを(sk->sk_sndbuf/2 - 64)バイト単位に分割して送信します(受信プロセス等の他プロセスのスケジュール効率化のためかと思います)。この時、これまでに送信済みで未受信のサイズ(sk->sk_wmem_alloc)がsk->sk_sndbuf以上なら、sk->sk_sndtimeoの時間だけウエイトします。sk->sk_wmem_allocには送信毎に取得したバッファ(skb)のサイズが加算され、受信毎に減算されます。従って、受信側がデータを受信し続けているなら、送信データサイズに制限はありません。
検証サンプル
STREAM送信の実装は、sk->sk_sndbufを上限とし、データを(sk->sk_sndbuf/2 - 64)バイト単位に分割して送信します(受信プロセス等の他プロセスのスケジュール効率化のためかと思います)。この時、これまでに送信済みで未受信のサイズ(sk->sk_wmem_alloc)がsk->sk_sndbuf以上なら、sk->sk_sndtimeoの時間だけウエイトします。sk->sk_wmem_allocには送信毎に取得したバッファ(skb)のサイズが加算され、受信毎に減算されます。従って、受信側がデータを受信し続けているなら、送信データサイズに制限はありません。
検証サンプル
[root@localhost c]# cat sk_sendtime.c
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
/* Payload handed to write(); its contents are never set, only the byte count matters. */
char buf[2000] = { 0 };
/*
 * Request a SOL_SOCKET buffer size option (e.g. SO_SNDBUF) and report the value
 * the kernel actually applied, which may differ from the request (the sample
 * run asks for 0 and reads back 2048).
 *
 * socket:   socket descriptor to configure
 * optname:  SOL_SOCKET option to set and read back
 * buf_size: requested size in bytes
 *
 * Prints the applied value (the label is fixed to " sk->sk_sndbu" regardless
 * of optname) and returns it, or returns -1 if the value cannot be read back.
 */
int set_buffsize(int socket, int optname, int buf_size)
{
	int val = buf_size;
	/* optlen must describe the int being passed, not sizeof(size_t). */
	socklen_t len = sizeof val;

	if (setsockopt(socket, SOL_SOCKET, optname, &val, len) < 0)
		perror("setsockopt");	/* still report whatever value is in effect */

	if (getsockopt(socket, SOL_SOCKET, optname, &val, &len) < 0) {
		/* val would still hold the request, not the applied value. */
		perror("getsockopt");
		return -1;
	}

	printf(" sk->sk_sndbu:%d\n\n", val);
	return val;
}
/*
 * Set the send timeout (SO_SNDTIMEO) of `sock` to `time` whole seconds.
 * The value is passed straight to setsockopt(); its result is not checked.
 */
void sd_time(int sock, int time)
{
	struct timeval timeout = { .tv_sec = time, .tv_usec = 0 };

	setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof timeout);
}
/*
 * Parse `s` as a complete decimal int.
 * Returns 0 and stores the value in *out on success; returns -1 if `s` has no
 * digits, has trailing characters, or does not fit in an int.
 */
static int parse_int_arg(const char *s, int *out)
{
	char *end;
	long v;

	errno = 0;
	v = strtol(s, &end, 10);
	if (end == s || *end != '\0' || errno == ERANGE || v < INT_MIN || v > INT_MAX)
		return -1;
	*out = (int)v;
	return 0;
}

/*
 * Measure how long a single write() on an AF_UNIX stream socket blocks.
 *
 * Usage: sk_sendtime <bytes> <sndtimeo_sec>
 *   bytes        number of bytes to write, 0..sizeof buf
 *   sndtimeo_sec value passed to SO_SNDTIMEO on the sending end
 *
 * The send buffer is requested as 0 so the kernel applies its own value. The
 * program prints the per-chunk size ((sndbuf >> 1) - 64, the split used by
 * unix_stream_sendmsg()), the count write() returned, and the whole seconds
 * it took. Nothing ever reads sk[1], so once send-buffer space runs out
 * write() can only return through the send timeout.
 */
int main(int argc, char *argv[])
{
	int sk[2], wtime, bufsize, nbytes, timeo;
	ssize_t ret;
	time_t time1, time2;

	if (argc < 3) {
		fprintf(stderr, "usage: %s <bytes (0-%zu)> <sndtimeo_sec>\n",
			argv[0], sizeof buf);
		return EXIT_FAILURE;
	}
	/* write() reads from the fixed-size buf; larger counts would read past its end. */
	if (parse_int_arg(argv[1], &nbytes) < 0 || nbytes < 0 ||
	    (size_t)nbytes > sizeof buf) {
		fprintf(stderr, "invalid byte count: %s\n", argv[1]);
		return EXIT_FAILURE;
	}
	if (parse_int_arg(argv[2], &timeo) < 0) {
		fprintf(stderr, "invalid timeout: %s\n", argv[2]);
		return EXIT_FAILURE;
	}

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sk) < 0) {
		perror("socketpair");
		return EXIT_FAILURE;
	}
	sd_time(sk[0], timeo);
	bufsize = set_buffsize(sk[0], SO_SNDBUF, 0);

	time(&time1);
	ret = write(sk[0], buf, (size_t)nbytes);
	time(&time2);
	if (ret < 0)
		perror("write");	/* e.g. timed out before anything was queued */

	wtime = (int)difftime(time2, time1);
	printf("per writed size:%4d\n", ((bufsize >> 1) - 64));
	/* ret is -1 or at most sizeof buf, so it fits in an int. */
	printf(" writed:%4d(waited:%d)\n", (int)ret, wtime);

	close(sk[0]);
	close(sk[1]);
	return EXIT_SUCCESS;
}
結果
[root@localhost c]# ./sk_sendtime.o 960 0 <----sk->sk_sndbuf:2048/2 - 64 = 960以下なら送信は1回で完了
sk->sk_sndbu:2048
per writed size: 960
writed: 960(waited:0)
[root@localhost c]# ./sk_sendtime.o 961 0 <----sk->sk_sndbuf:2048/2 - 64 = 960を超えるなら2回に分けて送信され、2回目のバッファ取得でウエイト(タイムアウト0では戻らないため^Cで中断)。
sk->sk_sndbu:2048
^C
[root@localhost c]# ./sk_sendtime.o 961 1 <----2回目のバッファ取得で1秒ウエイト後タイムアウトし、1回目の960バイトのみ送信して完了
sk->sk_sndbu:2048
per writed size: 960
writed: 960(waited:1)
[root@localhost c]# ./sk_sendtime.o 961 -1 <----タイムアウトに負値を指定すると、2回目のバッファ取得でウエイトせず、1回目の960バイトのみ送信して完了
sk->sk_sndbu:2048
per writed size: 960
writed: 960(waited:0)
実装
/*
 * unix_stream_sendmsg - send data on a connected AF_UNIX SOCK_STREAM socket.
 *
 * Copies @len bytes from @msg into a series of skbs and queues each one on the
 * peer's sk_receive_queue. Each skb carries at most (sk->sk_sndbuf >> 1) - 64
 * bytes. That is further capped by SKB_MAX_ALLOC and by the tailroom of the
 * skb actually allocated.
 *
 * Each skb comes from sock_alloc_send_skb(). Unless MSG_DONTWAIT is set, that
 * call may wait for send-buffer space until the socket's send timeout expires.
 * If an allocation or copy fails after some data was already queued, the
 * function returns the bytes queued so far instead of the error (a short
 * write).
 *
 * Returns the number of bytes queued, or a negative errno if nothing was sent:
 * -EOPNOTSUPP (MSG_OOB, or an address given on a non-established socket),
 * -EISCONN (address given on an established socket), -ENOTCONN (no peer),
 * -EPIPE (local SEND_SHUTDOWN, dead peer or peer RCV_SHUTDOWN), or the error
 * from scm_send()/sock_alloc_send_skb()/unix_scm_to_skb()/memcpy_fromiovec().
 */
static int unix_stream_sendmsg(struct kiocb *kiocb, struct socket *sock,
struct msghdr *msg, size_t len)
{
struct sock_iocb *siocb = kiocb_to_siocb(kiocb);
struct sock *sk = sock->sk;
struct sock *other = NULL;
int err, size;
struct sk_buff *skb;
int sent = 0;
struct scm_cookie tmp_scm;
bool fds_sent = false;
int max_level;
/* Use a stack scm_cookie when the caller did not supply one. */
if (NULL == siocb->scm)
siocb->scm = &tmp_scm;
/* NOTE(review): presumably throttles senders while the AF_UNIX fd garbage collector runs - confirm. */
wait_for_unix_gc();
/* Collect ancillary (SCM) data from msg into siocb->scm. */
err = scm_send(sock, msg, siocb->scm);
if (err < 0)
return err;
/* Out-of-band data is not supported on AF_UNIX stream sockets. */
err = -EOPNOTSUPP;
if (msg->msg_flags&MSG_OOB)
goto out_err;
/* A stream socket sends only to its connected peer; an explicit address is rejected. */
if (msg->msg_namelen) {
err = sk->sk_state == TCP_ESTABLISHED ? -EISCONN : -EOPNOTSUPP;
goto out_err;
} else {
err = -ENOTCONN;
other = unix_peer(sk);
if (!other)
goto out_err;
}
if (sk->sk_shutdown & SEND_SHUTDOWN)
goto pipe_err;
/* Each iteration queues one skb on the peer. */
while (sent < len) {
size = len-sent;
/* Cap each skb at half the send buffer minus 64 bytes, and at SKB_MAX_ALLOC. */
if (size > ((sk->sk_sndbuf >> 1) - 64))
size = (sk->sk_sndbuf >> 1) - 64;
if (size > SKB_MAX_ALLOC)
size = SKB_MAX_ALLOC;
/*
 * This is where the sender waits when sk_wmem_alloc has reached sk_sndbuf
 * (see sock_alloc_send_pskb). On failure, out_err returns the bytes already
 * sent, if any.
 */
skb = sock_alloc_send_skb(sk, size, msg->msg_flags&MSG_DONTWAIT,
&err);
if (skb == NULL)
goto out_err;
/* Never copy more than the allocated skb can hold. */
size = min_t(int, size, skb_tailroom(skb));
/* Attach SCM data; passed file descriptors go only with the first skb (fds_sent). */
err = unix_scm_to_skb(siocb->scm, skb, !fds_sent);
if (err < 0) {
kfree_skb(skb);
goto out_err;
}
/* NOTE(review): the non-negative result appears to be the recursion level of passed fds; it raises the peer's recursion_level below. */
max_level = err + 1;
fds_sent = true;
/* Copy the next `size` bytes of user data; presumably advances msg_iov so the next iteration continues from here. */
err = memcpy_fromiovec(skb_put(skb, size), msg->msg_iov, size);
if (err) {
kfree_skb(skb);
goto out_err;
}
/* Re-check the peer under its state lock: a dead or read-shutdown peer yields -EPIPE. */
unix_state_lock(other);
if (sock_flag(other, SOCK_DEAD) ||
(other->sk_shutdown & RCV_SHUTDOWN))
goto pipe_err_free;
/* NOTE(review): presumably attaches sender credentials when the peer requested them. */
maybe_add_creds(skb, sock, other);
skb_queue_tail(&other->sk_receive_queue, skb);
if (max_level > unix_sk(other)->recursion_level)
unix_sk(other)->recursion_level = max_level;
unix_state_unlock(other);
/* Notify the peer through its sk_data_ready callback, outside the lock. */
other->sk_data_ready(other, size);
sent += size;
}
scm_destroy(siocb->scm);
siocb->scm = NULL;
return sent;
pipe_err_free:
unix_state_unlock(other);
kfree_skb(skb);
pipe_err:
/* SIGPIPE is raised only when nothing was sent and MSG_NOSIGNAL is not set. */
if (sent == 0 && !(msg->msg_flags&MSG_NOSIGNAL))
send_sig(SIGPIPE, current, 0);
err = -EPIPE;
out_err:
scm_destroy(siocb->scm);
siocb->scm = NULL;
/* GNU "?:": a short write takes precedence over the error code. */
return sent ? : err;
}
/*
 * sock_alloc_send_skb - allocate a purely linear skb of @size bytes charged
 * to sk's send buffer. This is sock_alloc_send_pskb() with no paged data, so
 * it waits and fails exactly the same way.
 */
struct sk_buff *sock_alloc_send_skb(struct sock *sk, unsigned long size,
				    int noblock, int *errcode)
{
	const unsigned long paged_len = 0;	/* everything goes in the linear area */

	return sock_alloc_send_pskb(sk, size, paged_len, noblock, errcode);
}
/*
 * sock_alloc_send_pskb - allocate an skb charged to sk's send buffer.
 *
 * Allocates an skb with a @header_len linear area plus @data_len bytes of page
 * fragments. An allocation is attempted only while
 * sk->sk_wmem_alloc < sk->sk_sndbuf. That check does not include the size of
 * the skb about to be allocated. Assuming skb_set_owner_w() charges the skb's
 * truesize to sk_wmem_alloc, one allocation can push it past sk_sndbuf, and
 * the *next* call is the one that waits.
 *
 * When there is no room, the function waits in sock_wait_for_wmem(). The
 * timeout comes from sock_sndtimeo(), presumably 0 when @noblock is set and
 * sk->sk_sndtimeo otherwise. A timeout of 0 fails with -EAGAIN, whether it was
 * 0 from the start or has run out after waiting. A pending signal fails with
 * sock_intr_errno().
 *
 * Returns the skb, or NULL with *@errcode set to one of: the pending socket
 * error, -EPIPE (SEND_SHUTDOWN), -ENOBUFS (allocation failure), -EAGAIN, or
 * the interrupt errno.
 */
struct sk_buff *sock_alloc_send_pskb(struct sock *sk, unsigned long header_len,
unsigned long data_len, int noblock,
int *errcode)
{
struct sk_buff *skb;
gfp_t gfp_mask;
long timeo;
int err;
/* Add __GFP_REPEAT when the mask allows sleeping (__GFP_WAIT). */
gfp_mask = sk->sk_allocation;
if (gfp_mask & __GFP_WAIT)
gfp_mask |= __GFP_REPEAT;
timeo = sock_sndtimeo(sk, noblock);
while (1) {
err = sock_error(sk);
if (err != 0)
goto failure;
err = -EPIPE;
if (sk->sk_shutdown & SEND_SHUTDOWN)
goto failure;
/* Allocate only while the bytes charged to the send buffer are below sk_sndbuf. */
if (atomic_read(&sk->sk_wmem_alloc) < sk->sk_sndbuf) {
skb = alloc_skb(header_len, gfp_mask);
if (skb) {
int npages;
int i;
/* Linear-only request: done, leave the retry loop. */
if (!data_len)
break;
/* Round data_len up to whole pages, attach one page fragment per page, and count the paged bytes in truesize. */
npages = (data_len + (PAGE_SIZE - 1)) >> PAGE_SHIFT;
skb->truesize += data_len;
skb_shinfo(skb)->nr_frags = npages;
for (i = 0; i < npages; i++) {
struct page *page;
page = alloc_pages(sk->sk_allocation, 0);
if (!page) {
err = -ENOBUFS;
/* Only the i fragments filled so far are released with the skb. */
skb_shinfo(skb)->nr_frags = i;
kfree_skb(skb);
goto failure;
}
__skb_fill_page_desc(skb, i,
page, 0,
(data_len >= PAGE_SIZE ?
PAGE_SIZE :
data_len));
data_len -= PAGE_SIZE;
}
break;
}
err = -ENOBUFS;
goto failure;
}
/* No room: flag the socket as out of send space, then wait or fail. */
set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
err = -EAGAIN;
if (!timeo)
goto failure;
if (signal_pending(current))
goto interrupted;
/* NOTE(review): presumably sleeps until write space frees up or timeo runs out, returning the remaining time - confirm. */
timeo = sock_wait_for_wmem(sk, timeo);
}
skb_set_owner_w(skb, sk);
return skb;
interrupted:
err = sock_intr_errno(timeo);
failure:
*errcode = err;
return NULL;
}
alloc_skb()で実際に取得されるサイズは、要求サイズにsizeof(struct skb_shared_info)を加算してアライメントしたサイズとなります(そのため、sk->sk_wmem_allocには送信データサイズより大きな値が加算されます)。
/*
 * NOTE(review): this listing repeats sock_alloc_send_pskb verbatim. The
 * sentence introducing it describes alloc_skb()'s real allocation size, but
 * alloc_skb() itself is not shown here.
 *
 * sock_alloc_send_pskb - allocate an skb charged to sk's send buffer.
 *
 * Allocates an skb with a @header_len linear area plus @data_len bytes of page
 * fragments. An allocation is attempted only while
 * sk->sk_wmem_alloc < sk->sk_sndbuf. That check does not include the size of
 * the skb about to be allocated. Assuming skb_set_owner_w() charges the skb's
 * truesize to sk_wmem_alloc, one allocation can push it past sk_sndbuf, and
 * the *next* call is the one that waits.
 *
 * When there is no room, the function waits in sock_wait_for_wmem(). The
 * timeout comes from sock_sndtimeo(), presumably 0 when @noblock is set and
 * sk->sk_sndtimeo otherwise. A timeout of 0 fails with -EAGAIN, and a pending
 * signal fails with sock_intr_errno().
 *
 * Returns the skb, or NULL with *@errcode set.
 */
struct sk_buff *sock_alloc_send_pskb(struct sock *sk, unsigned long header_len,
unsigned long data_len, int noblock,
int *errcode)
{
struct sk_buff *skb;
gfp_t gfp_mask;
long timeo;
int err;
/* Add __GFP_REPEAT when the mask allows sleeping (__GFP_WAIT). */
gfp_mask = sk->sk_allocation;
if (gfp_mask & __GFP_WAIT)
gfp_mask |= __GFP_REPEAT;
timeo = sock_sndtimeo(sk, noblock);
while (1) {
err = sock_error(sk);
if (err != 0)
goto failure;
err = -EPIPE;
if (sk->sk_shutdown & SEND_SHUTDOWN)
goto failure;
/* Allocate only while the bytes charged to the send buffer are below sk_sndbuf. */
if (atomic_read(&sk->sk_wmem_alloc) < sk->sk_sndbuf) {
skb = alloc_skb(header_len, gfp_mask);
if (skb) {
int npages;
int i;
/* Linear-only request: done, leave the retry loop. */
if (!data_len)
break;
/* Round data_len up to whole pages, attach one page fragment per page, and count the paged bytes in truesize. */
npages = (data_len + (PAGE_SIZE - 1)) >> PAGE_SHIFT;
skb->truesize += data_len;
skb_shinfo(skb)->nr_frags = npages;
for (i = 0; i < npages; i++) {
struct page *page;
page = alloc_pages(sk->sk_allocation, 0);
if (!page) {
err = -ENOBUFS;
/* Only the i fragments filled so far are released with the skb. */
skb_shinfo(skb)->nr_frags = i;
kfree_skb(skb);
goto failure;
}
__skb_fill_page_desc(skb, i,
page, 0,
(data_len >= PAGE_SIZE ?
PAGE_SIZE :
data_len));
data_len -= PAGE_SIZE;
}
break;
}
err = -ENOBUFS;
goto failure;
}
/* No room: flag the socket as out of send space, then wait or fail. */
set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
err = -EAGAIN;
if (!timeo)
goto failure;
if (signal_pending(current))
goto interrupted;
/* NOTE(review): presumably sleeps until write space frees up or timeo runs out, returning the remaining time - confirm. */
timeo = sock_wait_for_wmem(sk, timeo);
}
skb_set_owner_w(skb, sk);
return skb;
interrupted:
err = sock_intr_errno(timeo);
failure:
*errcode = err;
return NULL;
}





