Commit 08369246 authored by Xiaoguang Wang, committed by Jens Axboe

io_uring: refactor io_sq_thread() handling



There are some issues with the current io_sq_thread() implementation:
  1. The prepare_to_wait() usage in __io_sq_thread() is odd. If
multiple ctxs share one poll thread, one ctx can put the poll thread
in TASK_INTERRUPTIBLE, but if the other ctxs still have work to do, we
don't need to change the task's state at all. We should only do that
once none of the ctxs has any work to do (see the userspace analogue
after this list).
  2. We use a round-robin strategy to make multiple ctxs share one
poll thread, but __io_sq_thread() checks various conditions, which is
complicated and may affect the round-robin strategy.
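
As a userspace analogue of issue 1 (my illustration, not code from
this patch; serve_queues and struct queue are made up for the sketch),
a consumer shared by several queues should only block once every queue
is empty, never because one particular queue happens to be empty:

#include <pthread.h>
#include <stdbool.h>

struct queue { int pending; };

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t more_work = PTHREAD_COND_INITIALIZER;
static int stop;

static void serve_queues(struct queue *qs, int nr)
{
	pthread_mutex_lock(&lock);
	while (!stop) {
		bool all_idle = true;
		int i;

		for (i = 0; i < nr; i++) {
			if (qs[i].pending) {
				qs[i].pending--;	/* one round-robin unit of work */
				all_idle = false;
			}
		}
		/* Sleep only when *no* queue has work left, mirroring
		 * the "all ctxs idle" rule adopted for the sq thread. */
		if (all_idle)
			pthread_cond_wait(&more_work, &lock);
	}
	pthread_mutex_unlock(&lock);
}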

To address these issues, I take the following actions:
  1. If multiple ctxs share one poll thread, the thread only calls
prepare_to_wait() and schedule() to go to sleep once none of the ctxs
has any work to do.
  2. To keep the round-robin strategy straightforward, simplify
__io_sq_thread() a bit: it just does the io poll and sqe submit work
once, without checking various conditions.
  3. When multiple ctxs share one poll thread, choose the biggest
sq_thread_idle among these ctxs as the timeout condition, and update
it whenever a ctx is attached or detached.
  4. There is no need to check EBUSY specially: if io_submit_sqes()
returns -EBUSY, IORING_SQ_CQ_OVERFLOW will be set, and the helpers in
liburing will notice the cq overflow and enter the kernel to flush it
(see the sketch below).
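
To illustrate point 4, here is a minimal userspace sketch of the
liburing-side behaviour (my illustration, not code from liburing or
this patch; ring_flush_overflow is a hypothetical helper, and
sq_kflags is assumed to point at the SQ ring flags word mapped during
ring setup):

#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <unistd.h>

static int ring_flush_overflow(int ring_fd, const unsigned *sq_kflags)
{
	unsigned flags = __atomic_load_n(sq_kflags, __ATOMIC_ACQUIRE);

	if (!(flags & IORING_SQ_CQ_OVERFLOW))
		return 0;	/* no overflowed CQEs parked in the kernel */

	/* to_submit == 0: enter the kernel only to flush the CQ ring */
	return (int)syscall(__NR_io_uring_enter, ring_fd, 0, 0,
			    IORING_ENTER_GETEVENTS, NULL, 0);
}

With that flushing driven from the userspace submission/peek path, the
sq thread itself never has to special-case -EBUSY.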

Signed-off-by: Xiaoguang Wang <xiaoguang.wang@linux.alibaba.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent f6edbabb
+67 −102
@@ -245,6 +245,8 @@ struct io_sq_data {
 
 	struct task_struct	*thread;
 	struct wait_queue_head	wait;
+
+	unsigned		sq_thread_idle;
 };
 
 struct io_ring_ctx {
@@ -310,7 +312,6 @@ struct io_ring_ctx {
 	struct io_sq_data	*sq_data;	/* if using sq thread polling */
 
 	struct wait_queue_head	sqo_sq_wait;
-	struct wait_queue_entry	sqo_wait_entry;
 	struct list_head	sqd_list;
 
 	/*
@@ -6841,111 +6842,49 @@ static inline void io_ring_clear_wakeup_flag(struct io_ring_ctx *ctx)
 	spin_unlock_irq(&ctx->completion_lock);
 }
 
-static int io_sq_wake_function(struct wait_queue_entry *wqe, unsigned mode,
-			       int sync, void *key)
-{
-	struct io_ring_ctx *ctx = container_of(wqe, struct io_ring_ctx, sqo_wait_entry);
-	int ret;
-
-	ret = autoremove_wake_function(wqe, mode, sync, key);
-	if (ret) {
-		unsigned long flags;
-
-		spin_lock_irqsave(&ctx->completion_lock, flags);
-		ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
-		spin_unlock_irqrestore(&ctx->completion_lock, flags);
-	}
-	return ret;
-}
-
-enum sq_ret {
-	SQT_IDLE	= 1,
-	SQT_SPIN	= 2,
-	SQT_DID_WORK	= 4,
-};
-
-static enum sq_ret __io_sq_thread(struct io_ring_ctx *ctx,
-				  unsigned long start_jiffies, bool cap_entries)
+static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
 {
-	unsigned long timeout = start_jiffies + ctx->sq_thread_idle;
 	struct io_sq_data *sqd = ctx->sq_data;
 	unsigned int to_submit;
 	int ret = 0;
 
-again:
 	if (!list_empty(&ctx->iopoll_list)) {
 		unsigned nr_events = 0;
 
 		mutex_lock(&ctx->uring_lock);
-		if (!list_empty(&ctx->iopoll_list) && !need_resched())
+		if (!list_empty(&ctx->iopoll_list))
 			io_do_iopoll(ctx, &nr_events, 0);
 		mutex_unlock(&ctx->uring_lock);
 	}
 
 	to_submit = io_sqring_entries(ctx);
-
-	/*
-	 * If submit got -EBUSY, flag us as needing the application
-	 * to enter the kernel to reap and flush events.
-	 */
-	if (!to_submit || ret == -EBUSY || need_resched()) {
-		/*
-		 * Drop cur_mm before scheduling, we can't hold it for
-		 * long periods (or over schedule()). Do this before
-		 * adding ourselves to the waitqueue, as the unuse/drop
-		 * may sleep.
-		 */
-		io_sq_thread_drop_mm_files();
-
-		/*
-		 * We're polling. If we're within the defined idle
-		 * period, then let us spin without work before going
-		 * to sleep. The exception is if we got EBUSY doing
-		 * more IO, we should wait for the application to
-		 * reap events and wake us up.
-		 */
-		if (!list_empty(&ctx->iopoll_list) || need_resched() ||
-		    (!time_after(jiffies, timeout) && ret != -EBUSY &&
-		    !percpu_ref_is_dying(&ctx->refs)))
-			return SQT_SPIN;
-
-		prepare_to_wait(&sqd->wait, &ctx->sqo_wait_entry,
-					TASK_INTERRUPTIBLE);
-
-		/*
-		 * While doing polled IO, before going to sleep, we need
-		 * to check if there are new reqs added to iopoll_list,
-		 * it is because reqs may have been punted to io worker
-		 * and will be added to iopoll_list later, hence check
-		 * the iopoll_list again.
-		 */
-		if ((ctx->flags & IORING_SETUP_IOPOLL) &&
-		    !list_empty_careful(&ctx->iopoll_list)) {
-			finish_wait(&sqd->wait, &ctx->sqo_wait_entry);
-			goto again;
-		}
-
-		to_submit = io_sqring_entries(ctx);
-		if (!to_submit || ret == -EBUSY)
-			return SQT_IDLE;
-	}
-
-	finish_wait(&sqd->wait, &ctx->sqo_wait_entry);
-	io_ring_clear_wakeup_flag(ctx);
-
 	/* if we're handling multiple rings, cap submit size for fairness */
 	if (cap_entries && to_submit > 8)
 		to_submit = 8;
 
 	if (to_submit) {
 		mutex_lock(&ctx->uring_lock);
 		if (likely(!percpu_ref_is_dying(&ctx->refs)))
 			ret = io_submit_sqes(ctx, to_submit);
 		mutex_unlock(&ctx->uring_lock);
 	}
 
 	if (!io_sqring_full(ctx) && wq_has_sleeper(&ctx->sqo_sq_wait))
 		wake_up(&ctx->sqo_sq_wait);
 
-	return SQT_DID_WORK;
+	return ret;
 }
+
+static void io_sqd_update_thread_idle(struct io_sq_data *sqd)
+{
+	struct io_ring_ctx *ctx;
+	unsigned sq_thread_idle = 0;
+
+	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
+		if (sq_thread_idle < ctx->sq_thread_idle)
+			sq_thread_idle = ctx->sq_thread_idle;
+	}
+
+	sqd->sq_thread_idle = sq_thread_idle;
+}
+
@@ -6954,11 +6893,11 @@ static void io_sqd_init_new(struct io_sq_data *sqd)
 {
 	struct io_ring_ctx *ctx;
 
 	while (!list_empty(&sqd->ctx_new_list)) {
 		ctx = list_first_entry(&sqd->ctx_new_list, struct io_ring_ctx, sqd_list);
-		init_wait(&ctx->sqo_wait_entry);
-		ctx->sqo_wait_entry.func = io_sq_wake_function;
 		list_move_tail(&ctx->sqd_list, &sqd->ctx_list);
 		complete(&ctx->sq_thread_comp);
 	}
+
+	io_sqd_update_thread_idle(sqd);
 }
@@ -6969,17 +6908,17 @@ static int io_sq_thread(void *data)
 	const struct cred *old_cred = NULL;
 	struct io_sq_data *sqd = data;
 	struct io_ring_ctx *ctx;
-	unsigned long start_jiffies;
+	unsigned long timeout;
+	DEFINE_WAIT(wait);
 
 	task_lock(current);
 	current->files = NULL;
 	current->nsproxy = NULL;
 	task_unlock(current);
 
-	start_jiffies = jiffies;
 	while (!kthread_should_stop()) {
-		enum sq_ret ret = 0;
-		bool cap_entries;
+		int ret;
+		bool cap_entries, sqt_spin, needs_sched;
 
 		/*
 		 * Any changes to the sqd lists are synchronized through the
@@ -6989,11 +6928,13 @@ static int io_sq_thread(void *data)
 		if (kthread_should_park())
 			kthread_parkme();
 
-		if (unlikely(!list_empty(&sqd->ctx_new_list)))
+		if (unlikely(!list_empty(&sqd->ctx_new_list))) {
 			io_sqd_init_new(sqd);
+			timeout = jiffies + sqd->sq_thread_idle;
+		}
 
+		sqt_spin = false;
 		cap_entries = !list_is_singular(&sqd->ctx_list);
-
 		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
 			if (current->cred != ctx->creds) {
 				if (old_cred)
@@ -7006,24 +6947,49 @@ static int io_sq_thread(void *data)
 			current->sessionid = ctx->sessionid;
 #endif
 
-			ret |= __io_sq_thread(ctx, start_jiffies, cap_entries);
+			ret = __io_sq_thread(ctx, cap_entries);
+			if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
+				sqt_spin = true;
 
 			io_sq_thread_drop_mm_files();
 		}
 
-		if (ret & SQT_SPIN) {
+		if (sqt_spin || !time_after(jiffies, timeout)) {
 			io_run_task_work();
 			cond_resched();
-		} else if (ret == SQT_IDLE) {
-			if (kthread_should_park())
-				continue;
-			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
-				io_ring_set_wakeup_flag(ctx);
-			schedule();
-			start_jiffies = jiffies;
-			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
-				io_ring_clear_wakeup_flag(ctx);
+			if (sqt_spin)
+				timeout = jiffies + sqd->sq_thread_idle;
+			continue;
 		}
+
+		if (kthread_should_park())
+			continue;
+
+		needs_sched = true;
+		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
+		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
+			if ((ctx->flags & IORING_SETUP_IOPOLL) &&
+			    !list_empty_careful(&ctx->iopoll_list)) {
+				needs_sched = false;
+				break;
+			}
+			if (io_sqring_entries(ctx)) {
+				needs_sched = false;
+				break;
+			}
+		}
+
+		if (needs_sched) {
+			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
+				io_ring_set_wakeup_flag(ctx);
+
+			schedule();
+			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
+				io_ring_clear_wakeup_flag(ctx);
+		}
+
+		finish_wait(&sqd->wait, &wait);
+		timeout = jiffies + sqd->sq_thread_idle;
 	}
 
 	io_run_task_work();
@@ -7335,12 +7301,11 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)
 
 		mutex_lock(&sqd->ctx_lock);
 		list_del(&ctx->sqd_list);
+		io_sqd_update_thread_idle(sqd);
 		mutex_unlock(&sqd->ctx_lock);
 
-		if (sqd->thread) {
-			finish_wait(&sqd->wait, &ctx->sqo_wait_entry);
+		if (sqd->thread)
 			io_sq_thread_unpark(sqd);
-		}
 
 		io_put_sq_data(sqd);
 		ctx->sq_data = NULL;