Io uring 简单使用
原理可参考:https://arthurchiao.art/blog/intro-to-io-uring-zh/
io_uring是Linux内核在v5.1引入的一套异步IO接口,和aio不同的是,它可以提供更高的性能
io_uring 具体有三个系统调用
分别是 io_uring_setup
, io_uring_enter
, io_uring_register
,我们可以通过这三个系统调用来完成异步事件提交,收割,自己处理的流程
异步io的优点在于,不用我们自己去等待 io操作的完成,我们只需要告诉内核,我们的任务,内核来帮我们完成。这样就可以让我们的进程去干其他的事情,实现更高的吞吐量。
io_uring 利用 mmap 开辟出一块空间,让用户态和内核态的程序都可以共享的一块区域
io_uring 分为 提交队列 和 完成队列
用户提交的任务放在提交队列中,由内核去处理,处理好的东西会放在完成队列中。
内核如何处理,不是用户关系的问题,内核可以回调、轮询的方式都可以进行处理。
用户只需要设置好就可以使用
由于这三个系统调用要用好并不容易
开发作者也提供了一个liburing来给我们使用
注: 内核版本最好高一点,比如 Linux 5.4 不支持 read,但支持 readv (亲身教训)
除了 io_uring的结构要了解外,我们还需了解两个用到的东西
这两个分别代表了 完成队列和提交队列的一项元素
其中的user_data可以是一个可以由我们进行diy的指针
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 struct io_uring_cqe { __u64 user_data; __s32 res; __u32 flags; __u64 big_cqe[]; }; 复制代码 struct io_uring_sqe { __u8 opcode; __u8 flags; __u16 ioprio; __s32 fd; union { __u64 off; __u64 addr2; struct { __u32 cmd_op; __u32 __pad1; }; }; union { __u64 addr; __u64 splice_off_in; }; __u32 len; union { __kernel_rwf_t rw_flags; __u32 fsync_flags; __u16 poll_events; __u32 poll32_events; __u32 sync_range_flags; __u32 msg_flags; __u32 timeout_flags; __u32 accept_flags; __u32 cancel_flags; __u32 open_flags; __u32 statx_flags; __u32 fadvise_advice; __u32 splice_flags; __u32 rename_flags; __u32 unlink_flags; __u32 hardlink_flags; __u32 xattr_flags; __u32 msg_ring_flags; }; __u64 user_data; union { __u16 buf_index; __u16 buf_group; } __attribute__((packed)); __u16 personality; union { __s32 splice_fd_in; __u32 file_index; struct { __u16 addr_len; __u16 __pad3[1 ]; }; }; union { struct { __u64 addr3; __u64 __pad2[1 ]; }; __u8 cmd[0 ]; }; }; 复制代码
我们来看两个使用 liburing的简单例子
例子 1
第一个例子诠释了 io_uring 最直接的一个流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #include <bits/stdc++.h> #include <liburing.h> #include <unistd.h> char buf[1024 ] = {0 };int main () { int fd = open ("1.txt" , O_RDONLY, 0 ); io_uring ring; io_uring_queue_init (32 , &ring, 0 ); auto sqe = io_uring_get_sqe (&ring); io_uring_prep_read (sqe, fd, buf, sizeof (buf), 0 ); io_uring_submit (&ring); io_uring_cqe* res; io_uring_wait_cqe (&ring, &res); assert (res); std::cout << "read bytes: " << res->res << " \n" ; std::cout << buf << std::endl; io_uring_cqe_seen (&ring, res); io_uring_queue_exit (&ring); return 0 ; } 复制代码
io_uring 有三个东西
提交队列
完成队列
任务实体
提交队列和完成队列都可以看成持有一项指针
我们得到一个 任务实体,通过 io_uring_prep_read
准备任务 和 io_uring_submit
提交任务
提交任务之后就到了提交队列中去
在提交队列里面,内核操作完以后。
任务就到了完成队列中去。
然后我们可以阻塞等待 io_uring_wait_cqe
一项任务
当然,我们也可以使用非阻塞的方式,去干其他事情
在拿到这一项任务之后,我们就可以对其进行处理,处理完成记得 从完成队列中清除
(至于 完成队列和提交队列是如何高效的且不出错的并发执行 暂且不谈)
例子 2 echo_server
上述 的 io_uring写着还是比较长,我们可以把它封装一下。
比如要用 read的操作,要用accpet的操作,都给他封装一下
同时,我们在写echo_server时,我们是 几个不同的操作
可能是 ACCEPT 操作,可能是 READ 操作, 可能是 WRITE操作
并且READ和WRITE操作都要有自己的缓冲区
所以,我们定义一下我们在任务之间传递的结构体
然后把它放到 user_data 中。
结构体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 struct request { enum STATE { ACCEPT, READ, WRITE }; int fd; STATE state; union { struct { sockaddr_in ipv4_addr; socklen_t lens; } addr; char buf[BUFSIZE]; }; }; 复制代码
我们对可能用到的操作进行一下封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class IOuring { io_uring ring; public : IOuring (int queue_size) { io_uring_queue_init (queue_size, &ring, 0 ); } ~IOuring () { io_uring_queue_exit (&ring); } void seen (io_uring_cqe* cqe) { io_uring_cqe_seen (&ring, cqe); } int wait (io_uring_cqe** cqe) { return io_uring_wait_cqe (&ring, cqe); } int submit () { return io_uring_submit (&ring); } void accpet_asyn (int sock_fd, request* body) { auto sqe = io_uring_get_sqe (&ring); body->state = request::ACCEPT; body->fd = sock_fd; body->addr.lens = sizeof (sockaddr_in); io_uring_prep_accept (sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr), &(body->addr.lens), 0 ); io_uring_sqe_set_data (sqe, body); } void read_asyn (int client_fd, request* body) { auto sqe = io_uring_get_sqe (&ring); body->state = request::READ; body->fd = client_fd; io_uring_prep_read (sqe, client_fd, body->buf, sizeof (body->buf), -1 ); io_uring_sqe_set_data (sqe, body); } void write_asyn (int client_fd, request* body) { auto sqe = io_uring_get_sqe (&ring); body->state = request::WRITE; body->fd = client_fd; io_uring_prep_write (sqe, client_fd, body->buf, sizeof (body->buf), -1 ); io_uring_sqe_set_data (sqe, body); } }; 复制代码
整体代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 #include <arpa/inet.h> #include <bits/stdc++.h> #include <liburing.h> #include <sys/socket.h> #include <sys/types.h> #include <unistd.h> const int BUFSIZE = 1024 ;struct request { enum STATE { ACCEPT, READ, WRITE }; int fd; STATE state; union { struct { sockaddr_in ipv4_addr; socklen_t lens; } addr; char buf[BUFSIZE]; }; }; class IOuring { io_uring ring; public : IOuring (int queue_size) { io_uring_queue_init (queue_size, &ring, 0 ); } ~IOuring () { io_uring_queue_exit (&ring); } void seen (io_uring_cqe* cqe) { io_uring_cqe_seen (&ring, cqe); } int wait (io_uring_cqe** cqe) { return io_uring_wait_cqe (&ring, cqe); } int submit () { return io_uring_submit (&ring); } void accpet_asyn (int sock_fd, request* body) { auto sqe = io_uring_get_sqe (&ring); body->state = request::ACCEPT; body->fd = sock_fd; body->addr.lens = sizeof (sockaddr_in); io_uring_prep_accept (sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr), &(body->addr.lens), 0 ); io_uring_sqe_set_data (sqe, body); } void read_asyn (int client_fd, request* body) { auto sqe = io_uring_get_sqe (&ring); body->state = request::READ; body->fd = client_fd; io_uring_prep_read (sqe, client_fd, body->buf, sizeof (body->buf), -1 ); io_uring_sqe_set_data (sqe, body); } void write_asyn (int client_fd, request* body) { auto sqe = io_uring_get_sqe (&ring); body->state = request::WRITE; body->fd = client_fd; io_uring_prep_write (sqe, client_fd, body->buf, sizeof (body->buf), -1 ); io_uring_sqe_set_data (sqe, body); } }; int main () { int sock_fd = socket (AF_INET, SOCK_STREAM, 0 ); sockaddr_in sock_addr; sock_addr.sin_port = htons (8000 ); sock_addr.sin_family = AF_INET; sock_addr.sin_addr.s_addr = INADDR_ANY; int ret = bind (sock_fd, (sockaddr*)&sock_addr, sizeof (sock_addr)); perror ("" ); listen (sock_fd, 10 ); std::cout << "listen begin ..." << std::endl; IOuring ring (1024 ) ; ring.accpet_asyn (sock_fd, new request); ring.submit (); while (true ) { io_uring_cqe* cqe; ring.wait (&cqe); request* res = (request*)cqe->user_data; switch (res->state) { case request::ACCEPT: if (cqe->res > 0 ) { int client_fd = cqe->res; ring.accpet_asyn (sock_fd, res); ring.read_asyn (client_fd, new request); ring.submit (); } std::cout << cqe->res << std::endl; break ; case request::READ: if (cqe->res > 0 ) std::cout << res->buf << std::endl; ring.write_asyn (res->fd, res); ring.submit (); break ; case request::WRITE: if (cqe->res > 0 ) { close (res->fd); delete res; } break ; default : std::cout << "error " << std::endl; break ; } ring.seen (cqe); } return 0 ; }