Welcome! Log In Create A New Profile

Advanced

[PATCH 4 of 8] Added io_uring event module

SoYun Seong
November 24, 2020 03:46AM
# HG changeset patch
# User SoYun Seong <thdbsdox12@gmail.com>
# Date 1606128722 0
# Mon Nov 23 10:52:02 2020 +0000
# Node ID d7f0f0b78a24724c362cd0306dc6bbed82c66e62
# Parent ce34097321cf3cb7f9de55075bd1788577bb1ad8
Added io_uring event module.

Implemented nginx io_uring module that processes event loop with io_uring API. It requires Linux kernel 5.7 or higher with IORING_FEAT_FAST_POLL. And it supports only tcp protocol yet.
To move data between a file and a socket without copying between kernel address space and user address space, I implemented asynchronous sendfile of io_uring module using two IORING_OP_SPLICE requests and pipes.
However, performance of the splice requests is getting poor after accepting a client. I'm finding the reason of this problem.
Because of this problem, I implemented sendfile of ngx_uring_module with a read request and a send request. This makes copy between kernel space and user space.
This sendfile implementation has a high overhead when sending a big size of file.
To alleviate this, when sending a file over page size(4KB), ngx_uring_module uses a Linux sendfile() system call instead.

diff -r ce34097321cf -r d7f0f0b78a24 src/event/modules/ngx_uring_module.c
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/event/modules/ngx_uring_module.c Mon Nov 23 10:52:02 2020 +0000
@@ -0,0 +1,1191 @@
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_event.h>
+
+#include "liburing.h"
+
+
+typedef struct {
+ ngx_uint_t entries;
+ size_t sendfile_bound;
+} ngx_uring_conf_t;
+
+
+typedef struct {
+ ngx_connection_t *conn;
+ ngx_uint_t ev;
+ void *buf;
+} ngx_uring_info_t;
+
+
+static ngx_int_t ngx_uring_init(ngx_cycle_t *cycle, ngx_msec_t timer);
+static void ngx_uring_done(ngx_cycle_t *cycle);
+static ngx_int_t ngx_uring_add_event(ngx_event_t *ev, ngx_int_t event,
+ ngx_uint_t flags);
+static ngx_int_t ngx_uring_add_connection(ngx_connection_t *c);
+static ngx_int_t ngx_uring_del_connection(ngx_connection_t *c,
+ ngx_uint_t flags);
+static ngx_int_t ngx_uring_process_events(ngx_cycle_t *cycle, ngx_msec_t timer,
+ ngx_uint_t flags);
+
+static void *ngx_uring_create_conf(ngx_cycle_t *cycle);
+static char *ngx_uring_init_conf(ngx_cycle_t *cycle, void *conf);
+
+
+ngx_int_t ngx_uring_accept(ngx_connection_t *c);
+ssize_t ngx_uring_recv(ngx_connection_t *c, u_char *buf, size_t size);
+ssize_t ngx_uring_readv_chain(ngx_connection_t *c, ngx_chain_t *chain, off_t limit);
+ssize_t ngx_uring_send(ngx_connection_t *c, u_char *buf, size_t size);
+ngx_chain_t * ngx_uring_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit);
+ngx_chain_t *ngx_uring_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit);
+static ssize_t ngx_uring_writev(ngx_connection_t *c, int nelts, int start_el);
+#if (NGX_USE_URING_SPLICE)
+static ssize_t ngx_uring_splice_sendfile(ngx_connection_t *c, ngx_buf_t *file, size_t size);
+#else
+static ssize_t ngx_uring_read_sendfile(ngx_connection_t *c, ngx_buf_t *file, size_t size);
+#endif
+
+
+
+static struct io_uring ring;
+static size_t linux_sendfile_bound;
+
+static ngx_str_t uring_name = ngx_string("io_uring");
+
+static ngx_command_t ngx_uring_commands[] = {
+
+ { ngx_string("uring_entries"),
+ NGX_EVENT_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_num_slot,
+ 0,
+ offsetof(ngx_uring_conf_t, entries),
+ NULL },
+
+ { ngx_string("linux_sendfile_bound"),
+ NGX_EVENT_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_num_slot,
+ 0,
+ offsetof(ngx_uring_conf_t, sendfile_bound),
+ NULL },
+
+ ngx_null_command
+};
+
+
+static ngx_event_module_t ngx_uring_module_ctx = {
+ &uring_name,
+ ngx_uring_create_conf, /* create configuration */
+ ngx_uring_init_conf, /* init configuration */
+
+ {
+ ngx_uring_add_event, /* add an event */
+ NULL, /* delete an event */
+ ngx_uring_add_event, /* enable an event */
+ NULL, /* disable an event */
+ ngx_uring_add_connection, /* add an connection */
+ ngx_uring_del_connection, /* delete an connection */
+ NULL, /* trigger a notify */
+ ngx_uring_process_events, /* process the events */
+ ngx_uring_init, /* init the events */
+ ngx_uring_done, /* done the events */
+ }
+};
+
+ngx_module_t ngx_uring_module = {
+ NGX_MODULE_V1,
+ &ngx_uring_module_ctx, /* module context */
+ ngx_uring_commands, /* module directives */
+ NGX_EVENT_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ NULL, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ NULL, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+ngx_os_io_t ngx_uring_io = {
+ ngx_uring_recv,
+ ngx_uring_readv_chain,
+ NULL, /* udp recv */
+ ngx_uring_send,
+ NULL, /* udp send */
+ NULL, /* udp sendmsg chain */
+#if (1)
+ ngx_uring_sendfile_chain,
+ NGX_IO_SENDFILE
+#else
+ ngx_uring_writev_chain,
+ 0
+#endif
+};
+
+
+static ngx_int_t
+ngx_uring_init(ngx_cycle_t *cycle, ngx_msec_t timer)
+{
+ ngx_uring_conf_t *urcf;
+ struct io_uring_params params;
+
+ urcf = ngx_event_get_conf(cycle->conf_ctx, ngx_uring_module);
+
+ if(ring.ring_fd == 0){
+ ngx_memzero(&params, sizeof(params));
+ if (io_uring_queue_init_params(urcf->entries, &ring, &params) < 0) {
+ ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
+ "io_uring_queue_init_params() failed");
+ return NGX_ERROR;
+ }
+
+ if(!(params.features & IORING_FEAT_FAST_POLL)){
+ ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
+ "IORING_FEAT_FAST_POLL is not available");
+ return NGX_ERROR;
+ }
+ }
+
+ linux_sendfile_bound = urcf->sendfile_bound;
+
+ ngx_io = ngx_uring_io;
+
+ ngx_event_actions = ngx_uring_module_ctx.actions;
+
+#if (NGX_HAVE_CLEAR_EVENT)
+ ngx_event_flags = NGX_USE_CLEAR_EVENT
+#else
+ ngx_event_flags = NGX_USE_LEVEL_EVENT
+#endif
+ |NGX_USE_GREEDY_EVENT
+ |NGX_USE_URING_EVENT;
+
+ return NGX_OK;
+}
+
+
+
+static void
+ngx_uring_done(ngx_cycle_t *cycle)
+{
+ io_uring_queue_exit(&ring);
+ ngx_memset(&ring, 0, sizeof(ring));
+}
+
+
+static ngx_int_t
+ngx_uring_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)
+{
+ ngx_connection_t *c;
+
+ c = ev->data;
+
+ if(event == NGX_READ_EVENT && c->read->accept){
+ if(ngx_uring_accept(c) == NGX_ERROR)
+ return NGX_ERROR;
+ }
+
+ ev->active = 1;
+ ev->ready = 1;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_uring_add_connection(ngx_connection_t *c)
+{
+ if(c->read->accept){
+ if(ngx_uring_accept(c) == NGX_ERROR)
+ return NGX_ERROR;
+ }
+
+ c->read->active = 1;
+ c->write->active = 1;
+ c->read->ready = 1;
+ c->write->ready = 1;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_uring_del_connection(ngx_connection_t *c, ngx_uint_t flags)
+{
+ c->read->active = 0;
+ c->write->active = 0;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_uring_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
+{
+ unsigned head, count;
+ ngx_event_t *rev, *wev;
+ ngx_connection_t *c;
+ ngx_uring_info_t *ui;
+ struct io_uring_cqe *cqe;
+ struct io_uring_sqe *sqe;
+ struct __kernel_timespec ts;
+
+
+ /* NGX_TIMER_INFINITE == INFTIM */
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+ "io_uring timer: %M", timer);
+
+ if(timer != NGX_TIMER_INFINITE) {
+ if(timer >= 1000){
+ ts.tv_sec = timer / 1000;
+ ts.tv_nsec = 0;
+ } else{
+ ts.tv_sec = 0;
+ ts.tv_nsec = timer * 1000000;
+ }
+
+ sqe = io_uring_get_sqe(&ring);
+ if(sqe == NULL) {
+ ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
+ "io_uring_get_sqe() failed");
+ return NGX_ERROR;
+ }
+
+ io_uring_prep_timeout(sqe, &ts, 1, 0);
+ io_uring_sqe_set_data(sqe, (void*)NGX_URING_TIMEOUT);
+ }
+
+ io_uring_submit_and_wait(&ring, 1);
+
+ if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
+ ngx_time_update();
+ }
+
+ count = 0;
+
+ io_uring_for_each_cqe(&ring, head, cqe) {
+ ++count;
+
+ if(cqe->user_data == NGX_URING_TIMEOUT) {
+ if(count > 1) continue;
+
+ io_uring_cq_advance(&ring, count);
+
+ if(timer != NGX_TIMER_INFINITE){
+ return NGX_OK;
+ }
+
+ ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
+ "invalid timeout event");
+ return NGX_ERROR;
+ }
+
+ ui = (ngx_uring_info_t*)cqe->user_data;
+
+ c = ui->conn;
+
+ if (c->fd == -1) {
+ /*
+ * the stale event from a file descriptor
+ * that was just closed in this iteration
+ */
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+ "io_uring: stale event %p", c);
+ continue;
+ }
+
+ ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
+ "io_uring: fd:%d rq:%d d:%p",
+ c->fd, ui->ev, cqe->user_data);
+
+ rev = c->read;
+ wev = c->write;
+
+ switch (ui->ev)
+ {
+ case NGX_URING_ACCEPT:{
+ ngx_pfree(c->pool, ui);
+ if(cqe->res != -11) {
+ rev->uring_res = cqe->res;
+
+ rev->ready = 1;
+ rev->complete = 1;
+ rev->available = -1;
+
+ if (flags & NGX_POST_EVENTS) {
+ ngx_post_event(rev, &ngx_posted_accept_events);
+
+ } else {
+ rev->handler(rev);
+ }
+ }
+
+ rev->ready = 0;
+ rev->complete = 0;
+ rev->available = 1;
+
+ if(ngx_uring_accept(c) == NGX_ERROR)
+ return NGX_ERROR;
+ break;
+ }
+ case NGX_URING_READ:
+ case NGX_URING_READV:{
+ ngx_pfree(c->pool, ui);
+ rev->uring_pending -= 1;
+ rev->uring_res += cqe->res;
+
+ if(rev->uring_pending == 0){
+ rev->complete = 1;
+ rev->ready = 1;
+ rev->available = -1;
+
+ if (flags & NGX_POST_EVENTS) {
+ ngx_post_event(rev, &ngx_posted_events);
+
+ } else {
+ rev->handler(rev);
+ }
+ }
+ break;
+ }
+ case NGX_URING_SEND:
+ case NGX_URING_WRITEV:
+ case NGX_URING_SPLICE_FROM_PIPE:{
+ if(ui->buf) ngx_pfree(c->pool, ui->buf);
+ ngx_pfree(c->pool, ui);
+ wev->uring_pending -= 1;
+ wev->uring_res += cqe->res;
+
+ if(wev->uring_pending == 0) {
+ wev->complete = 1;
+ wev->ready = 1;
+ wev->available = -1;
+
+ if (flags & NGX_POST_EVENTS) {
+ ngx_post_event(wev, &ngx_posted_events);
+
+ } else {
+ wev->handler(wev);
+ }
+ }
+ break;
+ }
+ case NGX_URING_READFILE:
+ case NGX_URING_SPLICE_TO_PIPE:{
+ ngx_pfree(c->pool, ui);
+ break;
+ }
+ default:
+ ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
+ "io_uring invalid event type");
+ return NGX_ERROR;
+ }
+
+ }
+
+ io_uring_cq_advance(&ring, count);
+
+ if (count == 0) {
+ ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
+ "io_uring_submit_and_wait() returned no events without timeout");
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
+}
+
+
+static void *
+ngx_uring_create_conf(ngx_cycle_t *cycle)
+{
+ ngx_uring_conf_t *urcf;
+
+ urcf = ngx_palloc(cycle->pool, sizeof(ngx_uring_conf_t));
+ if (urcf == NULL) {
+ return NULL;
+ }
+
+ urcf->entries = NGX_CONF_UNSET;
+ urcf->sendfile_bound = NGX_CONF_UNSET;
+
+ return urcf;
+}
+
+
+static char *
+ngx_uring_init_conf(ngx_cycle_t *cycle, void *conf)
+{
+ ngx_uring_conf_t *urcf = conf;
+
+ ngx_conf_init_uint_value(urcf->entries, 32768);
+ ngx_conf_init_uint_value(urcf->sendfile_bound, 41984);
+
+ return NGX_CONF_OK;
+}
+
+
+ngx_int_t
+ngx_uring_accept(ngx_connection_t *c)
+{
+ struct io_uring_sqe *sqe;
+ ngx_uring_info_t *ui;
+
+ if(c->pool == NULL){
+ c->pool = ngx_create_pool(c->listening->pool_size, c->log);
+ if (c->pool == NULL) {
+ return NGX_ERROR;
+ }
+ }
+
+ if(c->sockaddr == NULL){
+ c->sockaddr = ngx_palloc(c->pool, sizeof(ngx_sockaddr_t));
+ if (c->sockaddr == NULL) {
+ return NGX_ERROR;
+ }
+ c->socklen = sizeof(ngx_sockaddr_t);
+ }
+
+ ui = ngx_palloc(c->pool, sizeof(ngx_uring_info_t));
+ if(ui == NULL){
+ return NGX_ERROR;
+ }
+ ui->conn = c;
+ ui->ev = NGX_URING_ACCEPT;
+ ui->buf = NULL;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "io_uring prep event: fd:%d op:%d ",
+ c->fd, ui->ev);
+
+ sqe = io_uring_get_sqe(&ring);
+ if(sqe == NULL){
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "io_uring_get_sqe() failed");
+ return NGX_ERROR;
+ }
+
+ io_uring_prep_accept(sqe, c->fd, c->sockaddr, &c->socklen, 0);
+ io_uring_sqe_set_data(sqe, ui);
+
+ return NGX_OK;
+}
+
+
+ssize_t
+ngx_uring_recv(ngx_connection_t *c, u_char *buf, size_t size)
+{
+ ssize_t n;
+ ngx_event_t *rev;
+ ngx_uring_info_t *ui;
+ struct io_uring_sqe *sqe;
+
+ rev = c->read;
+
+ if(!rev->complete && rev->uring_pending) {
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0, "second uring_recv post");
+ return NGX_AGAIN;
+ }
+
+ if(rev->complete) {
+ n = rev->uring_res;
+ rev->uring_res = 0;
+ rev->available = 0;
+ rev->uring_pending = 0;
+ rev->complete = 0;
+
+ if(n == 0){
+ rev->ready = 0;
+ rev->eof = 1;
+ return 0;
+ }
+ if(n < 0){
+ ngx_connection_error(c, 0, "uring_recv() failed");
+ rev->error = 1;
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "uring_recv: fd:%d %ul of %z",
+ c->fd, n, size);
+
+ return n;
+ }
+
+ ui = ngx_palloc(c->pool, sizeof(ngx_uring_info_t));
+ if(ui == NULL){
+ return NGX_ERROR;
+ }
+ ui->conn = c;
+ ui->ev = NGX_URING_READ;
+ ui->buf = NULL;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "io_uring prep event: fd:%d op:%d ",
+ c->fd, ui->ev);
+
+ sqe = io_uring_get_sqe(&ring);
+ if(sqe == NULL){
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "io_uring_get_sqe() failed");
+ return NGX_ERROR;
+ }
+
+ io_uring_prep_recv(sqe, c->fd, buf, size, 0);
+ io_uring_sqe_set_data(sqe, ui);
+
+ rev->complete = 0;
+ rev->ready = 0;
+ rev->uring_pending = 1;
+
+ return NGX_AGAIN;
+}
+
+ssize_t
+ngx_uring_readv_chain(ngx_connection_t *c, ngx_chain_t *chain, off_t limit)
+{
+ u_char *prev;
+ ssize_t n, size;
+ ngx_array_t vec;
+ ngx_event_t *rev;
+ struct iovec *iov;
+ ngx_uring_info_t *ui;
+ struct io_uring_sqe *sqe;
+
+ rev = c->read;
+
+ if(!rev->complete && rev->uring_pending) {
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0, "second uring_readv_chain post");
+ return NGX_AGAIN;
+ }
+
+ if(rev->complete) {
+ n = rev->uring_res;
+ rev->uring_res = 0;
+ rev->available = 0;
+ rev->uring_pending = 0;
+ rev->complete = 0;
+
+ if(n == 0){
+ rev->ready = 0;
+ rev->eof = 1;
+ return 0;
+ }
+ if(n < 0){
+ ngx_connection_error(c, 0, "uring_readv() failed");
+ rev->error = 1;
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "uring_readv: fd:%d %ul of %z",
+ c->fd, n, size);
+
+ return n;
+ }
+
+ prev = NULL;
+ iov = NULL;
+ size = 0;
+
+ vec.elts = rev->uring_iov;
+ vec.nelts = 0;
+ vec.size = sizeof(struct iovec);
+ vec.nalloc = NGX_IOVS_PREALLOCATE;
+ vec.pool = c->pool;
+
+ /* coalesce the neighbouring bufs */
+
+ while (chain) {
+ n = chain->buf->end - chain->buf->last;
+
+ if (limit) {
+ if (size >= limit) {
+ break;
+ }
+
+ if (size + n > limit) {
+ n = (ssize_t) (limit - size);
+ }
+ }
+
+ if (prev == chain->buf->last) {
+ iov->iov_len += n;
+
+ } else {
+ if (vec.nelts >= IOV_MAX) {
+ break;
+ }
+
+ iov = ngx_array_push(&vec);
+ if (iov == NULL) {
+ return NGX_ERROR;
+ }
+
+ iov->iov_base = (void *) chain->buf->last;
+ iov->iov_len = n;
+ }
+
+ size += n;
+ prev = chain->buf->end;
+ chain = chain->next;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "readv: %ui, last:%uz", vec.nelts, iov->iov_len);
+
+ ui = ngx_palloc(c->pool, sizeof(ngx_uring_info_t));
+ if(ui == NULL){
+ return NGX_ERROR;
+ }
+ ui->conn = c;
+ ui->ev = NGX_URING_READV;
+ ui->buf = NULL;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "io_uring prep event: fd:%d op:%d ",
+ c->fd, ui->ev);
+
+ sqe = io_uring_get_sqe(&ring);
+ if(sqe == NULL){
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "io_uring_get_sqe() failed");
+ return NGX_ERROR;
+ }
+
+ io_uring_prep_readv(sqe, c->fd, (struct iovec *) vec.elts, vec.nelts, 0);
+ io_uring_sqe_set_data(sqe, ui);
+
+ rev->complete = 0;
+ rev->ready = 0;
+ rev->uring_pending = 1;
+
+ return NGX_AGAIN;
+}
+
+#define NGX_SENDFILE_MAXSIZE 2147483647L
+
+ssize_t
+ngx_uring_send(ngx_connection_t *c, u_char *buf, size_t size)
+{
+ ssize_t n;
+ ngx_event_t *wev;
+ ngx_uring_info_t *ui;
+ struct io_uring_sqe *sqe;
+
+ wev = c->write;
+
+ if(wev->uring_pending && !wev->complete){
+ return NGX_AGAIN;
+ }
+
+ if(wev->complete){
+ n = wev->uring_res;
+ wev->complete = 0;
+ wev->uring_pending = 0;
+ wev->uring_res = 0;
+ wev->ready = 1;
+
+ ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "send: fd:%d %z of %uz", c->fd, n, size);
+
+ if (n == 0) {
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0, "send() returned zero");
+ wev->ready = 0;
+ return n;
+ }
+
+ if (n > 0) {
+ if (n < (ssize_t) size) {
+ wev->ready = 0;
+ }
+
+ c->sent += n;
+
+ return n;
+ }
+
+
+ wev->error = 1;
+ (void) ngx_connection_error(c, 0, "send() failed");
+ return NGX_ERROR;
+
+ }
+
+ ui = ngx_palloc(c->pool, sizeof(ngx_uring_info_t));
+ if(ui == NULL){
+ return NGX_ERROR;
+ }
+ ui->conn = c;
+ ui->ev = NGX_URING_SEND;
+ ui->buf = NULL;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "io_uring prep event: fd:%d op:%d ",
+ c->fd, ui->ev);
+
+ sqe = io_uring_get_sqe(&ring);
+ if(sqe == NULL){
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "io_uring_get_sqe() failed");
+ return NGX_ERROR;
+ }
+
+ io_uring_prep_send(sqe, c->fd, buf, size, 0);
+ io_uring_sqe_set_data(sqe, ui);
+
+ wev->uring_rq_size = size;
+ wev->uring_pending = 1;
+ wev->complete = 0;
+ wev->ready = 0;
+
+ return NGX_AGAIN;
+}
+
+static ssize_t
+ngx_uring_writev(ngx_connection_t *c, int nelts, int start_el)
+{
+ ngx_uring_info_t *ui;
+ struct io_uring_sqe *sqe;
+
+ ui = ngx_palloc(c->pool, sizeof(ngx_uring_info_t));
+ if(ui == NULL){
+ return NGX_ERROR;
+ }
+ ui->conn = c;
+ ui->ev = NGX_URING_WRITEV;
+ ui->buf = NULL;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "io_uring prep event: fd:%d op:%d ",
+ c->fd, ui->ev);
+
+ sqe = io_uring_get_sqe(&ring);
+ if(sqe == NULL){
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "io_uring_get_sqe() failed");
+ return NGX_ERROR;
+ }
+
+ io_uring_prep_writev(sqe, c->fd, &c->write->uring_iov[start_el], nelts - start_el , 0);
+ io_uring_sqe_set_data(sqe, ui);
+
+ return NGX_AGAIN;
+}
+
+
+ngx_chain_t *
+ngx_uring_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
+{
+ u_char *prev;
+ ssize_t n, sent, size;
+ off_t send;
+ ngx_chain_t *cl;
+ ngx_event_t *wev;
+ int nelts, start_el;
+
+ wev = c->write;
+
+ if(wev->uring_pending && !wev->complete){
+ return in;
+ }
+
+ if(wev->complete){
+ sent = wev->uring_res;
+
+ wev->complete = 0;
+ wev->uring_pending = 0;
+ wev->uring_res = 0;
+ wev->ready = 1;
+
+ if(sent != wev->uring_rq_size){
+ ngx_connection_error(c, 0, "uring_writev_chain failed");
+ return NGX_CHAIN_ERROR;
+ }
+
+ c->sent += sent;
+ in = ngx_chain_update_sent(in, sent);
+
+ return in;
+ }
+
+ send = 0;
+ nelts = 0;
+ prev = NULL;
+ start_el = 0;
+
+
+ /* the maximum limit size is the maximum size_t value - the page size */
+
+ if (limit == 0 || limit > (off_t) (NGX_MAX_SIZE_T_VALUE - ngx_pagesize)) {
+ limit = NGX_MAX_SIZE_T_VALUE - ngx_pagesize;
+ }
+
+ for (cl = in;
+ cl && nelts < NGX_IOVS_PREALLOCATE && send < limit;
+ )
+ {
+
+ if (ngx_buf_special(cl->buf)) {
+ cl = cl->next;
+ continue;
+ }
+
+ if (cl && cl->buf->in_file) {
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "file buf in writev "
+ "t:%d r:%d f:%d %p %p-%p %p %O-%O",
+ cl->buf->temporary,
+ cl->buf->recycled,
+ cl->buf->in_file,
+ cl->buf->start,
+ cl->buf->pos,
+ cl->buf->last,
+ cl->buf->file,
+ cl->buf->file_pos,
+ cl->buf->file_last);
+ ngx_debug_point();
+
+ return NGX_CHAIN_ERROR;
+ }
+
+
+ size = cl->buf->last - cl->buf->pos;
+ if (send + size > limit) {
+ size = (u_long) (limit - send);
+ }
+
+ if (prev == cl->buf->pos) {
+ wev->uring_iov[nelts - 1].iov_len += cl->buf->last - cl->buf->pos;
+
+ } else {
+ ++nelts;
+ if (nelts >= NGX_IOVS_PREALLOCATE) {
+ wev->error = 1;
+ return NGX_CHAIN_ERROR;
+ }
+ wev->uring_iov[nelts - 1].iov_base = (void *) cl->buf->pos;
+ wev->uring_iov[nelts - 1].iov_len = cl->buf->last - cl->buf->pos;
+ }
+ prev = cl->buf->last;
+ send += size;
+ cl = cl->next;
+ }
+
+ if(nelts - start_el > 0){
+ n = ngx_uring_writev(c, nelts, start_el);
+
+ if (n == NGX_ERROR) {
+ return NGX_CHAIN_ERROR;
+ }
+
+ sent = 0;
+ }
+
+ wev->uring_rq_size = send;
+ wev->uring_pending = 1;
+ wev->complete = 0;
+ wev->ready = 0;
+
+ return in;
+}
+
+#if (NGX_USE_URING_SPLICE)
+static ssize_t
+ngx_uring_splice_sendfile(ngx_connection_t *c, ngx_buf_t *file, size_t size)
+{
+ ngx_uring_info_t *ui;
+ struct io_uring_sqe *sqe;
+#if (NGX_HAVE_SENDFILE64)
+ off_t offset;
+#else
+ int32_t offset;
+#endif
+#if (NGX_HAVE_SENDFILE64)
+ offset = file->file_pos;
+#else
+ offset = (int32_t) file->file_pos;
+#endif
+
+ ui = ngx_palloc(c->pool, sizeof(ngx_uring_info_t));
+ if(ui == NULL){
+ return NGX_ERROR;
+ }
+ ui->conn = c;
+ ui->ev = NGX_URING_SPLICE_TO_PIPE;
+ ui->buf = NULL;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "io_uring prep event: fd:%d op:%d ",
+ c->fd, ui->ev);
+
+ sqe = io_uring_get_sqe(&ring);
+ if(sqe == NULL){
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "io_uring_get_sqe() failed");
+ return NGX_ERROR;
+ }
+ io_uring_prep_splice(sqe, file->file->fd, offset,
+ c->write->uring_splice_pipe[1], -1, size, SPLICE_F_MOVE | SPLICE_F_MORE);
+ sqe->flags = IOSQE_IO_LINK;
+ io_uring_sqe_set_data(sqe, ui);
+
+
+ ui = ngx_palloc(c->pool, sizeof(ngx_uring_info_t));
+ if(ui == NULL){
+ return NGX_ERROR;
+ }
+ ui->conn = c;
+ ui->ev = NGX_URING_SPLICE_FROM_PIPE;
+ ui->buf = NULL;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "io_uring prep event: fd:%d op:%d ",
+ c->fd, ui->ev);
+
+ sqe = io_uring_get_sqe(&ring);
+ if(sqe == NULL){
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "io_uring_get_sqe() failed");
+ return NGX_ERROR;
+ }
+ io_uring_prep_splice(sqe, c->write->uring_splice_pipe[0], -1,
+ c->fd, -1, size, SPLICE_F_MOVE | SPLICE_F_MORE);
+ io_uring_sqe_set_data(sqe, ui);
+
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "uring_splice_sendfile: @%O %uz", file->file_pos, size);
+
+ return NGX_AGAIN;
+}
+#else
+static ssize_t
+ngx_uring_read_sendfile(ngx_connection_t *c, ngx_buf_t *file, size_t size)
+{
+ ngx_uring_info_t *ui;
+ struct io_uring_sqe *sqe;
+ void *buf;
+#if (NGX_HAVE_SENDFILE64)
+ off_t offset;
+#else
+ int32_t offset;
+#endif
+#if (NGX_HAVE_SENDFILE64)
+ offset = file->file_pos;
+#else
+ offset = (int32_t) file->file_pos;
+#endif
+
+ buf = ngx_palloc(c->pool, size);
+ if(buf == NULL){
+ return NGX_ERROR;
+ }
+
+ ui = ngx_palloc(c->pool, sizeof(ngx_uring_info_t));
+ if(ui == NULL){
+ return NGX_ERROR;
+ }
+ ui->conn = c;
+ ui->ev = NGX_URING_READFILE;
+ ui->buf = NULL;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "io_uring prep event: fd:%d op:%d ",
+ c->fd, ui->ev);
+
+ sqe = io_uring_get_sqe(&ring);
+ if(sqe == NULL){
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "io_uring_get_sqe() failed");
+ return NGX_ERROR;
+ }
+
+ io_uring_prep_read(sqe, file->file->fd, buf, size, offset );
+ sqe->flags = IOSQE_IO_LINK;
+ io_uring_sqe_set_data(sqe, ui);
+
+
+ ui = ngx_palloc(c->pool, sizeof(ngx_uring_info_t));
+ if(ui == NULL){
+ return NGX_ERROR;
+ }
+ ui->conn = c;
+ ui->ev = NGX_URING_SEND;
+ ui->buf = buf;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ "io_uring prep event: fd:%d op:%d ",
+ c->fd, ui->ev);
+
+ sqe = io_uring_get_sqe(&ring);
+ if(sqe == NULL){
+ ngx_log_error(NGX_LOG_ALERT, c->log, 0,
+ "io_uring_get_sqe() failed");
+ return NGX_ERROR;
+ }
+ io_uring_prep_send(sqe, c->fd, buf, size, 0);
+ io_uring_sqe_set_data(sqe, ui);
+
+
+ ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "uring_splice_sendfile: @%O %uz", file->file_pos, size);
+
+ return NGX_AGAIN;
+}
+#endif
+
+
+ngx_chain_t *
+ngx_uring_sendfile_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
+{
+ u_char *prev;
+ off_t send;
+ size_t file_size;
+ ngx_buf_t *file;
+ ngx_event_t *wev;
+ ngx_chain_t *cl;
+ ssize_t n, sent, size;
+ int nelts, start_el, pending;
+
+ wev = c->write;
+
+ if(wev->uring_pending && !wev->complete){
+ return in;
+ }
+
+ if(wev->complete){
+ sent = wev->uring_res;
+
+ wev->complete = 0;
+ wev->uring_pending = 0;
+ wev->uring_res = 0;
+ wev->ready = 1;
+
+ if(sent != wev->uring_rq_size){
+ ngx_connection_error(c, 0, "uring_writev_chain failed");
+ return NGX_CHAIN_ERROR;
+ }
+
+ c->sent += sent;
+ in = ngx_chain_update_sent(in, sent);
+
+ return in;
+ }
+ sent = 0;
+ send = 0;
+ nelts = 0;
+ prev = NULL;
+ start_el = 0;
+ pending = 0;
+
+ if (limit == 0 || limit > (off_t) (NGX_SENDFILE_MAXSIZE - ngx_pagesize)) {
+ limit = NGX_SENDFILE_MAXSIZE - ngx_pagesize;
+ }
+
+ for (cl = in;
+ cl && nelts < NGX_IOVS_PREALLOCATE && send < limit;
+ )
+ {
+ if (ngx_buf_special(cl->buf)) {
+ cl = cl->next;
+ continue;
+ }
+
+ if (cl->buf->in_file) {
+ if(nelts > 0){
+ ++pending;
+ n = ngx_uring_writev(c, nelts, start_el);
+
+ if (n == NGX_ERROR) {
+ return NGX_CHAIN_ERROR;
+ }
+
+ start_el += nelts;
+ prev = NULL;
+ }
+
+ file = cl->buf;
+
+ /* coalesce the neighbouring file bufs */
+
+ file_size = (size_t) ngx_chain_coalesce_file(&cl, limit - send);
+
+ send += file_size;
+
+ if (file_size == 0) {
+ ngx_debug_point();
+ return NGX_CHAIN_ERROR;
+ }
+
+ if(file_size >= linux_sendfile_bound) {
+ io_uring_submit(&ring);
+
+ n = ngx_linux_sendfile(c, file, file_size);
+
+ if (n == NGX_ERROR) {
+ return NGX_CHAIN_ERROR;
+ }
+
+ if(n != NGX_AGAIN){
+ sent += n;
+ continue;
+ }
+ }
+
+#if (NGX_USE_URING_SPLICE)
+ n = ngx_uring_splice_sendfile(c, file, file_size);
+#else
+ n = ngx_uring_read_sendfile(c, file, file_size);
+#endif
+
+ if (n == NGX_ERROR) {
+ wev->error = 1;
+ return NGX_CHAIN_ERROR;
+ }
+
+ ++pending;
+ continue;
+ }
+
+ size = cl->buf->last - cl->buf->pos;
+ if (send + size > limit) {
+ size = (u_long) (limit - send);
+ }
+
+ if (prev == cl->buf->pos) {
+ wev->uring_iov[nelts - 1].iov_len += cl->buf->last - cl->buf->pos;
+
+ } else {
+ ++nelts;
+ if (nelts >= NGX_IOVS_PREALLOCATE) {
+ wev->error = 1;
+ return NGX_CHAIN_ERROR;
+ }
+ wev->uring_iov[nelts - 1].iov_base = (void *) cl->buf->pos;
+ wev->uring_iov[nelts - 1].iov_len = cl->buf->last - cl->buf->pos;
+ }
+
+ prev = cl->buf->last;
+ send += size;
+ cl = cl->next;
+ }
+
+ if(nelts - start_el > 0){
+ ++pending;
+ n = ngx_uring_writev(c, nelts, start_el);
+
+ if (n == NGX_ERROR) {
+ return NGX_CHAIN_ERROR;
+ }
+ }
+
+ wev->uring_rq_size = send;
+ wev->uring_pending = pending;
+ wev->complete = 0;
+ wev->ready = 0;
+ wev->uring_res = sent;
+
+ return in;
+}
diff -r ce34097321cf -r d7f0f0b78a24 src/event/ngx_event.c
--- a/src/event/ngx_event.c Mon Nov 23 10:49:15 2020 +0000
+++ b/src/event/ngx_event.c Mon Nov 23 10:52:02 2020 +0000
@@ -17,6 +17,7 @@
extern ngx_module_t ngx_eventport_module;
extern ngx_module_t ngx_devpoll_module;
extern ngx_module_t ngx_epoll_module;
+extern ngx_module_t ngx_uring_module;
extern ngx_module_t ngx_select_module;


_______________________________________________
nginx-devel mailing list
nginx-devel@nginx.org
http://mailman.nginx.org/mailman/listinfo/nginx-devel
Subject Author Views Posted

[PATCH 4 of 8] Added io_uring event module

SoYun Seong 117 November 24, 2020 03:46AM



Sorry, you do not have permission to post/reply in this forum.

Online Users

Guests: 43
Record Number of Users: 6 on February 13, 2018
Record Number of Guests: 421 on December 02, 2018
Powered by nginx      Powered by FreeBSD      PHP Powered      Powered by MariaDB      ipv6 ready