vmsplice
書込みpipeの vmspliceは引数データをpipeバッファに複写するのでなく、struct iovec.iov_base引数データのpageをpipeバッファpageとし、データのpage内のオフセット/サイズをpipeバッファの属性のoffset/lenに設定。故にvmsplice後に設定したユーザバッファデータ変更すれば、pipeバッファを更新した事になる。
pipeのwriteは、書込データがpage領域を超えるとpipe->nrbufs++とし次書込みは次バッファの別ページで、page領域を超えなければ同pageに追加され、pipe_buffernのlenに追加サイズが加算されるが、vmspliceはデータがpageサイズを超えなくても設定毎にpipe->nrbufs++。従ってvmspliceは設定バッファに関係なくデフォルトでPIPE_DEF_BUFFERSの16回しか実行できない。小さいサイズバッファを多々扱う場合、配列によるバッファを配列[0]のアドレスをvmspliceすべきである。
読込pipeのvmspliceは、書込みpipeの係る実装でなく、pipeのreadそのものである。そもそも読/書pipeバッファは共有、読込pipeバッファを設定できるとすると、書込pipeバッファにも設定した事になり、読込pipeなのに書込みしたことになってしまう。
vmsplice :23
pipe read:a1234b1234567c123456789
[root@north a]# ./w-vmsplice.out max-do_vmsplice
vmspliced cnt:16 [PIPE_DEF_BUFFERS=16]
pipeのwriteは、書込データがpage領域を超えるとpipe->nrbufs++とし次書込みは次バッファの別ページで、page領域を超えなければ同pageに追加され、pipe_buffernのlenに追加サイズが加算されるが、vmspliceはデータがpageサイズを超えなくても設定毎にpipe->nrbufs++。従ってvmspliceは設定バッファに関係なくデフォルトでPIPE_DEF_BUFFERSの16回しか実行できない。小さいサイズバッファを多々扱う場合、配列によるバッファを配列[0]のアドレスをvmspliceすべきである。
読込pipeのvmspliceは、書込みpipeの係る実装でなく、pipeのreadそのものである。そもそも読/書pipeバッファは共有、読込pipeバッファを設定できるとすると、書込pipeバッファにも設定した事になり、読込pipeなのに書込みしたことになってしまう。
サンプル
[root@north a]# cat w-vmsplice.c
#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <sys/syscall.h>
void pread1(int pfd);
void pread2(int pfd);
void pbuff_read(void);
char buf1[20], buf2[20], buf3[20];
void main(int argc, char *argv[])
{
int cnt, fd, pw, pr, pfd[2];
struct iovec iov[3];
pipe2(pfd, O_NONBLOCK);
pr = pfd[0]; pw = pfd[1];
strcpy(buf1, "0123456789");
strcpy(buf2, "0123456789");
strcpy(buf3, "0123456789");
iov[0].iov_base = buf1; iov[0].iov_len = 5;
iov[1].iov_base = buf2; iov[1].iov_len = 8;
iov[2].iov_base = buf3; iov[2].iov_len = 10;
if (argc == 1) {
cnt = syscall(SYS_vmsplice, pw, iov, 3, O_NONBLOCK);
printf("vmspliced :%d\n", cnt);
buf1[0] = 'a'; buf2[0] = 'b'; buf3[0] = 'c'; <- pipeバッファに設定されたbuf1/buf2/buf3を更新
pread1(pr);
}
else {
cnt = 0;
while (1) {
if (syscall(SYS_vmsplice, pw, &iov[0], 1, SPLICE_F_NONBLOCK) <= 0) {
break;
}
cnt++;
}
printf("vmspliced cnt:%d [PIPE_DEF_BUFFERS=16]\n", cnt);
}
}
void pread1(int pfd)
{
char buf[32];
printf("pipe read:");
while(1) {
buf[1] = 0;
if (read(pfd, buf, 1) > 0) {
printf("%s", buf);
}
else {
break;
}
}
printf("\n");
}
[root@north a]# ./w-vmsplice.out vmsplice :23
pipe read:a1234b1234567c123456789
[root@north a]# ./w-vmsplice.out max-do_vmsplice
vmspliced cnt:16 [PIPE_DEF_BUFFERS=16]
カーネル
pipeはpageをバッファとするstruct pipe_bufferを16有するデフォルト領域で、pipeのwriteは、pageサイズを超て、次のstruct pipe_bufferへ書き込まれるが、vmspliceは同じページであっても設定毎に新規struct pipe_bufferへ設定される。
#define PIPE_DEF_BUFFERS 16
struct inode {
:
union {
struct pipe_inode_info *i_pipe;
struct block_device *i_bdev;
struct cdev *i_cdev;
};
:
};
struct pipe_inode_info {
wait_queue_head_t wait;
unsigned int nrbufs, curbuf, buffers;
unsigned int readers;
unsigned int writers;
unsigned int waiting_writers;
unsigned int r_counter;
unsigned int w_counter;
struct page *tmp_page;
struct fasync_struct *fasync_readers;
struct fasync_struct *fasync_writers;
struct inode *inode;
struct pipe_buffer *bufs; <-- bufs[PIPE_DEF_BUFFERS]
};
struct pipe_buffer {
struct page *page;
unsigned int offset, len;
const struct pipe_buf_operations *ops;
unsigned int flags;
unsigned long private;
};
読込page(bufs->page) bufs = inode->i_pipe->bufs + inode->i_pipe->curbuf
書込page(bufs->page) bufs = inode->i_pipe->bufs + inode->i_pipe->curbuf + inode->i_pipe->nrbufs
pipe作成のデフォルトメモリサイズはpage*PIPE_DEF_BUFFERS(4096*16)
struct pipe_inode_info * alloc_pipe_info(struct inode *inode)
{
struct pipe_inode_info *pipe;
pipe = kzalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);
if (pipe) {
pipe->bufs = kzalloc(sizeof(struct pipe_buffer) * PIPE_DEF_BUFFERS, GFP_KERNEL);
if (pipe->bufs) {
init_waitqueue_head(&pipe->wait);
pipe->r_counter = pipe->w_counter = 1;
pipe->inode = inode;
pipe->buffers = PIPE_DEF_BUFFERS;
return pipe;
}
kfree(pipe);
}
return NULL;
}
SYSCALL_DEFINE4(vmsplice, int, fd, const struct iovec __user *, iov,
unsigned long, nr_segs, unsigned int, flags)
{
struct fd f;
long error;
if (unlikely(nr_segs > UIO_MAXIOV))
return -EINVAL;
else if (unlikely(!nr_segs))
return 0;
error = -EBADF;
f = fdget(fd);
if (f.file) {
if (f.file->f_mode & FMODE_WRITE)
error = vmsplice_to_pipe(f.file, iov, nr_segs, flags);
else if (f.file->f_mode & FMODE_READ)
error = vmsplice_to_user(f.file, iov, nr_segs, flags);
fdput(f);
}
return error;
}
static long vmsplice_to_pipe(struct file *file, const struct iovec __user *iov,
unsigned long nr_segs, unsigned int flags)
{
struct pipe_inode_info *pipe;
struct page *pages[PIPE_DEF_BUFFERS];
struct partial_page partial[PIPE_DEF_BUFFERS];
struct splice_pipe_desc spd = {
.pages = pages,
.partial = partial,
.nr_pages_max = PIPE_DEF_BUFFERS,
.flags = flags,
.ops = &user_page_pipe_buf_ops,
.spd_release = spd_release_page,
};
long ret;
pipe = get_pipe_info(file);
if (!pipe)
return -EBADF;
if (splice_grow_spd(pipe, &spd))
return -ENOMEM;
spd.nr_pages = get_iovec_page_array(iov, nr_segs, spd.pages,
spd.partial, false,
spd.nr_pages_max);
if (spd.nr_pages <= 0)
ret = spd.nr_pages;
else
ret = splice_to_pipe(pipe, &spd);
splice_shrink_spd(&spd);
return ret;
}
get_iovec_page_array()でユーザ引数のiovec __user *iovのデータのpageと係るoffset/len等の属性が*spdに設定される。
ssize_t splice_to_pipe(struct pipe_inode_info *pipe, struct splice_pipe_desc *spd)
{
unsigned int spd_pages = spd->nr_pages;
int ret, do_wakeup, page_nr;
ret = 0;
do_wakeup = 0;
page_nr = 0;
pipe_lock(pipe);
for (;;) {
if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
if (!ret)
ret = -EPIPE;
break;
}
if (pipe->nrbufs < pipe->buffers) {
int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
struct pipe_buffer *buf = pipe->bufs + newbuf;
buf->page = spd->pages[page_nr]; <-設定バッファのページ(page_nrはiov[]の配列数に係る引数)
buf->offset = spd->partial[page_nr].offset; <-設定バッファのページ内のオフセット
buf->len = spd->partial[page_nr].len; <-設定バッファの長さ
buf->private = spd->partial[page_nr].private;
buf->ops = spd->ops;
if (spd->flags & SPLICE_F_GIFT)
buf->flags |= PIPE_BUF_FLAG_GIFT;
pipe->nrbufs++; <-設定バッファがページサイズを超えなくても、次pipe書込みは新規pipeバッファ
page_nr++;
ret += buf->len;
if (pipe->inode)
do_wakeup = 1;
if (!--spd->nr_pages)
break;
if (pipe->nrbufs < pipe->buffers)
continue;
break;
}
if (spd->flags & SPLICE_F_NONBLOCK) {
if (!ret)
ret = -EAGAIN;
break;
}
if (signal_pending(current)) {
if (!ret)
ret = -ERESTARTSYS;
break;
}
if (do_wakeup) {
smp_mb();
if (waitqueue_active(&pipe->wait))
wake_up_interruptible_sync(&pipe->wait);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
do_wakeup = 0;
}
pipe->waiting_writers++;
pipe_wait(pipe);
pipe->waiting_writers--;
}
pipe_unlock(pipe);
if (do_wakeup)
wakeup_pipe_readers(pipe);
while (page_nr < spd_pages)
spd->spd_release(spd, page_nr++);
return ret;
}






