Commit 69fb2131 authored by Jens Axboe's avatar Jens Axboe
Browse files

io_uring: base SQPOLL handling off io_sq_data



Remove the SQPOLL thread from the ctx, and use the io_sq_data as the
data structure we pass in. io_sq_data has a list of ctx's that we can
then iterate over and handle.

As of now we're ready to handle multiple ctx's, though we're still just
handling a single one after this patch.

Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 534ca6d6
Loading
Loading
Loading
Loading
+101 −25
Original line number Diff line number Diff line
@@ -232,6 +232,13 @@ struct io_restriction {

struct io_sq_data {
	refcount_t		refs;
	struct mutex		lock;

	/* ctx's that are using this sqd */
	struct list_head	ctx_list;
	struct list_head	ctx_new_list;
	struct mutex		ctx_lock;

	struct task_struct	*thread;
	struct wait_queue_head	wait;
};
@@ -295,6 +302,7 @@ struct io_ring_ctx {
	struct io_sq_data	*sq_data;	/* if using sq thread polling */

	struct wait_queue_entry	sqo_wait_entry;
	struct list_head	sqd_list;

	/*
	 * If used, fixed file set. Writers must ensure that ->refs is dead,
@@ -1064,6 +1072,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
		goto err;

	ctx->flags = p->flags;
	INIT_LIST_HEAD(&ctx->sqd_list);
	init_waitqueue_head(&ctx->cq_wait);
	INIT_LIST_HEAD(&ctx->cq_overflow_list);
	init_completion(&ctx->ref_comp);
@@ -6715,8 +6724,6 @@ again:
			goto again;
		}

		io_ring_set_wakeup_flag(ctx);

		to_submit = io_sqring_entries(ctx);
		if (!to_submit || ret == -EBUSY)
			return SQT_IDLE;
@@ -6732,41 +6739,71 @@ again:
	return SQT_DID_WORK;
}

static int io_sq_thread(void *data)
static void io_sqd_init_new(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx = data;
	const struct cred *old_cred;
	unsigned long start_jiffies;
	struct io_ring_ctx *ctx;

	while (!list_empty(&sqd->ctx_new_list)) {
		ctx = list_first_entry(&sqd->ctx_new_list, struct io_ring_ctx, sqd_list);
		init_wait(&ctx->sqo_wait_entry);
		ctx->sqo_wait_entry.func = io_sq_wake_function;

		list_move_tail(&ctx->sqd_list, &sqd->ctx_list);
		complete(&ctx->sq_thread_comp);
	}
}

	old_cred = override_creds(ctx->creds);
static int io_sq_thread(void *data)
{
	const struct cred *old_cred = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	unsigned long start_jiffies;

	start_jiffies = jiffies;
	while (!kthread_should_park()) {
		enum sq_ret ret;
	while (!kthread_should_stop()) {
		enum sq_ret ret = 0;

		ret = __io_sq_thread(ctx, start_jiffies);
		switch (ret) {
		case SQT_IDLE:
			schedule();
			start_jiffies = jiffies;
			continue;
		case SQT_SPIN:
		/*
		 * Any changes to the sqd lists are synchronized through the
		 * kthread parking. This synchronizes the thread vs users,
		 * the users are synchronized on the sqd->ctx_lock.
		 */
		if (kthread_should_park())
			kthread_parkme();

		if (unlikely(!list_empty(&sqd->ctx_new_list)))
			io_sqd_init_new(sqd);

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			if (current->cred != ctx->creds) {
				if (old_cred)
					revert_creds(old_cred);
				old_cred = override_creds(ctx->creds);
			}

			ret |= __io_sq_thread(ctx, start_jiffies);

			io_sq_thread_drop_mm();
		}

		if (ret & SQT_SPIN) {
			io_run_task_work();
			cond_resched();
			fallthrough;
		case SQT_DID_WORK:
		} else if (ret == SQT_IDLE) {
			if (kthread_should_park())
				continue;
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				io_ring_set_wakeup_flag(ctx);
			schedule();
			start_jiffies = jiffies;
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				io_ring_clear_wakeup_flag(ctx);
		}
	}

	io_run_task_work();

	io_sq_thread_drop_mm();
	if (old_cred)
		revert_creds(old_cred);

	kthread_parkme();
@@ -6968,10 +7005,32 @@ static struct io_sq_data *io_get_sq_data(struct io_uring_params *p)
		return ERR_PTR(-ENOMEM);

	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	INIT_LIST_HEAD(&sqd->ctx_new_list);
	mutex_init(&sqd->ctx_lock);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	return sqd;
}

static void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	if (!sqd->thread)
		return;
	kthread_unpark(sqd->thread);
	mutex_unlock(&sqd->lock);
}

static void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	if (!sqd->thread)
		return;
	mutex_lock(&sqd->lock);
	kthread_park(sqd->thread);
}

static void io_sq_thread_stop(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;
@@ -6986,6 +7045,17 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)
			 */
			wake_up_process(sqd->thread);
			wait_for_completion(&ctx->sq_thread_comp);

			io_sq_thread_park(sqd);
		}

		mutex_lock(&sqd->ctx_lock);
		list_del(&ctx->sqd_list);
		mutex_unlock(&sqd->ctx_lock);

		if (sqd->thread) {
			finish_wait(&sqd->wait, &ctx->sqo_wait_entry);
			io_sq_thread_unpark(sqd);
		}

		io_put_sq_data(sqd);
@@ -7669,7 +7739,13 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx,
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_data = sqd;
		io_sq_thread_park(sqd);
		mutex_lock(&sqd->ctx_lock);
		list_add(&ctx->sqd_list, &sqd->ctx_new_list);
		mutex_unlock(&sqd->ctx_lock);
		io_sq_thread_unpark(sqd);

		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
@@ -7684,10 +7760,10 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx,
			if (!cpu_online(cpu))
				goto err;

			sqd->thread = kthread_create_on_cpu(io_sq_thread, ctx,
			sqd->thread = kthread_create_on_cpu(io_sq_thread, sqd,
							cpu, "io_uring-sq");
		} else {
			sqd->thread = kthread_create(io_sq_thread, ctx,
			sqd->thread = kthread_create(io_sq_thread, sqd,
							"io_uring-sq");
		}
		if (IS_ERR(sqd->thread)) {