Commit d8a6df10 authored by Jens Axboe's avatar Jens Axboe
Browse files

io_uring: use percpu counters to track inflight requests



Even though we place the req_issued and req_complete in separate
cachelines, there's considerable overhead in doing the atomics
particularly on the completion side.

Get rid of having the two counters, and just use a percpu_counter for
this. That's what it was made for, after all. This considerably
reduces the overhead in __io_free_req().

Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 500a373d
Loading
Loading
Loading
Loading
+28 −22
Original line number Diff line number Diff line
@@ -1810,7 +1810,7 @@ static void __io_free_req(struct io_kiocb *req)

	io_dismantle_req(req);

	atomic_long_inc(&tctx->req_complete);
	percpu_counter_dec(&tctx->inflight);
	if (tctx->in_idle)
		wake_up(&tctx->wait);
	put_task_struct(req->task);
@@ -2089,7 +2089,9 @@ static void io_req_free_batch_finish(struct io_ring_ctx *ctx,
	if (rb->to_free)
		__io_req_free_batch_flush(ctx, rb);
	if (rb->task) {
		atomic_long_add(rb->task_refs, &rb->task->io_uring->req_complete);
		struct io_uring_task *tctx = rb->task->io_uring;

		percpu_counter_sub(&tctx->inflight, rb->task_refs);
		put_task_struct_many(rb->task, rb->task_refs);
		rb->task = NULL;
	}
@@ -2106,7 +2108,9 @@ static void io_req_free_batch(struct req_batch *rb, struct io_kiocb *req)

	if (req->task != rb->task) {
		if (rb->task) {
			atomic_long_add(rb->task_refs, &rb->task->io_uring->req_complete);
			struct io_uring_task *tctx = rb->task->io_uring;

			percpu_counter_sub(&tctx->inflight, rb->task_refs);
			put_task_struct_many(rb->task, rb->task_refs);
		}
		rb->task = req->task;
@@ -6524,7 +6528,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
	if (!percpu_ref_tryget_many(&ctx->refs, nr))
		return -EAGAIN;

	atomic_long_add(nr, &current->io_uring->req_issue);
	percpu_counter_add(&current->io_uring->inflight, nr);
	refcount_add(nr, &current->usage);

	io_submit_state_start(&state, ctx, nr);
@@ -6566,10 +6570,12 @@ fail_req:

	if (unlikely(submitted != nr)) {
		int ref_used = (submitted == -EAGAIN) ? 0 : submitted;
		struct io_uring_task *tctx = current->io_uring;
		int unused = nr - ref_used;

		percpu_ref_put_many(&ctx->refs, nr - ref_used);
		atomic_long_sub(nr - ref_used, &current->io_uring->req_issue);
		put_task_struct_many(current, nr - ref_used);
		percpu_ref_put_many(&ctx->refs, unused);
		percpu_counter_sub(&tctx->inflight, unused);
		put_task_struct_many(current, unused);
	}
	if (link)
		io_queue_link_head(link, &state.comp);
@@ -7687,17 +7693,22 @@ out_fput:
static int io_uring_alloc_task_context(struct task_struct *task)
{
	struct io_uring_task *tctx;
	int ret;

	tctx = kmalloc(sizeof(*tctx), GFP_KERNEL);
	if (unlikely(!tctx))
		return -ENOMEM;

	ret = percpu_counter_init(&tctx->inflight, 0, GFP_KERNEL);
	if (unlikely(ret)) {
		kfree(tctx);
		return ret;
	}

	xa_init(&tctx->xa);
	init_waitqueue_head(&tctx->wait);
	tctx->last = NULL;
	tctx->in_idle = 0;
	atomic_long_set(&tctx->req_issue, 0);
	atomic_long_set(&tctx->req_complete, 0);
	io_init_identity(&tctx->__identity);
	tctx->identity = &tctx->__identity;
	task->io_uring = tctx;
@@ -7712,6 +7723,7 @@ void __io_uring_free(struct task_struct *tsk)
	WARN_ON_ONCE(refcount_read(&tctx->identity->count) != 1);
	if (tctx->identity != &tctx->__identity)
		kfree(tctx->identity);
	percpu_counter_destroy(&tctx->inflight);
	kfree(tctx);
	tsk->io_uring = NULL;
}
@@ -8696,12 +8708,6 @@ void __io_uring_files_cancel(struct files_struct *files)
	}
}

static inline bool io_uring_task_idle(struct io_uring_task *tctx)
{
	return atomic_long_read(&tctx->req_issue) ==
		atomic_long_read(&tctx->req_complete);
}

/*
 * Find any io_uring fd that this task has registered or done IO on, and cancel
 * requests.
@@ -8710,14 +8716,16 @@ void __io_uring_task_cancel(void)
{
	struct io_uring_task *tctx = current->io_uring;
	DEFINE_WAIT(wait);
	long completions;
	s64 inflight;

	/* make sure overflow events are dropped */
	tctx->in_idle = true;

	while (!io_uring_task_idle(tctx)) {
	do {
		/* read completions before cancelations */
		completions = atomic_long_read(&tctx->req_complete);
		inflight = percpu_counter_sum(&tctx->inflight);
		if (!inflight)
			break;
		__io_uring_files_cancel(NULL);

		prepare_to_wait(&tctx->wait, &wait, TASK_UNINTERRUPTIBLE);
@@ -8726,12 +8734,10 @@ void __io_uring_task_cancel(void)
		 * If we've seen completions, retry. This avoids a race where
		 * a completion comes in before we did prepare_to_wait().
		 */
		if (completions != atomic_long_read(&tctx->req_complete))
		if (inflight != percpu_counter_sum(&tctx->inflight))
			continue;
		if (io_uring_task_idle(tctx))
			break;
		schedule();
	}
	} while (1);

	finish_wait(&tctx->wait, &wait);
	tctx->in_idle = false;
+2 −5
Original line number Diff line number Diff line
@@ -23,13 +23,10 @@ struct io_uring_task {
	struct xarray		xa;
	struct wait_queue_head	wait;
	struct file		*last;
	atomic_long_t		req_issue;
	struct percpu_counter	inflight;
	struct io_identity	__identity;
	struct io_identity	*identity;

	/* completion side */
	bool			in_idle ____cacheline_aligned_in_smp;
	atomic_long_t		req_complete;
	bool			in_idle;
};

#if defined(CONFIG_IO_URING)