Commit 4eea0fef authored by Ilya Dryomov
Browse files

libceph: use for_each_request() in ceph_osdc_abort_on_full()



Scanning the trees just to see if there is anything to abort is
unnecessary -- all that is needed here is to update the epoch barrier
first, before we start aborting.  Simplify and do the update inside the
loop before calling abort_request() for the first time.

The switch to for_each_request() also fixes a bug: homeless requests
weren't even considered for aborting.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Acked-by: Jeff Layton <jlayton@redhat.com>
Reviewed-by: "Yan, Zheng" <zyan@redhat.com>
parent 88bc1922
Loading
Loading
Loading
Loading
+26 −53
Original line number Diff line number Diff line
@@ -2434,68 +2434,41 @@ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);

/*
 * Drop all pending requests that are stalled waiting on a full condition to
 * clear, and complete them with ENOSPC as the return code. Set the
 * osdc->epoch_barrier to the latest map epoch that we've seen if any were
 * cancelled.
 * We can end up releasing caps as a result of abort_request().
 * In that case, we probably want to ensure that the cap release message
 * has an updated epoch barrier in it, so set the epoch barrier prior to
 * aborting the first request.
 */
static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
{
	struct rb_node *n;
	bool victims = false;

	dout("enter abort_on_full\n");

	if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
		goto out;

	/* Scan list and see if there is anything to abort */
	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
		struct rb_node *m;

		m = rb_first(&osd->o_requests);
		while (m) {
			struct ceph_osd_request *req = rb_entry(m,
					struct ceph_osd_request, r_node);
			m = rb_next(m);
	struct ceph_osd_client *osdc = req->r_osdc;
	bool *victims = arg;

			if (req->r_abort_on_full) {
				victims = true;
				break;
			}
	if (req->r_abort_on_full &&
	    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
	     pool_full(osdc, req->r_t.target_oloc.pool))) {
		if (!*victims) {
			update_epoch_barrier(osdc, osdc->osdmap->epoch);
			*victims = true;
		}
		if (victims)
			break;
		abort_request(req, -ENOSPC);
	}

	if (!victims)
		goto out;
	return 0; /* continue iteration */
}

/*
	 * Update the barrier to current epoch if it's behind that point,
	 * since we know we have some calls to be aborted in the tree.
 * Drop all pending requests that are stalled waiting on a full condition to
 * clear, and complete them with ENOSPC as the return code. Set the
 * osdc->epoch_barrier to the latest map epoch that we've seen if any were
 * cancelled.
 */
	update_epoch_barrier(osdc, osdc->osdmap->epoch);

	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
		struct rb_node *m;

		m = rb_first(&osd->o_requests);
		while (m) {
			struct ceph_osd_request *req = rb_entry(m,
					struct ceph_osd_request, r_node);
			m = rb_next(m);
static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
{
	bool victims = false;

			if (req->r_abort_on_full &&
			    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
			     pool_full(osdc, req->r_t.target_oloc.pool)))
				abort_request(req, -ENOSPC);
		}
	}
out:
	dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc))
		for_each_request(osdc, abort_on_full_fn, &victims);
}

static void check_pool_dne(struct ceph_osd_request *req)