vmsplice
書込みpipeの vmspliceは引数データをpipeバッファに複写するのでなく、struct iovec.iov_base引数データのpageをpipeバッファpageとし、データのpage内のオフセット/サイズをpipeバッファの属性のoffset/lenに設定。故にvmsplice後に設定したユーザバッファデータ変更すれば、pipeバッファを更新した事になる。
pipeのwriteは、書込データがpage領域を超えるとpipe->nrbufs++とし次書込みは次バッファの別ページで、page領域を超えなければ同pageに追加され、pipe_buffernのlenに追加サイズが加算されるが、vmspliceはデータがpageサイズを超えなくても設定毎にpipe->nrbufs++。従ってvmspliceは設定バッファに関係なくデフォルトでPIPE_DEF_BUFFERSの16回しか実行できない。小さいサイズバッファを多々扱う場合、配列によるバッファを配列[0]のアドレスをvmspliceすべきである。
読込pipeのvmspliceは、書込みpipeの係る実装でなく、pipeのreadそのものである。そもそも読/書pipeバッファは共有、読込pipeバッファを設定できるとすると、書込pipeバッファにも設定した事になり、読込pipeなのに書込みしたことになってしまう。
vmsplice :23
pipe read:a1234b1234567c123456789
[root@north a]# ./w-vmsplice.out max-do_vmsplice
vmspliced cnt:16 [PIPE_DEF_BUFFERS=16]
pipeのwriteは、書込データがpage領域を超えるとpipe->nrbufs++とし次書込みは次バッファの別ページで、page領域を超えなければ同pageに追加され、pipe_buffernのlenに追加サイズが加算されるが、vmspliceはデータがpageサイズを超えなくても設定毎にpipe->nrbufs++。従ってvmspliceは設定バッファに関係なくデフォルトでPIPE_DEF_BUFFERSの16回しか実行できない。小さいサイズバッファを多々扱う場合、配列によるバッファを配列[0]のアドレスをvmspliceすべきである。
読込pipeのvmspliceは、書込みpipeの係る実装でなく、pipeのreadそのものである。そもそも読/書pipeバッファは共有、読込pipeバッファを設定できるとすると、書込pipeバッファにも設定した事になり、読込pipeなのに書込みしたことになってしまう。
サンプル
[root@north a]# cat w-vmsplice.c#define _GNU_SOURCE #include <stdio.h> #include <unistd.h> #include <fcntl.h> #include <string.h> #include <sys/syscall.h> void pread1(int pfd); void pread2(int pfd); void pbuff_read(void); char buf1[20], buf2[20], buf3[20]; void main(int argc, char *argv[]) { int cnt, fd, pw, pr, pfd[2]; struct iovec iov[3]; pipe2(pfd, O_NONBLOCK); pr = pfd[0]; pw = pfd[1]; strcpy(buf1, "0123456789"); strcpy(buf2, "0123456789"); strcpy(buf3, "0123456789"); iov[0].iov_base = buf1; iov[0].iov_len = 5; iov[1].iov_base = buf2; iov[1].iov_len = 8; iov[2].iov_base = buf3; iov[2].iov_len = 10; if (argc == 1) { cnt = syscall(SYS_vmsplice, pw, iov, 3, O_NONBLOCK); printf("vmspliced :%d\n", cnt); buf1[0] = 'a'; buf2[0] = 'b'; buf3[0] = 'c'; <- pipeバッファに設定されたbuf1/buf2/buf3を更新 pread1(pr); } else { cnt = 0; while (1) { if (syscall(SYS_vmsplice, pw, &iov[0], 1, SPLICE_F_NONBLOCK) <= 0) { break; } cnt++; } printf("vmspliced cnt:%d [PIPE_DEF_BUFFERS=16]\n", cnt); } } void pread1(int pfd) { char buf[32]; printf("pipe read:"); while(1) { buf[1] = 0; if (read(pfd, buf, 1) > 0) { printf("%s", buf); } else { break; } } printf("\n"); }[root@north a]# ./w-vmsplice.out
vmsplice :23
pipe read:a1234b1234567c123456789
[root@north a]# ./w-vmsplice.out max-do_vmsplice
vmspliced cnt:16 [PIPE_DEF_BUFFERS=16]
カーネル
pipeはpageをバッファとするstruct pipe_bufferを16有するデフォルト領域で、pipeのwriteは、pageサイズを超て、次のstruct pipe_bufferへ書き込まれるが、vmspliceは同じページであっても設定毎に新規struct pipe_bufferへ設定される。#define PIPE_DEF_BUFFERS 16 struct inode { : union { struct pipe_inode_info *i_pipe; struct block_device *i_bdev; struct cdev *i_cdev; }; : }; struct pipe_inode_info { wait_queue_head_t wait; unsigned int nrbufs, curbuf, buffers; unsigned int readers; unsigned int writers; unsigned int waiting_writers; unsigned int r_counter; unsigned int w_counter; struct page *tmp_page; struct fasync_struct *fasync_readers; struct fasync_struct *fasync_writers; struct inode *inode; struct pipe_buffer *bufs; <-- bufs[PIPE_DEF_BUFFERS] }; struct pipe_buffer { struct page *page; unsigned int offset, len; const struct pipe_buf_operations *ops; unsigned int flags; unsigned long private; }; 読込page(bufs->page) bufs = inode->i_pipe->bufs + inode->i_pipe->curbuf 書込page(bufs->page) bufs = inode->i_pipe->bufs + inode->i_pipe->curbuf + inode->i_pipe->nrbufs pipe作成のデフォルトメモリサイズはpage*PIPE_DEF_BUFFERS(4096*16) struct pipe_inode_info * alloc_pipe_info(struct inode *inode) { struct pipe_inode_info *pipe; pipe = kzalloc(sizeof(struct pipe_inode_info), GFP_KERNEL); if (pipe) { pipe->bufs = kzalloc(sizeof(struct pipe_buffer) * PIPE_DEF_BUFFERS, GFP_KERNEL); if (pipe->bufs) { init_waitqueue_head(&pipe->wait); pipe->r_counter = pipe->w_counter = 1; pipe->inode = inode; pipe->buffers = PIPE_DEF_BUFFERS; return pipe; } kfree(pipe); } return NULL; } SYSCALL_DEFINE4(vmsplice, int, fd, const struct iovec __user *, iov, unsigned long, nr_segs, unsigned int, flags) { struct fd f; long error; if (unlikely(nr_segs > UIO_MAXIOV)) return -EINVAL; else if (unlikely(!nr_segs)) return 0; error = -EBADF; f = fdget(fd); if (f.file) { if (f.file->f_mode & FMODE_WRITE) error = vmsplice_to_pipe(f.file, iov, nr_segs, flags); else if (f.file->f_mode & FMODE_READ) error = vmsplice_to_user(f.file, iov, nr_segs, flags); fdput(f); } return error; } static long vmsplice_to_pipe(struct file *file, const struct iovec __user *iov, unsigned long nr_segs, unsigned int flags) { struct pipe_inode_info *pipe; struct page *pages[PIPE_DEF_BUFFERS]; struct partial_page partial[PIPE_DEF_BUFFERS]; struct splice_pipe_desc spd = { .pages = pages, .partial = partial, .nr_pages_max = PIPE_DEF_BUFFERS, .flags = flags, .ops = &user_page_pipe_buf_ops, .spd_release = spd_release_page, }; long ret; pipe = get_pipe_info(file); if (!pipe) return -EBADF; if (splice_grow_spd(pipe, &spd)) return -ENOMEM; spd.nr_pages = get_iovec_page_array(iov, nr_segs, spd.pages, spd.partial, false, spd.nr_pages_max); if (spd.nr_pages <= 0) ret = spd.nr_pages; else ret = splice_to_pipe(pipe, &spd); splice_shrink_spd(&spd); return ret; } get_iovec_page_array()でユーザ引数のiovec __user *iovのデータのpageと係るoffset/len等の属性が*spdに設定される。 ssize_t splice_to_pipe(struct pipe_inode_info *pipe, struct splice_pipe_desc *spd) { unsigned int spd_pages = spd->nr_pages; int ret, do_wakeup, page_nr; ret = 0; do_wakeup = 0; page_nr = 0; pipe_lock(pipe); for (;;) { if (!pipe->readers) { send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; } if (pipe->nrbufs < pipe->buffers) { int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1); struct pipe_buffer *buf = pipe->bufs + newbuf; buf->page = spd->pages[page_nr]; <-設定バッファのページ(page_nrはiov[]の配列数に係る引数) buf->offset = spd->partial[page_nr].offset; <-設定バッファのページ内のオフセット buf->len = spd->partial[page_nr].len; <-設定バッファの長さ buf->private = spd->partial[page_nr].private; buf->ops = spd->ops; if (spd->flags & SPLICE_F_GIFT) buf->flags |= PIPE_BUF_FLAG_GIFT; pipe->nrbufs++; <-設定バッファがページサイズを超えなくても、次pipe書込みは新規pipeバッファ page_nr++; ret += buf->len; if (pipe->inode) do_wakeup = 1; if (!--spd->nr_pages) break; if (pipe->nrbufs < pipe->buffers) continue; break; } if (spd->flags & SPLICE_F_NONBLOCK) { if (!ret) ret = -EAGAIN; break; } if (signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } if (do_wakeup) { smp_mb(); if (waitqueue_active(&pipe->wait)) wake_up_interruptible_sync(&pipe->wait); kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); do_wakeup = 0; } pipe->waiting_writers++; pipe_wait(pipe); pipe->waiting_writers--; } pipe_unlock(pipe); if (do_wakeup) wakeup_pipe_readers(pipe); while (page_nr < spd_pages) spd->spd_release(spd, page_nr++); return ret; }