Commit a34a8bfd authored by Kent Overstreet

bcache: Refactor journalling flow control



Make things that don't need to be asynchronous less so - bch_journal()
only has to block when the journal or the current journal entry is
full, which is emphatically not a fast path. So make it a normal
function that just returns when it finishes, to make the code and
control flow easier to follow.

Signed-off-by: Kent Overstreet <kmo@daterainc.com>
parent cdd972b1
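
The user-visible change: bch_journal() is now a plain synchronous function. It takes the cache set, a keylist, and an optional parent closure, and returns a reference to the journal pin for the entry the keys landed in (NULL when the cache set is not running with CACHE_SYNC). A minimal sketch of the new calling convention, modelled on bch_journal_meta() in the diff below; `keys` and `parent` stand for the caller's own keylist and closure:

	struct keylist keys;
	atomic_t *ref;

	bch_keylist_init(&keys);
	/* ... add the keys being inserted ... */

	/* Blocks only if the journal or the open journal entry is full: */
	ref = bch_journal(c, &keys, parent);

	/* ... do the btree insert ... */

	if (ref)
		atomic_dec_bug(ref);	/* drop the journal pin when done */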
+0 −3
@@ -2164,9 +2164,6 @@ int bch_btree_insert(struct btree_op *op, struct cache_set *c,
 		}
 	}
 
-	if (op->journal)
-		atomic_dec_bug(op->journal);
-	op->journal = NULL;
 	return ret;
 }

+1 −1
@@ -642,7 +642,7 @@ do { \
 #define continue_at_nobarrier(_cl, _fn, _wq)				\
 do {									\
 	set_closure_fn(_cl, _fn, _wq);					\
-	closure_queue(cl);						\
+	closure_queue(_cl);						\
 	return;								\
 } while (0)
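
This one-character change fixes a macro hygiene bug: the body queued `cl` instead of the parameter `_cl`, so continue_at_nobarrier() only compiled when the caller happened to have a variable named `cl` in scope, and then queued that variable rather than the argument. A sketch of the failure mode with a hypothetical caller (the names below are illustrative, not from this commit):

	struct closure *cl = &unrelated_closure;	/* innocent local */

	/* The old body expanded to closure_queue(cl), queueing
	 * unrelated_closure instead of &s->cl: */
	continue_at_nobarrier(&s->cl, my_fn, my_wq);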

+100 −113
@@ -318,7 +318,6 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list,
 			bch_keylist_push(&op->keys);
 
 			op->journal = i->pin;
-			atomic_inc(op->journal);
 
 			ret = bch_btree_insert(op, s, &op->keys);
 			if (ret)
@@ -357,48 +356,35 @@ static void btree_flush_write(struct cache_set *c)
 	 * Try to find the btree node with that references the oldest journal
 	 * entry, best is our current candidate and is locked if non NULL:
 	 */
-	struct btree *b, *best = NULL;
-	unsigned iter;
-
-	for_each_cached_btree(b, c, iter) {
-		if (!down_write_trylock(&b->lock))
-			continue;
-
-		if (!btree_node_dirty(b) ||
-		    !btree_current_write(b)->journal) {
-			rw_unlock(true, b);
-			continue;
-		}
-
-		if (!best)
-			best = b;
-		else if (journal_pin_cmp(c,
-					 btree_current_write(best),
-					 btree_current_write(b))) {
-			rw_unlock(true, best);
-			best = b;
-		} else
-			rw_unlock(true, b);
-	}
-
-	if (best)
-		goto out;
-
-	/* We can't find the best btree node, just pick the first */
-	list_for_each_entry(b, &c->btree_cache, list)
-		if (!b->level && btree_node_dirty(b)) {
-			best = b;
-			rw_lock(true, best, best->level);
-			goto found;
-		}
-
-out:
-	if (!best)
-		return;
-found:
-	if (btree_node_dirty(best))
-		bch_btree_node_write(best, NULL);
-	rw_unlock(true, best);
+	struct btree *b, *best;
+	unsigned i;
+retry:
+	best = NULL;
+
+	for_each_cached_btree(b, c, i)
+		if (btree_current_write(b)->journal) {
+			if (!best)
+				best = b;
+			else if (journal_pin_cmp(c,
+						 btree_current_write(best),
+						 btree_current_write(b)))
+				best = b;
+		}
+
+	b = best;
+	if (b) {
+		rw_lock(true, b, b->level);
+
+		if (!btree_current_write(b)->journal) {
+			rw_unlock(true, b);
+			/* We raced */
+			goto retry;
+		}
+
+		bch_btree_node_write(b, NULL);
+		rw_unlock(true, b);
+	}
 }
 
 #define last_seq(j)	((j)->seq - fifo_used(&(j)->pin) + 1)
@@ -494,7 +480,7 @@ static void journal_reclaim(struct cache_set *c)
 		do_journal_discard(ca);
 
 	if (c->journal.blocks_free)
-		return;
+		goto out;
 
 	/*
 	 * Allocate:
@@ -520,7 +506,7 @@ static void journal_reclaim(struct cache_set *c)

 	if (n)
 		c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
-
+out:
 	if (!journal_full(&c->journal))
 		__closure_wake_up(&c->journal.wait);
 }
@@ -659,7 +645,7 @@ static void journal_write(struct closure *cl)
 	journal_write_unlocked(cl);
 }
 
-static void __journal_try_write(struct cache_set *c, bool noflush)
+static void journal_try_write(struct cache_set *c)
 	__releases(c->journal.lock)
 {
 	struct closure *cl = &c->journal.io;
@@ -667,29 +653,59 @@ static void __journal_try_write(struct cache_set *c, bool noflush)

 	w->need_write = true;
 
-	if (!closure_trylock(cl, &c->cl))
-		spin_unlock(&c->journal.lock);
-	else if (noflush && journal_full(&c->journal)) {
-		spin_unlock(&c->journal.lock);
-		continue_at(cl, journal_write, system_wq);
-	} else
+	if (closure_trylock(cl, &c->cl))
 		journal_write_unlocked(cl);
+	else
+		spin_unlock(&c->journal.lock);
 }
 
-#define journal_try_write(c)	__journal_try_write(c, false)
-
-void bch_journal_meta(struct cache_set *c, struct closure *cl)
+static struct journal_write *journal_wait_for_write(struct cache_set *c,
+						    unsigned nkeys)
 {
-	struct journal_write *w;
-
-	if (CACHE_SYNC(&c->sb)) {
-		spin_lock(&c->journal.lock);
-		w = c->journal.cur;
-
-		if (cl)
-			BUG_ON(!closure_wait(&w->wait, cl));
-
-		__journal_try_write(c, true);
-	}
-}
+	size_t sectors;
+	struct closure cl;
+
+	closure_init_stack(&cl);
+
+	spin_lock(&c->journal.lock);
+
+	while (1) {
+		struct journal_write *w = c->journal.cur;
+
+		sectors = __set_blocks(w->data, w->data->keys + nkeys,
+				       c) * c->sb.block_size;
+
+		if (sectors <= min_t(size_t,
+				     c->journal.blocks_free * c->sb.block_size,
+				     PAGE_SECTORS << JSET_BITS))
+			return w;
+
+		/* XXX: tracepoint */
+		if (!journal_full(&c->journal)) {
+			trace_bcache_journal_entry_full(c);
+
+			/*
+			 * XXX: If we were inserting so many keys that they
+			 * won't fit in an _empty_ journal write, we'll
+			 * deadlock. For now, handle this in
+			 * bch_keylist_realloc() - but something to think about.
+			 */
+			BUG_ON(!w->data->keys);
+
+			closure_wait(&w->wait, &cl);
+			journal_try_write(c); /* unlocks */
+		} else {
+			trace_bcache_journal_full(c);
+
+			closure_wait(&c->journal.wait, &cl);
+			journal_reclaim(c);
+			spin_unlock(&c->journal.lock);
+
+			btree_flush_write(c);
+		}
+
+		closure_sync(&cl);
+		spin_lock(&c->journal.lock);
+	}
+}

@@ -708,68 +724,26 @@ static void journal_write_work(struct work_struct *work)
  * bch_journal() hands those same keys off to btree_insert_async()
  */
 
-void bch_journal(struct closure *cl)
+atomic_t *bch_journal(struct cache_set *c,
+		      struct keylist *keys,
+		      struct closure *parent)
 {
-	struct btree_op *op = container_of(cl, struct btree_op, cl);
-	struct cache_set *c = op->c;
 	struct journal_write *w;
-	size_t sectors, nkeys;
-
-	if (op->type != BTREE_INSERT ||
-	    !CACHE_SYNC(&c->sb))
-		goto out;
-
-	/*
-	 * If we're looping because we errored, might already be waiting on
-	 * another journal write:
-	 */
-	while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING)
-		closure_sync(cl->parent);
-
-	spin_lock(&c->journal.lock);
-
-	if (journal_full(&c->journal)) {
-		trace_bcache_journal_full(c);
-
-		closure_wait(&c->journal.wait, cl);
-
-		journal_reclaim(c);
-		spin_unlock(&c->journal.lock);
-
-		btree_flush_write(c);
-		continue_at(cl, bch_journal, bcache_wq);
-	}
-
-	w = c->journal.cur;
-	nkeys = w->data->keys + bch_keylist_nkeys(&op->keys);
-	sectors = __set_blocks(w->data, nkeys, c) * c->sb.block_size;
-
-	if (sectors > min_t(size_t,
-			    c->journal.blocks_free * c->sb.block_size,
-			    PAGE_SECTORS << JSET_BITS)) {
-		trace_bcache_journal_entry_full(c);
-
-		/*
-		 * XXX: If we were inserting so many keys that they won't fit in
-		 * an _empty_ journal write, we'll deadlock. For now, handle
-		 * this in bch_keylist_realloc() - but something to think about.
-		 */
-		BUG_ON(!w->data->keys);
-
-		BUG_ON(!closure_wait(&w->wait, cl));
-
-		journal_try_write(c);
-		continue_at(cl, bch_journal, bcache_wq);
-	}
-
-	memcpy(end(w->data), op->keys.keys, bch_keylist_bytes(&op->keys));
-	w->data->keys += bch_keylist_nkeys(&op->keys);
-
-	op->journal = &fifo_back(&c->journal.pin);
-	atomic_inc(op->journal);
-
-	if (op->flush_journal) {
-		closure_wait(&w->wait, cl->parent);
+	atomic_t *ret;
+
+	if (!CACHE_SYNC(&c->sb))
+		return NULL;
+
+	w = journal_wait_for_write(c, bch_keylist_nkeys(keys));
+
+	memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys));
+	w->data->keys += bch_keylist_nkeys(keys);
+
+	ret = &fifo_back(&c->journal.pin);
+	atomic_inc(ret);
+
+	if (parent) {
+		closure_wait(&w->wait, parent);
 		journal_try_write(c);
 	} else if (!w->need_write) {
 		schedule_delayed_work(&c->journal.work,
@@ -778,8 +752,21 @@ void bch_journal(struct closure *cl)
 	} else {
 		spin_unlock(&c->journal.lock);
 	}
-out:
-	bch_btree_insert_async(cl);
+
+
+	return ret;
 }
 
+void bch_journal_meta(struct cache_set *c, struct closure *cl)
+{
+	struct keylist keys;
+	atomic_t *ref;
+
+	bch_keylist_init(&keys);
+
+	ref = bch_journal(c, &keys, cl);
+	if (ref)
+		atomic_dec_bug(ref);
+}
+
 void bch_journal_free(struct cache_set *c)
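
journal_wait_for_write() above is the heart of the refactor: rather than re-queueing the request as a closure continuation when there is no room, it loops in place, kicking the journal along (journal_try_write(), or journal_reclaim() plus btree_flush_write()) and sleeping until the current entry can take the keys. A minimal userspace sketch of that flow-control shape, not bcache code, using a pthread condition variable where bcache uses closure waitlists:

	#include <pthread.h>
	#include <stddef.h>

	struct journal {
		pthread_mutex_t lock;
		pthread_cond_t	space;		/* signalled when space frees up */
		size_t		entry_keys;	/* keys already in the open entry */
		size_t		entry_max;	/* capacity of one entry */
	};

	/* Returns with j->lock held and room for nkeys guaranteed. */
	static void wait_for_write(struct journal *j, size_t nkeys)
	{
		pthread_mutex_lock(&j->lock);

		while (j->entry_keys + nkeys > j->entry_max) {
			/* Emphatically not a fast path: just sleep until a
			 * completed journal write makes room. */
			pthread_cond_wait(&j->space, &j->lock);
		}
	}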
+2 −1
@@ -200,8 +200,9 @@ struct journal_device {
 struct closure;
 struct cache_set;
 struct btree_op;
+struct keylist;
 
-void bch_journal(struct closure *);
+atomic_t *bch_journal(struct cache_set *, struct keylist *, struct closure *);
 void bch_journal_next(struct journal *);
 void bch_journal_mark(struct cache_set *, struct list_head *);
 void bch_journal_meta(struct cache_set *, struct closure *);
+1 −1
@@ -110,7 +110,7 @@ static void write_moving(struct closure *cl)
 		bkey_copy(&s->op.replace, &io->w->key);
 
 		closure_init(&s->op.cl, cl);
-		bch_insert_data(&s->op.cl);
+		bch_data_insert(&s->op.cl);
 	}
 
 	continue_at(cl, write_moving_finish, bch_gc_wq);