0%

io_uring

Io uring 简单使用

原理可参考:https://arthurchiao.art/blog/intro-to-io-uring-zh/

io_uring是Linux内核在v5.1引入的一套异步IO接口,和aio不同的是,它可以提供更高的性能

io_uring 具体有三个系统调用

分别是 io_uring_setup, io_uring_enter, io_uring_register,我们可以通过这三个系统调用来完成异步事件提交,收割,自己处理的流程

异步io的优点在于,不用我们自己去等待 io操作的完成,我们只需要告诉内核,我们的任务,内核来帮我们完成。这样就可以让我们的进程去干其他的事情,实现更高的吞吐量。

io_uring 利用 mmap 开辟出一块空间,让用户态和内核态的程序都可以共享的一块区域

io_uring 分为 提交队列 和 完成队列

用户提交的任务放在提交队列中,由内核去处理,处理好的东西会放在完成队列中。

内核如何处理,不是用户关系的问题,内核可以回调、轮询的方式都可以进行处理。

用户只需要设置好就可以使用

由于这三个系统调用要用好并不容易

开发作者也提供了一个liburing来给我们使用

注: 内核版本最好高一点,比如 Linux 5.4 不支持 read,但支持 readv (亲身教训)

除了 io_uring的结构要了解外,我们还需了解两个用到的东西

这两个分别代表了 完成队列和提交队列的一项元素

其中的user_data可以是一个可以由我们进行diy的指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
struct io_uring_cqe {
__u64 user_data; /* sqe->data submission passed back */
__s32 res; /* result code for this event */
__u32 flags;

/*
* If the ring is initialized with IORING_SETUP_CQE32, then this field
* contains 16-bytes of padding, doubling the size of the CQE.
*/
__u64 big_cqe[];
};
复制代码
struct io_uring_sqe {
__u8 opcode; /* type of operation for this sqe */
__u8 flags; /* IOSQE_ flags */
__u16 ioprio; /* ioprio for the request */
__s32 fd; /* file descriptor to do IO on */
union {
__u64 off; /* offset into file */
__u64 addr2;
struct {
__u32 cmd_op;
__u32 __pad1;
};
};
union {
__u64 addr; /* pointer to buffer or iovecs */
__u64 splice_off_in;
};
__u32 len; /* buffer size or number of iovecs */
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events; /* compatibility */
__u32 poll32_events; /* word-reversed for BE */
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
__u32 accept_flags;
__u32 cancel_flags;
__u32 open_flags;
__u32 statx_flags;
__u32 fadvise_advice;
__u32 splice_flags;
__u32 rename_flags;
__u32 unlink_flags;
__u32 hardlink_flags;
__u32 xattr_flags;
__u32 msg_ring_flags;
};
__u64 user_data; /* data to be passed back at completion time */
/* pack this to avoid bogus arm OABI complaints */
union {
/* index into fixed buffers, if used */
__u16 buf_index;
/* for grouped buffer selection */
__u16 buf_group;
} __attribute__((packed));
/* personality to use, if used */
__u16 personality;
union {
__s32 splice_fd_in;
__u32 file_index;
struct {
__u16 addr_len;
__u16 __pad3[1];
};
};
union {
struct {
__u64 addr3;
__u64 __pad2[1];
};
/*
* If the ring is initialized with IORING_SETUP_SQE128, then
* this field is used for 80 bytes of arbitrary command data
*/
__u8 cmd[0];
};
};
复制代码

我们来看两个使用 liburing的简单例子

例子 1

第一个例子诠释了 io_uring 最直接的一个流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 读取文件
**/
#include <bits/stdc++.h>
#include <liburing.h>
#include <unistd.h>

char buf[1024] = {0};

int main() {
int fd = open("1.txt", O_RDONLY, 0);

io_uring ring;
io_uring_queue_init(32, &ring, 0); // 初始化
auto sqe = io_uring_get_sqe(&ring); // 从环中得到一块空位
io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0); // 为这块空位准备好操作
io_uring_submit(&ring); // 提交任务
io_uring_cqe* res; // 完成队列指针
io_uring_wait_cqe(&ring, &res); // 阻塞等待一项完成的任务
assert(res);
std::cout << "read bytes: " << res->res << " \n";
std::cout << buf << std::endl;
io_uring_cqe_seen(&ring, res); // 将任务移出完成队列
io_uring_queue_exit(&ring); // 退出
return 0;
}

复制代码

io_uring 有三个东西

提交队列

完成队列

任务实体

提交队列和完成队列都可以看成持有一项指针

我们得到一个 任务实体,通过 io_uring_prep_read 准备任务 和 io_uring_submit 提交任务

提交任务之后就到了提交队列中去

在提交队列里面,内核操作完以后。

任务就到了完成队列中去。

然后我们可以阻塞等待 io_uring_wait_cqe 一项任务

当然,我们也可以使用非阻塞的方式,去干其他事情

在拿到这一项任务之后,我们就可以对其进行处理,处理完成记得 从完成队列中清除

(至于 完成队列和提交队列是如何高效的且不出错的并发执行 暂且不谈)

例子 2

echo_server

上述 的 io_uring写着还是比较长,我们可以把它封装一下。

比如要用 read的操作,要用accpet的操作,都给他封装一下

同时,我们在写echo_server时,我们是 几个不同的操作

可能是 ACCEPT 操作,可能是 READ 操作, 可能是 WRITE操作

并且READ和WRITE操作都要有自己的缓冲区

所以,我们定义一下我们在任务之间传递的结构体

然后把它放到 user_data 中。

1
2
__u64	user_data;	/* data to be passed back at completion time */
复制代码

结构体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
struct request {
enum STATE { ACCEPT, READ, WRITE };
int fd;
STATE state;
union {
struct {
sockaddr_in ipv4_addr;
socklen_t lens;
} addr;
char buf[BUFSIZE];
};
};
复制代码

我们对可能用到的操作进行一下封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class IOuring {
io_uring ring;

public:
IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }

~IOuring() { io_uring_queue_exit(&ring); }

void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }

int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }

int submit() { return io_uring_submit(&ring); }

void accpet_asyn(int sock_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::ACCEPT;
body->fd = sock_fd;
body->addr.lens = sizeof(sockaddr_in);
io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
&(body->addr.lens), 0);
io_uring_sqe_set_data(sqe, body);
}

void read_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::READ;
body->fd = client_fd;
io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}

void write_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::WRITE;
body->fd = client_fd;
io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
};

复制代码

整体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <arpa/inet.h>
#include <bits/stdc++.h>
#include <liburing.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

const int BUFSIZE = 1024;
struct request {
enum STATE { ACCEPT, READ, WRITE };
int fd;
STATE state;
union {
struct {
sockaddr_in ipv4_addr;
socklen_t lens;
} addr;
char buf[BUFSIZE];
};
};
class IOuring {
io_uring ring;

public:
IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }

~IOuring() { io_uring_queue_exit(&ring); }

void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }

int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }

int submit() { return io_uring_submit(&ring); }

void accpet_asyn(int sock_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::ACCEPT;
body->fd = sock_fd;
body->addr.lens = sizeof(sockaddr_in);
io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
&(body->addr.lens), 0);
io_uring_sqe_set_data(sqe, body);
}

void read_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::READ;
body->fd = client_fd;
io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}

void write_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::WRITE;
body->fd = client_fd;
io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
};

int main() {
/*init socket*/
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in sock_addr;
sock_addr.sin_port = htons(8000);
sock_addr.sin_family = AF_INET;
sock_addr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(sock_fd, (sockaddr*)&sock_addr, sizeof(sock_addr));
perror("");
listen(sock_fd, 10);

std::cout << "listen begin ..." << std::endl;

/*io_uring*/
IOuring ring(1024);

ring.accpet_asyn(sock_fd, new request);
ring.submit();

while (true) {
io_uring_cqe* cqe;
ring.wait(&cqe);
request* res = (request*)cqe->user_data;
switch (res->state) {
case request::ACCEPT:
if (cqe->res > 0) {
int client_fd = cqe->res;
ring.accpet_asyn(sock_fd, res);
ring.read_asyn(client_fd, new request);
ring.submit();
}
std::cout << cqe->res << std::endl;
break;
case request::READ:
if (cqe->res > 0) std::cout << res->buf << std::endl;
ring.write_asyn(res->fd, res);
ring.submit();
break;
case request::WRITE:
if (cqe->res > 0) {
close(res->fd);
delete res;
}
break;
default:
std::cout << "error " << std::endl;
break;
}
ring.seen(cqe);
}

return 0;
}