Commit fb5ccc98 authored by Pavel Begunkov's avatar Pavel Begunkov Committed by Jens Axboe
Browse files

io_uring: Fix broken links with offloading



io_sq_thread() processes sqes by 8 without considering links. As a
result, links will be randomely subdivided.

The easiest way to fix it is to call io_get_sqring() inside
io_submit_sqes() as do io_ring_submit().

Downsides:
1. This removes optimisation of not grabbing mm_struct for fixed files
2. It submitting all sqes in one go, without finer-grained sheduling
with cq processing.

Signed-off-by: default avatarPavel Begunkov <asml.silence@gmail.com>
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 84d55dc5
Loading
Loading
Loading
Loading
+33 −29
Original line number Diff line number Diff line
@@ -735,6 +735,14 @@ static unsigned io_cqring_events(struct io_rings *rings)
	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
}

static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
{
	struct io_rings *rings = ctx->rings;

	/* make sure SQ entry isn't read before tail */
	return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
}

/*
 * Find and free completed poll iocbs
 */
@@ -2560,8 +2568,8 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
	return false;
}

static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
			  unsigned int nr, bool has_user, bool mm_fault)
static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
			  bool has_user, bool mm_fault)
{
	struct io_submit_state state, *statep = NULL;
	struct io_kiocb *link = NULL;
@@ -2575,6 +2583,11 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
	}

	for (i = 0; i < nr; i++) {
		struct sqe_submit s;

		if (!io_get_sqring(ctx, &s))
			break;

		/*
		 * If previous wasn't linked and we have a linked command,
		 * that's the end of the chain. Submit the previous link.
@@ -2584,9 +2597,9 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
			link = NULL;
			shadow_req = NULL;
		}
		prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
		prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;

		if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) {
		if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
			if (!shadow_req) {
				shadow_req = io_get_req(ctx, NULL);
				if (unlikely(!shadow_req))
@@ -2594,18 +2607,18 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
				shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
				refcount_dec(&shadow_req->refs);
			}
			shadow_req->sequence = sqes[i].sequence;
			shadow_req->sequence = s.sequence;
		}

out:
		if (unlikely(mm_fault)) {
			io_cqring_add_event(ctx, sqes[i].sqe->user_data,
			io_cqring_add_event(ctx, s.sqe->user_data,
						-EFAULT);
		} else {
			sqes[i].has_user = has_user;
			sqes[i].needs_lock = true;
			sqes[i].needs_fixed_file = true;
			io_submit_sqe(ctx, &sqes[i], statep, &link);
			s.has_user = has_user;
			s.needs_lock = true;
			s.needs_fixed_file = true;
			io_submit_sqe(ctx, &s, statep, &link);
			submitted++;
		}
	}
@@ -2620,7 +2633,6 @@ out:

static int io_sq_thread(void *data)
{
	struct sqe_submit sqes[IO_IOPOLL_BATCH];
	struct io_ring_ctx *ctx = data;
	struct mm_struct *cur_mm = NULL;
	mm_segment_t old_fs;
@@ -2635,8 +2647,8 @@ static int io_sq_thread(void *data)

	timeout = inflight = 0;
	while (!kthread_should_park()) {
		bool all_fixed, mm_fault = false;
		int i;
		bool mm_fault = false;
		unsigned int to_submit;

		if (inflight) {
			unsigned nr_events = 0;
@@ -2656,7 +2668,8 @@ static int io_sq_thread(void *data)
				timeout = jiffies + ctx->sq_thread_idle;
		}

		if (!io_get_sqring(ctx, &sqes[0])) {
		to_submit = io_sqring_entries(ctx);
		if (!to_submit) {
			/*
			 * We're polling. If we're within the defined idle
			 * period, then let us spin without work before going
@@ -2687,7 +2700,8 @@ static int io_sq_thread(void *data)
			/* make sure to read SQ tail after writing flags */
			smp_mb();

			if (!io_get_sqring(ctx, &sqes[0])) {
			to_submit = io_sqring_entries(ctx);
			if (!to_submit) {
				if (kthread_should_park()) {
					finish_wait(&ctx->sqo_wait, &wait);
					break;
@@ -2705,19 +2719,8 @@ static int io_sq_thread(void *data)
			ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
		}

		i = 0;
		all_fixed = true;
		do {
			if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
				all_fixed = false;

			i++;
			if (i == ARRAY_SIZE(sqes))
				break;
		} while (io_get_sqring(ctx, &sqes[i]));

		/* Unless all new commands are FIXED regions, grab mm */
		if (!all_fixed && !cur_mm) {
		if (!cur_mm) {
			mm_fault = !mmget_not_zero(ctx->sqo_mm);
			if (!mm_fault) {
				use_mm(ctx->sqo_mm);
@@ -2725,7 +2728,8 @@ static int io_sq_thread(void *data)
			}
		}

		inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
		to_submit = min(to_submit, ctx->sq_entries);
		inflight += io_submit_sqes(ctx, to_submit, cur_mm != NULL,
					   mm_fault);

		/* Commit SQ ring head once we've consumed all SQEs */