vmsplice


書込みpipeの vmspliceは引数データをpipeバッファに複写するのでなく、struct iovec.iov_base引数データのpageをpipeバッファpageとし、データのpage内のオフセット/サイズをpipeバッファの属性のoffset/lenに設定。故にvmsplice後に設定したユーザバッファデータ変更すれば、pipeバッファを更新した事になる。

pipeのwriteは、書込データがpage領域を超えるとpipe->nrbufs++とし次書込みは次バッファの別ページで、page領域を超えなければ同pageに追加され、pipe_buffernのlenに追加サイズが加算されるが、vmspliceはデータがpageサイズを超えなくても設定毎にpipe->nrbufs++。従ってvmspliceは設定バッファに関係なくデフォルトでPIPE_DEF_BUFFERSの16回しか実行できない。小さいサイズバッファを多々扱う場合、配列によるバッファを配列[0]のアドレスをvmspliceすべきである。

読込pipeのvmspliceは、書込みpipeの係る実装でなく、pipeのreadそのものである。そもそも読/書pipeバッファは共有、読込pipeバッファを設定できるとすると、書込pipeバッファにも設定した事になり、読込pipeなのに書込みしたことになってしまう。

サンプル

[root@north a]# cat w-vmsplice.c
#define _GNU_SOURCE

#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <sys/syscall.h>

void    pread1(int pfd);
void    pread2(int pfd);
void    pbuff_read(void);

char    buf1[20], buf2[20], buf3[20];
void main(int argc, char *argv[])
{
    int     cnt, fd, pw, pr, pfd[2];
    struct iovec     iov[3];

    pipe2(pfd, O_NONBLOCK);
    pr = pfd[0]; pw = pfd[1];

    strcpy(buf1, "0123456789");
    strcpy(buf2, "0123456789");
    strcpy(buf3, "0123456789");

    iov[0].iov_base = buf1; iov[0].iov_len  = 5;
    iov[1].iov_base = buf2; iov[1].iov_len  = 8;
    iov[2].iov_base = buf3; iov[2].iov_len  = 10;

    if (argc == 1) { 
        cnt = syscall(SYS_vmsplice, pw, iov, 3, O_NONBLOCK);
        printf("vmspliced  :%d\n", cnt);

        buf1[0] = 'a';  buf2[0] = 'b';  buf3[0] = 'c';          <- pipeバッファに設定されたbuf1/buf2/buf3を更新
        pread1(pr);
    }
    else {
        cnt = 0;
        while (1) {
            if (syscall(SYS_vmsplice, pw, &iov[0], 1, SPLICE_F_NONBLOCK) <= 0) {
                break;
               }
            cnt++;
            }
         printf("vmspliced cnt:%d [PIPE_DEF_BUFFERS=16]\n", cnt);
    }
}

void     pread1(int pfd)
{
    char     buf[32];

    printf("pipe read:");
    while(1) {
         buf[1] = 0;
         if (read(pfd, buf, 1) > 0) {
              printf("%s", buf);
            }
         else {
              break;
         }
    }
    printf("\n");
}

[root@north a]# ./w-vmsplice.out
vmsplice :23
pipe read:a1234b1234567c123456789

[root@north a]# ./w-vmsplice.out max-do_vmsplice
vmspliced cnt:16 [PIPE_DEF_BUFFERS=16]

カーネル

pipeはpageをバッファとするstruct pipe_bufferを16有するデフォルト領域で、pipeのwriteは、pageサイズを超て、次のstruct pipe_bufferへ書き込まれるが、vmspliceは同じページであっても設定毎に新規struct pipe_bufferへ設定される。
#define PIPE_DEF_BUFFERS        16
struct inode {
 :
       union {
               struct pipe_inode_info  *i_pipe;
               struct block_device     *i_bdev;
               struct cdev             *i_cdev;
       };
 :
};

struct pipe_inode_info {
       wait_queue_head_t wait;
       unsigned int nrbufs, curbuf, buffers;
       unsigned int readers;
       unsigned int writers;
       unsigned int waiting_writers;
       unsigned int r_counter;
       unsigned int w_counter;
       struct page *tmp_page;
       struct fasync_struct *fasync_readers;
       struct fasync_struct *fasync_writers;
       struct inode *inode;
       struct pipe_buffer *bufs;               <-- bufs[PIPE_DEF_BUFFERS]
};

struct pipe_buffer {
       struct page *page;
       unsigned int offset, len;
       const struct pipe_buf_operations *ops;
       unsigned int flags;
       unsigned long private;
};
読込page(bufs->page) bufs = inode->i_pipe->bufs + inode->i_pipe->curbuf
書込page(bufs->page) bufs = inode->i_pipe->bufs + inode->i_pipe->curbuf + inode->i_pipe->nrbufs

pipe作成のデフォルトメモリサイズはpage*PIPE_DEF_BUFFERS(4096*16)
struct pipe_inode_info * alloc_pipe_info(struct inode *inode)
{
      struct pipe_inode_info *pipe;

      pipe = kzalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);
      if (pipe) {
              pipe->bufs = kzalloc(sizeof(struct pipe_buffer) * PIPE_DEF_BUFFERS, GFP_KERNEL);
              if (pipe->bufs) {
                      init_waitqueue_head(&pipe->wait);
                      pipe->r_counter = pipe->w_counter = 1;
                      pipe->inode = inode;
                      pipe->buffers = PIPE_DEF_BUFFERS;
                      return pipe;
              }
              kfree(pipe);
      }
      return NULL;
}

SYSCALL_DEFINE4(vmsplice, int, fd, const struct iovec __user *, iov,
               unsigned long, nr_segs, unsigned int, flags)
{
       struct fd f;
       long error;

       if (unlikely(nr_segs > UIO_MAXIOV))
               return -EINVAL;
       else if (unlikely(!nr_segs))
               return 0;

       error = -EBADF;
       f = fdget(fd);
       if (f.file) {
               if (f.file->f_mode & FMODE_WRITE)
                       error = vmsplice_to_pipe(f.file, iov, nr_segs, flags);
               else if (f.file->f_mode & FMODE_READ)
                       error = vmsplice_to_user(f.file, iov, nr_segs, flags);

               fdput(f);
       }

       return error;
}

static long vmsplice_to_pipe(struct file *file, const struct iovec __user *iov,
                            unsigned long nr_segs, unsigned int flags)
{
       struct pipe_inode_info *pipe;
       struct page *pages[PIPE_DEF_BUFFERS];
       struct partial_page partial[PIPE_DEF_BUFFERS];
       struct splice_pipe_desc spd = {
               .pages = pages,
               .partial = partial,
               .nr_pages_max = PIPE_DEF_BUFFERS,
               .flags = flags,
               .ops = &user_page_pipe_buf_ops,
               .spd_release = spd_release_page,
       };
       long ret;

       pipe = get_pipe_info(file);
       if (!pipe)
               return -EBADF;

       if (splice_grow_spd(pipe, &spd))
               return -ENOMEM;

       spd.nr_pages = get_iovec_page_array(iov, nr_segs, spd.pages,
                                           spd.partial, false,
                                           spd.nr_pages_max);
       if (spd.nr_pages <= 0)
               ret = spd.nr_pages;
       else
               ret = splice_to_pipe(pipe, &spd);

       splice_shrink_spd(&spd);
       return ret;
}

get_iovec_page_array()でユーザ引数のiovec __user *iovのデータのpageと係るoffset/len等の属性が*spdに設定される。
ssize_t splice_to_pipe(struct pipe_inode_info *pipe, struct splice_pipe_desc *spd)
{
    unsigned int spd_pages = spd->nr_pages;
    int ret, do_wakeup, page_nr;

    ret = 0;
    do_wakeup = 0;
    page_nr = 0;

    pipe_lock(pipe);

    for (;;) {
        if (!pipe->readers) {
            send_sig(SIGPIPE, current, 0);
            if (!ret)
                ret = -EPIPE;
                break;
          }

        if (pipe->nrbufs < pipe->buffers) {
              int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
              struct pipe_buffer *buf = pipe->bufs + newbuf;

              buf->page = spd->pages[page_nr];             <-設定バッファのページ(page_nrはiov[]の配列数に係る引数)
              buf->offset = spd->partial[page_nr].offset;  <-設定バッファのページ内のオフセット
              buf->len = spd->partial[page_nr].len;        <-設定バッファの長さ
              buf->private = spd->partial[page_nr].private;
              buf->ops = spd->ops;
              if (spd->flags & SPLICE_F_GIFT)
                  buf->flags |= PIPE_BUF_FLAG_GIFT;

              pipe->nrbufs++;       <-設定バッファがページサイズを超えなくても、次pipe書込みは新規pipeバッファ
              page_nr++;
              ret += buf->len;

              if (pipe->inode)
                   do_wakeup = 1;

              if (!--spd->nr_pages)
                   break;
              if (pipe->nrbufs < pipe->buffers)
                   continue;

              break;
          }

        if (spd->flags & SPLICE_F_NONBLOCK) {
            if (!ret)
                ret = -EAGAIN;
                break;
               }

            if (signal_pending(current)) {
                if (!ret)
                     ret = -ERESTARTSYS;
                break;
               }

            if (do_wakeup) {
                smp_mb();
                if (waitqueue_active(&pipe->wait))
                    wake_up_interruptible_sync(&pipe->wait);
                kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
                do_wakeup = 0;
          }

        pipe->waiting_writers++;
        pipe_wait(pipe);
        pipe->waiting_writers--;
     }

    pipe_unlock(pipe);

    if (do_wakeup)
        wakeup_pipe_readers(pipe);

    while (page_nr < spd_pages)
        spd->spd_release(spd, page_nr++);

    return ret;
}


最終更新 2018/02/24 12:06:22 - north
(2014/04/11 17:38:22 作成)


検索

アクセス数
3689330
最近のコメント
コアダンプファイル - sakaia
list_head構造体 - yocto_no_yomikata
勧告ロックと強制ロック - wataash
LKMからのファイル出力 - 重松 宏昌
kprobe - ななし
ksetの実装 - スーパーコピー
カーネルスレッドとは - ノース
カーネルスレッドとは - nbyst
asmlinkageってなに? - ノース
asmlinkageってなに? - よろしく
Adsense
広告情報が設定されていません。