Commit 05bd375b authored by Linus Torvalds's avatar Linus Torvalds
Browse files

Merge tag 'for-5.5/io_uring-post-20191128' of git://git.kernel.dk/linux-block

Pull more io_uring updates from Jens Axboe:
 "As mentioned in the first pull request, there was a later batch as
  well. This contains fixes to the stuff that already went in, cleanups,
  and a few later additions. In particular, this contains:

   - Cleanups/fixes/unification of the submission and completion path
     (Pavel,me)

   - Linked timeouts improvements (Pavel,me)

   - Error path fixes (me)

   - Fix lookup window where cancellations wouldn't work (me)

   - Improve DRAIN support (Pavel)

   - Fix backlog flushing -EBUSY on submit (me)

   - Add support for connect(2) (me)

   - Fix for non-iter based fixed IO (Pavel)

   - creds inheritance for async workers (me)

   - Disable cmsg/ancillary data for sendmsg/recvmsg (me)

   - Shrink io_kiocb to 3 cachelines (me)

   - NUMA fix for io-wq (Jann)"

* tag 'for-5.5/io_uring-post-20191128' of git://git.kernel.dk/linux-block: (42 commits)
  io_uring: make poll->wait dynamically allocated
  io-wq: shrink io_wq_work a bit
  io-wq: fix handling of NUMA node IDs
  io_uring: use kzalloc instead of kcalloc for single-element allocations
  io_uring: cleanup io_import_fixed()
  io_uring: inline struct sqe_submit
  io_uring: store timeout's sqe->off in proper place
  net: disallow ancillary data for __sys_{send,recv}msg_file()
  net: separate out the msghdr copy from ___sys_{send,recv}msg()
  io_uring: remove superfluous check for sqe->off in io_accept()
  io_uring: async workers should inherit the user creds
  io-wq: have io_wq_create() take a 'data' argument
  io_uring: fix dead-hung for non-iter fixed rw
  io_uring: add support for IORING_OP_CONNECT
  net: add __sys_connect_file() helper
  io_uring: only return -EBUSY for submit on non-flushed backlog
  io_uring: only !null ptr to io_issue_sqe()
  io_uring: simplify io_req_link_next()
  io_uring: pass only !null to io_req_find_next()
  io_uring: remove io_free_req_find_next()
  ...
parents a6ed68d6 e944475e
Loading
Loading
Loading
Loading
+108 −79
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@ enum {
enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
@@ -56,6 +57,7 @@ struct io_worker {

	struct rcu_head rcu;
	struct mm_struct *mm;
	const struct cred *creds;
	struct files_struct *restore_files;
};

@@ -82,7 +84,7 @@ enum {
struct io_wqe {
	struct {
		spinlock_t lock;
		struct list_head work_list;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;
@@ -103,13 +105,13 @@ struct io_wqe {
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;
	unsigned nr_wqes;

	get_work_fn *get_work;
	put_work_fn *put_work;

	struct task_struct *manager;
	struct user_struct *user;
	struct cred *creds;
	struct mm_struct *mm;
	refcount_t refs;
	struct completion done;
@@ -135,6 +137,11 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->creds) {
		revert_creds(worker->creds);
		worker->creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		spin_unlock_irq(&wqe->lock);
@@ -229,7 +236,8 @@ static void io_worker_exit(struct io_worker *worker)
static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!list_empty(&wqe->work_list) && !(wqe->flags & IO_WQE_FLAG_STALLED))
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}
@@ -368,12 +376,15 @@ static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	list_for_each_entry(work, &wqe->work_list, list) {
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!(work->flags & IO_WQ_WORK_HASHED)) {
			list_del(&work->list);
			wq_node_del(&wqe->work_list, node, prev);
			return work;
		}

@@ -381,7 +392,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
		*hash = work->flags >> IO_WQ_HASH_SHIFT;
		if (!(wqe->hash_map & BIT_ULL(*hash))) {
			wqe->hash_map |= BIT_ULL(*hash);
			list_del(&work->list);
			wq_node_del(&wqe->work_list, node, prev);
			return work;
		}
	}
@@ -409,7 +420,7 @@ static void io_worker_handle_work(struct io_worker *worker)
		work = io_get_next_work(wqe, &hash);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!list_empty(&wqe->work_list))
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		spin_unlock_irq(&wqe->lock);
@@ -426,6 +437,9 @@ next:
		worker->cur_work = work;
		spin_unlock_irq(&worker->lock);

		if (work->flags & IO_WQ_WORK_CB)
			work->func(&work);

		if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
		    current->files != work->files) {
			task_lock(current);
@@ -438,6 +452,8 @@ next:
			set_fs(USER_DS);
			worker->mm = wq->mm;
		}
		if (!worker->creds)
			worker->creds = override_creds(wq->creds);
		if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
			work->flags |= IO_WQ_WORK_CANCEL;
		if (worker->mm)
@@ -514,7 +530,7 @@ static int io_wqe_worker(void *data)

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		spin_lock_irq(&wqe->lock);
		if (!list_empty(&wqe->work_list))
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			spin_unlock_irq(&wqe->lock);
@@ -562,14 +578,14 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
	spin_unlock_irq(&wqe->lock);
}

static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct =&wqe->acct[index];
	struct io_worker *worker;

	worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node);
	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return;
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
@@ -581,7 +597,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return;
		return false;
	}

	spin_lock_irq(&wqe->lock);
@@ -599,6 +615,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
		atomic_inc(&wq->user->processes);

	wake_up_process(worker->task);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
@@ -606,9 +623,6 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* always ensure we have one bounded worker */
	if (index == IO_WQ_ACCT_BOUND && !acct->nr_workers)
		return true;
	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
@@ -621,12 +635,22 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int workers_to_create = num_possible_nodes();
	int node;

	while (!kthread_should_stop()) {
		int i;
	/* create fixed workers */
	refcount_set(&wq->refs, workers_to_create);
	for_each_node(node) {
		if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			goto err;
		workers_to_create--;
	}

		for (i = 0; i < wq->nr_wqes; i++) {
			struct io_wqe *wqe = wq->wqes[i];
	complete(&wq->done);

	while (!kthread_should_stop()) {
		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			spin_lock_irq(&wqe->lock);
@@ -645,6 +669,12 @@ static int io_wq_manager(void *data)
	}

	return 0;
err:
	set_bit(IO_WQ_BIT_ERROR, &wq->state);
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (refcount_sub_and_test(workers_to_create, &wq->refs))
		complete(&wq->done);
	return 0;
}

static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
@@ -688,7 +718,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
	}

	spin_lock_irqsave(&wqe->lock, flags);
	list_add_tail(&work->list, &wqe->work_list);
	wq_list_add_tail(&work->list, &wqe->work_list);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	spin_unlock_irqrestore(&wqe->lock, flags);

@@ -750,7 +780,7 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe,

void io_wq_cancel_all(struct io_wq *wq)
{
	int i;
	int node;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

@@ -759,8 +789,8 @@ void io_wq_cancel_all(struct io_wq *wq)
	 * to a worker and the worker putting itself on the busy_list
	 */
	rcu_read_lock();
	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
	}
@@ -803,14 +833,17 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
		.cancel = cancel,
		.caller_data = cancel_data,
	};
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	spin_lock_irqsave(&wqe->lock, flags);
	list_for_each_entry(work, &wqe->work_list, list) {
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		if (cancel(work, cancel_data)) {
			list_del(&work->list);
			wq_node_del(&wqe->work_list, node, prev);
			found = true;
			break;
		}
@@ -833,10 +866,10 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int i;
	int node;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_cb_work(wqe, cancel, data);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
@@ -868,6 +901,7 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
					    struct io_wq_work *cwork)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;
@@ -880,9 +914,11 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
	 * no completion will be posted for it.
	 */
	spin_lock_irqsave(&wqe->lock, flags);
	list_for_each_entry(work, &wqe->work_list, list) {
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		if (work == cwork) {
			list_del(&work->list);
			wq_node_del(&wqe->work_list, node, prev);
			found = true;
			break;
		}
@@ -910,10 +946,10 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int i;
	int node;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_work(wqe, cwork);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
@@ -944,10 +980,10 @@ static void io_wq_flush_func(struct io_wq_work **workptr)
void io_wq_flush(struct io_wq *wq)
{
	struct io_wq_flush_data data;
	int i;
	int node;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		init_completion(&data.done);
		INIT_IO_WORK(&data.work, io_wq_flush_func);
@@ -957,43 +993,39 @@ void io_wq_flush(struct io_wq *wq)
	}
}

struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
			   struct user_struct *user, get_work_fn *get_work,
			   put_work_fn *put_work)
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, i, node;
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	wq = kcalloc(1, sizeof(*wq), GFP_KERNEL);
	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->nr_wqes = num_online_nodes();
	wq->wqes = kcalloc(wq->nr_wqes, sizeof(struct io_wqe *), GFP_KERNEL);
	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->get_work = get_work;
	wq->put_work = put_work;
	wq->get_work = data->get_work;
	wq->put_work = data->put_work;

	/* caller must already hold a reference to this */
	wq->user = user;
	wq->user = data->user;
	wq->creds = data->creds;

	i = 0;
	refcount_set(&wq->refs, wq->nr_wqes);
	for_each_online_node(node) {
	for_each_node(node) {
		struct io_wqe *wqe;

		wqe = kcalloc_node(1, sizeof(struct io_wqe), GFP_KERNEL, node);
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node);
		if (!wqe)
			break;
		wq->wqes[i] = wqe;
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (user) {
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
@@ -1001,33 +1033,36 @@ struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
		wqe->node = node;
		wqe->wq = wq;
		spin_lock_init(&wqe->lock);
		INIT_LIST_HEAD(&wqe->work_list);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_HLIST_NULLS_HEAD(&wqe->busy_list, 1);
		INIT_LIST_HEAD(&wqe->all_list);

		i++;
	}

	init_completion(&wq->done);

	if (i != wq->nr_wqes)
		goto err;

	/* caller must have already done mmgrab() on this mm */
	wq->mm = mm;
	wq->mm = data->mm;

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	wq->manager = NULL;
err:
	complete(&wq->done);
	io_wq_destroy(wq);
err:
	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}

@@ -1039,27 +1074,21 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)

void io_wq_destroy(struct io_wq *wq)
{
	int i;
	int node;

	if (wq->manager) {
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);
	}

	rcu_read_lock();
	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		if (!wqe)
			continue;
		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for (i = 0; i < wq->nr_wqes; i++)
		kfree(wq->wqes[i]);
	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}
+58 −5
Original line number Diff line number Diff line
@@ -11,6 +11,7 @@ enum {
	IO_WQ_WORK_NEEDS_FILES	= 16,
	IO_WQ_WORK_UNBOUND	= 32,
	IO_WQ_WORK_INTERNAL	= 64,
	IO_WQ_WORK_CB		= 128,

	IO_WQ_HASH_SHIFT	= 24,	/* upper 8 bits are used for hash key */
};
@@ -21,15 +22,60 @@ enum io_wq_cancel {
	IO_WQ_CANCEL_NOTFOUND,	/* work not found */
};

struct io_wq_work_node {
	struct io_wq_work_node *next;
};

struct io_wq_work_list {
	struct io_wq_work_node *first;
	struct io_wq_work_node *last;
};

static inline void wq_list_add_tail(struct io_wq_work_node *node,
				    struct io_wq_work_list *list)
{
	if (!list->first) {
		list->first = list->last = node;
	} else {
		list->last->next = node;
		list->last = node;
	}
}

static inline void wq_node_del(struct io_wq_work_list *list,
			       struct io_wq_work_node *node,
			       struct io_wq_work_node *prev)
{
	if (node == list->first)
		list->first = node->next;
	if (node == list->last)
		list->last = prev;
	if (prev)
		prev->next = node->next;
}

#define wq_list_for_each(pos, prv, head)			\
	for (pos = (head)->first, prv = NULL; pos; prv = pos, pos = (pos)->next)

#define wq_list_empty(list)	((list)->first == NULL)
#define INIT_WQ_LIST(list)	do {				\
	(list)->first = NULL;					\
	(list)->last = NULL;					\
} while (0)

struct io_wq_work {
	struct list_head list;
	union {
		struct io_wq_work_node list;
		void *data;
	};
	void (*func)(struct io_wq_work **);
	unsigned flags;
	struct files_struct *files;
	unsigned flags;
};

#define INIT_IO_WORK(work, _func)			\
	do {						\
		(work)->list.next = NULL;		\
		(work)->func = _func;			\
		(work)->flags = 0;			\
		(work)->files = NULL;			\
@@ -38,9 +84,16 @@ struct io_wq_work {
typedef void (get_work_fn)(struct io_wq_work *);
typedef void (put_work_fn)(struct io_wq_work *);

struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
				struct user_struct *user,
				get_work_fn *get_work, put_work_fn *put_work);
struct io_wq_data {
	struct mm_struct *mm;
	struct user_struct *user;
	struct cred *creds;

	get_work_fn *get_work;
	put_work_fn *put_work;
};

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data);
void io_wq_destroy(struct io_wq *wq);

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
Loading