Commit ba816ad6 authored by Jens Axboe

io_uring: run dependent links inline if possible

Currently any dependent link is executed from a new workqueue context,
which means that we'll be doing a context switch per link in the chain.
If we are running the completion of the current request from our async
workqueue and find that the next request is a link, then run it directly
from the workqueue context instead of forcing another switch.

This improves the performance of linked SQEs and reduces CPU overhead.

Reviewed-by: Jackie Liu <liuyun01@kylinos.cn>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent d848074b
+113 −47
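In sketch form, this is the control flow the change enables. The names below
(struct req, run_request(), worker_loop()) are hypothetical stand-ins, not the
kernel's real types or API; in the patch itself the loop lives in
io_sq_wq_submit_work(), and the hand-off happens through the
struct io_kiocb **nxt out-parameter threaded through the opcode handlers:

	/* Toy model of the pattern; not the kernel's real types. */
	struct req {
		struct req *link;	/* next dependent request, if any */
	};

	/* Hypothetical handler: complete @r, hand back a ready link via @nxt. */
	static void run_request(struct req *r, struct req **nxt)
	{
		/* ... completion work for r would go here ... */
		*nxt = r->link;		/* hand off instead of re-queueing */
	}

	/* Workqueue context: keep consuming the chain without switching. */
	static void worker_loop(struct req *r)
	{
		while (r) {
			struct req *nxt = NULL;

			run_request(r, &nxt);
			r = nxt;	/* run the dependent link inline */
		}
	}

Note that in the real code the hand-off only fires when current_work() shows
the caller is already in workqueue context; submission-path callers pass a
NULL nxt pointer and fall back to queueing async work as before.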
@@ -666,7 +666,7 @@ static void __io_free_req(struct io_kiocb *req)
	kmem_cache_free(req_cachep, req);
}

static void io_req_link_next(struct io_kiocb *req)
static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
{
	struct io_kiocb *nxt;

@@ -685,10 +685,18 @@ static void io_req_link_next(struct io_kiocb *req)
		}

		nxt->flags |= REQ_F_LINK_DONE;
		/*
		 * If we're in async work, we can continue processing the chain
		 * in this context instead of having to queue up new async work.
		 */
		if (nxtptr && current_work()) {
			*nxtptr = nxt;
		} else {
			INIT_WORK(&nxt->work, io_sq_wq_submit_work);
			io_queue_async_work(req->ctx, nxt);
		}
	}
}

/*
 * Called if REQ_F_LINK is set, and we fail the head request
@@ -706,7 +714,7 @@ static void io_fail_links(struct io_kiocb *req)
	}
}

static void io_free_req(struct io_kiocb *req)
static void io_free_req(struct io_kiocb *req, struct io_kiocb **nxt)
{
	/*
	 * If LINK is set, we have dependent requests in this chain. If we
@@ -718,16 +726,39 @@ static void io_free_req(struct io_kiocb *req)
		if (req->flags & REQ_F_FAIL_LINK)
			io_fail_links(req);
		else
			io_req_link_next(req);
			io_req_link_next(req, nxt);
	}

	__io_free_req(req);
}

static void io_put_req(struct io_kiocb *req)
/*
 * Drop reference to request, return next in chain (if there is one) if this
 * was the last reference to this request.
 */
static struct io_kiocb *io_put_req_find_next(struct io_kiocb *req)
{
	struct io_kiocb *nxt = NULL;

	if (refcount_dec_and_test(&req->refs))
		io_free_req(req);
		io_free_req(req, &nxt);

	return nxt;
}

static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr)
{
	struct io_kiocb *nxt;

	nxt = io_put_req_find_next(req);
	if (nxt) {
		if (nxtptr) {
			*nxtptr = nxt;
		} else {
			INIT_WORK(&nxt->work, io_sq_wq_submit_work);
			io_queue_async_work(nxt->ctx, nxt);
		}
	}
}

static unsigned io_cqring_events(struct io_rings *rings)
@@ -775,7 +806,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
				if (to_free == ARRAY_SIZE(reqs))
					io_free_req_many(ctx, reqs, &to_free);
			} else {
				io_free_req(req);
				io_free_req(req, NULL);
			}
		}
	}
@@ -947,7 +978,7 @@ static void kiocb_end_write(struct io_kiocb *req)
	file_end_write(req->file);
}

static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
static void io_complete_rw_common(struct kiocb *kiocb, long res)
{
	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);

@@ -957,7 +988,22 @@ static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
	if ((req->flags & REQ_F_LINK) && res != req->result)
		req->flags |= REQ_F_FAIL_LINK;
	io_cqring_add_event(req->ctx, req->user_data, res);
	io_put_req(req);
}

static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
{
	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);

	io_complete_rw_common(kiocb, res);
	io_put_req(req, NULL);
}

static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
{
	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);

	io_complete_rw_common(kiocb, res);
	return io_put_req_find_next(req);
}

static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
@@ -1153,6 +1199,15 @@ static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
	}
}

static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
		       bool in_async)
{
	if (in_async && ret >= 0 && nxt && kiocb->ki_complete == io_complete_rw)
		*nxt = __io_complete_rw(kiocb, ret);
	else
		io_rw_done(kiocb, ret);
}

static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
			   const struct io_uring_sqe *sqe,
			   struct iov_iter *iter)
@@ -1369,7 +1424,7 @@ static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
}

static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
		   bool force_nonblock)
		   struct io_kiocb **nxt, bool force_nonblock)
{
	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
	struct kiocb *kiocb = &req->rw;
@@ -1418,7 +1473,7 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
			ret2 = -EAGAIN;
		/* Catch -EAGAIN return for forced non-blocking submission */
		if (!force_nonblock || ret2 != -EAGAIN) {
			io_rw_done(kiocb, ret2);
			kiocb_done(kiocb, ret2, nxt, s->needs_lock);
		} else {
			/*
			 * If ->needs_lock is true, we're already in async
@@ -1434,7 +1489,7 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
}

static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
		    bool force_nonblock)
		    struct io_kiocb **nxt, bool force_nonblock)
{
	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
	struct kiocb *kiocb = &req->rw;
@@ -1492,7 +1547,7 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
		else
			ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
		if (!force_nonblock || ret2 != -EAGAIN) {
			io_rw_done(kiocb, ret2);
			kiocb_done(kiocb, ret2, nxt, s->needs_lock);
		} else {
			/*
			 * If ->needs_lock is true, we're already in async
@@ -1520,7 +1575,7 @@ static int io_nop(struct io_kiocb *req, u64 user_data)
		return -EINVAL;

	io_cqring_add_event(ctx, user_data, err);
	io_put_req(req);
	io_put_req(req, NULL);
	return 0;
}

@@ -1540,7 +1595,7 @@ static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
}

static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
		    bool force_nonblock)
		    struct io_kiocb **nxt, bool force_nonblock)
{
	loff_t sqe_off = READ_ONCE(sqe->off);
	loff_t sqe_len = READ_ONCE(sqe->len);
@@ -1567,7 +1622,7 @@ static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
	if (ret < 0 && (req->flags & REQ_F_LINK))
		req->flags |= REQ_F_FAIL_LINK;
	io_cqring_add_event(req->ctx, sqe->user_data, ret);
	io_put_req(req);
	io_put_req(req, nxt);
	return 0;
}

@@ -1589,6 +1644,7 @@ static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)

static int io_sync_file_range(struct io_kiocb *req,
			      const struct io_uring_sqe *sqe,
			      struct io_kiocb **nxt,
			      bool force_nonblock)
{
	loff_t sqe_off;
@@ -1613,13 +1669,13 @@ static int io_sync_file_range(struct io_kiocb *req,
	if (ret < 0 && (req->flags & REQ_F_LINK))
		req->flags |= REQ_F_FAIL_LINK;
	io_cqring_add_event(req->ctx, sqe->user_data, ret);
	io_put_req(req);
	io_put_req(req, nxt);
	return 0;
}

#if defined(CONFIG_NET)
static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
			   bool force_nonblock,
			   struct io_kiocb **nxt, bool force_nonblock,
		   long (*fn)(struct socket *, struct user_msghdr __user *,
				unsigned int))
{
@@ -1649,26 +1705,28 @@ static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
	}

	io_cqring_add_event(req->ctx, sqe->user_data, ret);
	io_put_req(req);
	io_put_req(req, nxt);
	return 0;
}
#endif

static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
		      bool force_nonblock)
		      struct io_kiocb **nxt, bool force_nonblock)
{
#if defined(CONFIG_NET)
	return io_send_recvmsg(req, sqe, force_nonblock, __sys_sendmsg_sock);
	return io_send_recvmsg(req, sqe, nxt, force_nonblock,
				__sys_sendmsg_sock);
#else
	return -EOPNOTSUPP;
#endif
}

static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
		      bool force_nonblock)
		      struct io_kiocb **nxt, bool force_nonblock)
{
#if defined(CONFIG_NET)
	return io_send_recvmsg(req, sqe, force_nonblock, __sys_recvmsg_sock);
	return io_send_recvmsg(req, sqe, nxt, force_nonblock,
				__sys_recvmsg_sock);
#else
	return -EOPNOTSUPP;
#endif
@@ -1728,7 +1786,7 @@ static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
	spin_unlock_irq(&ctx->completion_lock);

	io_cqring_add_event(req->ctx, sqe->user_data, ret);
	io_put_req(req);
	io_put_req(req, NULL);
	return 0;
}

@@ -1769,7 +1827,7 @@ static void io_poll_complete_work(struct work_struct *work)
	spin_unlock_irq(&ctx->completion_lock);

	io_cqring_ev_posted(ctx);
	io_put_req(req);
	io_put_req(req, NULL);
}

static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
@@ -1794,7 +1852,7 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
		spin_unlock_irqrestore(&ctx->completion_lock, flags);

		io_cqring_ev_posted(ctx);
		io_put_req(req);
		io_put_req(req, NULL);
	} else {
		io_queue_async_work(ctx, req);
	}
@@ -1886,7 +1944,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)

	if (mask) {
		io_cqring_ev_posted(ctx);
		io_put_req(req);
		io_put_req(req, NULL);
	}
	return ipt.error;
}
@@ -1919,7 +1977,7 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)

	io_cqring_ev_posted(ctx);

	io_put_req(req);
	io_put_req(req, NULL);
	return HRTIMER_NORESTART;
}

@@ -2028,7 +2086,8 @@ static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
}

static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
			   const struct sqe_submit *s, bool force_nonblock)
			   const struct sqe_submit *s, struct io_kiocb **nxt,
			   bool force_nonblock)
{
	int ret, opcode;

@@ -2045,21 +2104,21 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
	case IORING_OP_READV:
		if (unlikely(s->sqe->buf_index))
			return -EINVAL;
		ret = io_read(req, s, force_nonblock);
		ret = io_read(req, s, nxt, force_nonblock);
		break;
	case IORING_OP_WRITEV:
		if (unlikely(s->sqe->buf_index))
			return -EINVAL;
		ret = io_write(req, s, force_nonblock);
		ret = io_write(req, s, nxt, force_nonblock);
		break;
	case IORING_OP_READ_FIXED:
		ret = io_read(req, s, force_nonblock);
		ret = io_read(req, s, nxt, force_nonblock);
		break;
	case IORING_OP_WRITE_FIXED:
		ret = io_write(req, s, force_nonblock);
		ret = io_write(req, s, nxt, force_nonblock);
		break;
	case IORING_OP_FSYNC:
		ret = io_fsync(req, s->sqe, force_nonblock);
		ret = io_fsync(req, s->sqe, nxt, force_nonblock);
		break;
	case IORING_OP_POLL_ADD:
		ret = io_poll_add(req, s->sqe);
@@ -2068,13 +2127,13 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
		ret = io_poll_remove(req, s->sqe);
		break;
	case IORING_OP_SYNC_FILE_RANGE:
		ret = io_sync_file_range(req, s->sqe, force_nonblock);
		ret = io_sync_file_range(req, s->sqe, nxt, force_nonblock);
		break;
	case IORING_OP_SENDMSG:
		ret = io_sendmsg(req, s->sqe, force_nonblock);
		ret = io_sendmsg(req, s->sqe, nxt, force_nonblock);
		break;
	case IORING_OP_RECVMSG:
		ret = io_recvmsg(req, s->sqe, force_nonblock);
		ret = io_recvmsg(req, s->sqe, nxt, force_nonblock);
		break;
	case IORING_OP_TIMEOUT:
		ret = io_timeout(req, s->sqe);
@@ -2141,6 +2200,7 @@ restart:
		struct sqe_submit *s = &req->submit;
		const struct io_uring_sqe *sqe = s->sqe;
		unsigned int flags = req->flags;
		struct io_kiocb *nxt = NULL;

		/* Ensure we clear previously set non-block flag */
		req->rw.ki_flags &= ~IOCB_NOWAIT;
@@ -2161,7 +2221,7 @@ restart:
			s->has_user = cur_mm != NULL;
			s->needs_lock = true;
			do {
				ret = __io_submit_sqe(ctx, req, s, false);
				ret = __io_submit_sqe(ctx, req, s, &nxt, false);
				/*
				 * We can get EAGAIN for polled IO even though
				 * we're forcing a sync submission from here,
@@ -2175,16 +2235,22 @@ restart:
		}

		/* drop submission reference */
		io_put_req(req);
		io_put_req(req, NULL);

		if (ret) {
			io_cqring_add_event(ctx, sqe->user_data, ret);
			io_put_req(req);
			io_put_req(req, NULL);
		}

		/* async context always use a copy of the sqe */
		kfree(sqe);

		/* if a dependent link is ready, do that as the next one */
		if (!ret && nxt) {
			req = nxt;
			continue;
		}

		/* req from defer and link list needn't decrease async cnt */
		if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE))
			goto out;
@@ -2331,7 +2397,7 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
{
	int ret;

	ret = __io_submit_sqe(ctx, req, s, true);
	ret = __io_submit_sqe(ctx, req, s, NULL, true);

	/*
	 * We async punt it if the file wasn't marked NOWAIT, or if the file
@@ -2364,14 +2430,14 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
	}

	/* drop submission reference */
	io_put_req(req);
	io_put_req(req, NULL);

	/* and drop final reference, if we failed */
	if (ret) {
		io_cqring_add_event(ctx, req->user_data, ret);
		if (req->flags & REQ_F_LINK)
			req->flags |= REQ_F_FAIL_LINK;
		io_put_req(req);
		io_put_req(req, NULL);
	}

	return ret;
@@ -2385,7 +2451,7 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
	ret = io_req_defer(ctx, req, s->sqe);
	if (ret) {
		if (ret != -EIOCBQUEUED) {
			io_free_req(req);
			io_free_req(req, NULL);
			io_cqring_add_event(ctx, s->sqe->user_data, ret);
		}
		return 0;
@@ -2412,7 +2478,7 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
	ret = io_req_defer(ctx, req, s->sqe);
	if (ret) {
		if (ret != -EIOCBQUEUED) {
			io_free_req(req);
			io_free_req(req, NULL);
			__io_free_req(shadow);
			io_cqring_add_event(ctx, s->sqe->user_data, ret);
			return 0;
@@ -2460,7 +2526,7 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
	ret = io_req_set_file(ctx, s, state, req);
	if (unlikely(ret)) {
err_req:
		io_free_req(req);
		io_free_req(req, NULL);
err:
		io_cqring_add_event(ctx, s->sqe->user_data, ret);
		return;