Commit 505bbe64 authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Refactor MR recovery work queues



I found that commit ead3f26e ("xprtrdma: Add ro_unmap_safe
memreg method"), which introduces ro_unmap_safe, never wired up the
FMR recovery worker.

The FMR and FRWR recovery work queues both do the same thing.
Instead of setting up separate individual work queues for this,
schedule a delayed worker to deal with them, since recovering MRs is
not performance-critical.
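
For background, the pattern adopted here — a spinlock-protected list of stale MRs drained by one delayed worker — is sketched below in minimal form. The names in the sketch are hypothetical stand-ins; the real fields this patch adds to struct rpcrdma_buffer are rb_recovery_lock, rb_stale_mrs, and rb_recovery_worker (see the verbs.c and xprt_rdma.h hunks below).

#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>

/* Hypothetical container; the patch embeds the equivalent
 * fields in struct rpcrdma_buffer.
 */
struct recovery_pool {
	spinlock_t		lock;	/* protects the stale list */
	struct list_head	stale;	/* broken MRs awaiting reset */
	struct delayed_work	worker;
};

struct stale_mr {
	struct list_head	list;
	/* ... per-MR state ... */
};

/* Runs in process context, so it may sleep: the slow invalidate
 * and DMA unmap work happens here, never on the hot path.
 */
static void recovery_worker(struct work_struct *work)
{
	struct recovery_pool *pool = container_of(work, struct recovery_pool,
						  worker.work);
	struct stale_mr *mr;

	spin_lock(&pool->lock);
	while (!list_empty(&pool->stale)) {
		mr = list_first_entry(&pool->stale, struct stale_mr, list);
		list_del_init(&mr->list);
		spin_unlock(&pool->lock);

		/* reset/unmap the MR here, outside the lock */

		spin_lock(&pool->lock);
	}
	spin_unlock(&pool->lock);
}

/* Callable from contexts that cannot sleep: enqueue and kick the
 * worker, which is what rpcrdma_defer_mr_recovery() does below.
 */
static void defer_recovery(struct recovery_pool *pool, struct stale_mr *mr)
{
	spin_lock(&pool->lock);
	list_add(&mr->list, &pool->stale);
	spin_unlock(&pool->lock);

	schedule_delayed_work(&pool->worker, 0);
}

static void recovery_pool_init(struct recovery_pool *pool)
{
	spin_lock_init(&pool->lock);
	INIT_LIST_HEAD(&pool->stale);
	INIT_DELAYED_WORK(&pool->worker, recovery_worker);
}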

Fixes: ead3f26e ("xprtrdma: Add ro_unmap_safe memreg method")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent fcdfb968
net/sunrpc/xprtrdma/fmr_ops.c  +59 −88
@@ -19,13 +19,6 @@
  * verb (fmr_op_unmap).
  */
 
-/* Transport recovery
- *
- * After a transport reconnect, fmr_op_map re-uses the MR already
- * allocated for the RPC, but generates a fresh rkey then maps the
- * MR again. This process is synchronous.
- */
-
 #include "xprt_rdma.h"
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -41,30 +34,6 @@ enum {
 					  IB_ACCESS_REMOTE_READ,
 };
 
-static struct workqueue_struct *fmr_recovery_wq;
-
-#define FMR_RECOVERY_WQ_FLAGS		(WQ_UNBOUND)
-
-int
-fmr_alloc_recovery_wq(void)
-{
-	fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0);
-	return !fmr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-fmr_destroy_recovery_wq(void)
-{
-	struct workqueue_struct *wq;
-
-	if (!fmr_recovery_wq)
-		return;
-
-	wq = fmr_recovery_wq;
-	fmr_recovery_wq = NULL;
-	destroy_workqueue(wq);
-}
-
 static int
 __fmr_init(struct rpcrdma_mw *mw, struct ib_pd *pd)
 {
@@ -115,66 +84,56 @@ __fmr_unmap(struct rpcrdma_mw *mw)
 	return rc;
 }
 
-static void
-__fmr_dma_unmap(struct rpcrdma_mw *mw)
-{
-	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
-
-	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
-			mw->mw_sg, mw->mw_nents, mw->mw_dir);
-	rpcrdma_put_mw(r_xprt, mw);
-}
-
-static void
-__fmr_reset_and_unmap(struct rpcrdma_mw *mw)
-{
-	int rc;
-
-	/* ORDER */
-	rc = __fmr_unmap(mw);
-	if (rc) {
-		pr_warn("rpcrdma: ib_unmap_fmr status %d, fmr %p orphaned\n",
-			rc, mw);
-		return;
-	}
-	__fmr_dma_unmap(mw);
-}
-
 static void
 __fmr_release(struct rpcrdma_mw *r)
 {
+	LIST_HEAD(unmap_list);
 	int rc;
 
 	kfree(r->fmr.fm_physaddrs);
 	kfree(r->mw_sg);
 
+	/* In case this one was left mapped, try to unmap it
+	 * to prevent dealloc_fmr from failing with EBUSY
+	 */
+	rc = __fmr_unmap(r);
+	if (rc)
+		pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n",
+		       r, rc);
+
 	rc = ib_dealloc_fmr(r->fmr.fm_mr);
 	if (rc)
 		pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n",
 		       r, rc);
 }
 
-/* Deferred reset of a single FMR. Generate a fresh rkey by
- * replacing the MR. There's no recovery if this fails.
+/* Reset of a single FMR.
+ *
+ * There's no recovery if this fails. The FMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
  */
 static void
-__fmr_recovery_worker(struct work_struct *work)
+fmr_op_recover_mr(struct rpcrdma_mw *mw)
 {
-	struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
-					     mw_work);
+	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+	int rc;
 
-	__fmr_reset_and_unmap(mw);
-}
-
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
-static void
-__fmr_queue_recovery(struct rpcrdma_mw *mw)
-{
-	INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
-	queue_work(fmr_recovery_wq, &mw->mw_work);
+	/* ORDER: invalidate first */
+	rc = __fmr_unmap(mw);
+
+	/* ORDER: then DMA unmap */
+	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+			mw->mw_sg, mw->mw_nents, mw->mw_dir);
+	if (rc) {
+		pr_err("rpcrdma: FMR reset status %d, %p orphaned\n",
+		       rc, mw);
+		r_xprt->rx_stats.mrs_orphaned++;
+		return;
+	}
+
+	rpcrdma_put_mw(r_xprt, mw);
+	r_xprt->rx_stats.mrs_recovered++;
 }
 
 static int
@@ -245,16 +204,11 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
 	mw = seg1->rl_mw;
 	seg1->rl_mw = NULL;
-	if (!mw) {
-		mw = rpcrdma_get_mw(r_xprt);
-		if (!mw)
-			return -ENOMEM;
-	} else {
-		/* this is a retransmit; generate a fresh rkey */
-		rc = __fmr_unmap(mw);
-		if (rc)
-			return rc;
-	}
+	if (mw)
+		rpcrdma_defer_mr_recovery(mw);
+	mw = rpcrdma_get_mw(r_xprt);
+	if (!mw)
+		return -ENOMEM;
 
 	pageoff = offset_in_page(seg1->mr_offset);
 	seg1->mr_offset -= pageoff;	/* start of page */
@@ -309,7 +263,7 @@ out_maperr:
 	pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
 	       len, (unsigned long long)dma_pages[0],
 	       pageoff, mw->mw_nents, rc);
-	__fmr_dma_unmap(mw);
+	rpcrdma_defer_mr_recovery(mw);
 	return rc;
 }

@@ -332,7 +286,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	/* ORDER: Invalidate all of the req's MRs first
 	 *
 	 * ib_unmap_fmr() is slow, so use a single call instead
-	 * of one call per mapped MR.
+	 * of one call per mapped FMR.
 	 */
 	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
 		seg = &req->rl_segments[i];
@@ -344,7 +298,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	}
 	rc = ib_unmap_fmr(&unmap_list);
 	if (rc)
-		pr_warn("%s: ib_unmap_fmr failed (%i)\n", __func__, rc);
+		goto out_reset;
 
 	/* ORDER: Now DMA unmap all of the req's MRs, and return
 	 * them to the free MW list.
@@ -354,7 +308,9 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 		mw = seg->rl_mw;
 
 		list_del_init(&mw->fmr.fm_mr->list);
-		__fmr_dma_unmap(mw);
+		ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+				mw->mw_sg, mw->mw_nents, mw->mw_dir);
+		rpcrdma_put_mw(r_xprt, mw);
 
 		i += seg->mr_nsegs;
 		seg->mr_nsegs = 0;
@@ -362,6 +318,20 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	}
 
 	req->rl_nchunks = 0;
+	return;
+
+out_reset:
+	pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
+
+	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+		seg = &req->rl_segments[i];
+		mw = seg->rl_mw;
+
+		list_del_init(&mw->fmr.fm_mr->list);
+		fmr_op_recover_mr(mw);
+
+		i += seg->mr_nsegs;
+	}
 }
 
 /* Use a slow, safe mechanism to invalidate all memory regions
@@ -380,9 +350,9 @@ fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 		mw = seg->rl_mw;
 
 		if (sync)
-			__fmr_reset_and_unmap(mw);
+			fmr_op_recover_mr(mw);
 		else
-			__fmr_queue_recovery(mw);
+			rpcrdma_defer_mr_recovery(mw);
 
 		i += seg->mr_nsegs;
 		seg->mr_nsegs = 0;
@@ -407,6 +377,7 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
 	.ro_map				= fmr_op_map,
 	.ro_unmap_sync			= fmr_op_unmap_sync,
 	.ro_unmap_safe			= fmr_op_unmap_safe,
+	.ro_recover_mr			= fmr_op_recover_mr,
 	.ro_open			= fmr_op_open,
 	.ro_maxpages			= fmr_op_maxpages,
 	.ro_init			= fmr_op_init,
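
The new .ro_recover_mr entry is what lets transport-independent code drive recovery without knowing which registration mode is in use: the generic worker added in verbs.c below simply calls mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw) for each stale MR.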
net/sunrpc/xprtrdma/frwr_ops.c  +20 −62
@@ -73,31 +73,6 @@
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-static struct workqueue_struct *frwr_recovery_wq;
-
-#define FRWR_RECOVERY_WQ_FLAGS		(WQ_UNBOUND | WQ_MEM_RECLAIM)
-
-int
-frwr_alloc_recovery_wq(void)
-{
-	frwr_recovery_wq = alloc_workqueue("frwr_recovery",
-					   FRWR_RECOVERY_WQ_FLAGS, 0);
-	return !frwr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-frwr_destroy_recovery_wq(void)
-{
-	struct workqueue_struct *wq;
-
-	if (!frwr_recovery_wq)
-		return;
-
-	wq = frwr_recovery_wq;
-	frwr_recovery_wq = NULL;
-	destroy_workqueue(wq);
-}
-
 static int
 __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, unsigned int depth)
 {
@@ -168,8 +143,14 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
 	return 0;
 }
 
+/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR.
+ *
+ * There's no recovery if this fails. The FRMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
 static void
-__frwr_reset_and_unmap(struct rpcrdma_mw *mw)
+frwr_op_recover_mr(struct rpcrdma_mw *mw)
 {
 	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
@@ -177,35 +158,15 @@ __frwr_reset_and_unmap(struct rpcrdma_mw *mw)
 
 	rc = __frwr_reset_mr(ia, mw);
 	ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir);
-	if (rc)
+	if (rc) {
+		pr_err("rpcrdma: FRMR reset status %d, %p orphaned\n",
+		       rc, mw);
+		r_xprt->rx_stats.mrs_orphaned++;
 		return;
-	rpcrdma_put_mw(r_xprt, mw);
-}
-
-/* Deferred reset of a single FRMR. Generate a fresh rkey by
- * replacing the MR.
- *
- * There's no recovery if this fails. The FRMR is abandoned, but
- * remains in rb_all. It will be cleaned up when the transport is
- * destroyed.
- */
-static void
-__frwr_recovery_worker(struct work_struct *work)
-{
-	struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
-					    mw_work);
-
-	__frwr_reset_and_unmap(r);
-}
-
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
-static void
-__frwr_queue_recovery(struct rpcrdma_mw *r)
-{
-	INIT_WORK(&r->mw_work, __frwr_recovery_worker);
-	queue_work(frwr_recovery_wq, &r->mw_work);
+	}
+
+	rpcrdma_put_mw(r_xprt, mw);
+	r_xprt->rx_stats.mrs_recovered++;
 }
 
 static int
@@ -401,7 +362,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	seg1->rl_mw = NULL;
 	do {
 		if (mw)
-			__frwr_queue_recovery(mw);
+			rpcrdma_defer_mr_recovery(mw);
 		mw = rpcrdma_get_mw(r_xprt);
 		if (!mw)
 			return -ENOMEM;
@@ -483,12 +444,11 @@ out_mapmr_err:
 	pr_err("rpcrdma: failed to map mr %p (%u/%u)\n",
 	       frmr->fr_mr, n, mw->mw_nents);
 	rc = n < 0 ? n : -EIO;
-	__frwr_queue_recovery(mw);
+	rpcrdma_defer_mr_recovery(mw);
 	return rc;
 
 out_senderr:
 	pr_err("rpcrdma: ib_post_send status %i\n", rc);
-	__frwr_queue_recovery(mw);
+	rpcrdma_defer_mr_recovery(mw);
 	return rc;
 }

@@ -627,9 +587,9 @@ frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 		mw = seg->rl_mw;
 
 		if (sync)
-			__frwr_reset_and_unmap(mw);
+			frwr_op_recover_mr(mw);
 		else
-			__frwr_queue_recovery(mw);
+			rpcrdma_defer_mr_recovery(mw);
 
 		i += seg->mr_nsegs;
 		seg->mr_nsegs = 0;
@@ -642,9 +602,6 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_mw *r;
 
-	/* Ensure stale MWs for "buf" are no longer in flight */
-	flush_workqueue(frwr_recovery_wq);
-
 	while (!list_empty(&buf->rb_all)) {
 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
 		list_del(&r->mw_all);
@@ -657,6 +614,7 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
 	.ro_map				= frwr_op_map,
 	.ro_unmap_sync			= frwr_op_unmap_sync,
 	.ro_unmap_safe			= frwr_op_unmap_safe,
+	.ro_recover_mr			= frwr_op_recover_mr,
 	.ro_open			= frwr_op_open,
 	.ro_maxpages			= frwr_op_maxpages,
 	.ro_init			= frwr_op_init,
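
Because recovery is now centralized, frwr_op_destroy no longer flushes a private workqueue to keep stale MWs from being freed underneath the worker; instead rpcrdma_buffer_destroy (in the verbs.c hunks below) calls cancel_delayed_work_sync() on the shared recovery worker before the MW list is torn down.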
net/sunrpc/xprtrdma/transport.c  +5 −11
@@ -660,7 +660,7 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 		   xprt->stat.bad_xids,
 		   xprt->stat.req_u,
 		   xprt->stat.bklog_u);
-	seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n",
+	seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
 		   r_xprt->rx_stats.read_chunk_count,
 		   r_xprt->rx_stats.write_chunk_count,
 		   r_xprt->rx_stats.reply_chunk_count,
@@ -672,6 +672,9 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 		   r_xprt->rx_stats.failed_marshal_count,
 		   r_xprt->rx_stats.bad_reply_count,
 		   r_xprt->rx_stats.nomsg_call_count);
+	seq_printf(seq, "%lu %lu\n",
+		   r_xprt->rx_stats.mrs_recovered,
+		   r_xprt->rx_stats.mrs_orphaned);
 }
 
 static int
@@ -741,7 +744,6 @@ void xprt_rdma_cleanup(void)
 			__func__, rc);
 
 	rpcrdma_destroy_wq();
-	frwr_destroy_recovery_wq();
 
 	rc = xprt_unregister_transport(&xprt_rdma_bc);
 	if (rc)
@@ -753,20 +755,13 @@ int xprt_rdma_init(void)
 {
 	int rc;
 
-	rc = frwr_alloc_recovery_wq();
-	if (rc)
-		return rc;
-
 	rc = rpcrdma_alloc_wq();
-	if (rc) {
-		frwr_destroy_recovery_wq();
+	if (rc)
 		return rc;
-	}
 
 	rc = xprt_register_transport(&xprt_rdma);
 	if (rc) {
 		rpcrdma_destroy_wq();
-		frwr_destroy_recovery_wq();
 		return rc;
 	}
 
@@ -774,7 +769,6 @@ int xprt_rdma_init(void)
 	if (rc) {
 		xprt_unregister_transport(&xprt_rdma);
 		rpcrdma_destroy_wq();
-		frwr_destroy_recovery_wq();
 		return rc;
 	}
 
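Dropping the per-mode recovery workqueue also shortens module setup and teardown above: xprt_rdma_init no longer has to unwind frwr_alloc_recovery_wq() on each error path, and xprt_rdma_cleanup loses a destroy call.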
net/sunrpc/xprtrdma/verbs.c  +42 −1
@@ -777,6 +777,41 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	ib_drain_qp(ia->ri_id->qp);
 }
 
+static void
+rpcrdma_mr_recovery_worker(struct work_struct *work)
+{
+	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+						  rb_recovery_worker.work);
+	struct rpcrdma_mw *mw;
+
+	spin_lock(&buf->rb_recovery_lock);
+	while (!list_empty(&buf->rb_stale_mrs)) {
+		mw = list_first_entry(&buf->rb_stale_mrs,
+				      struct rpcrdma_mw, mw_list);
+		list_del_init(&mw->mw_list);
+		spin_unlock(&buf->rb_recovery_lock);
+
+		dprintk("RPC:       %s: recovering MR %p\n", __func__, mw);
+		mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);
+
+		spin_lock(&buf->rb_recovery_lock);
+	};
+	spin_unlock(&buf->rb_recovery_lock);
+}
+
+void
+rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
+{
+	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+	spin_lock(&buf->rb_recovery_lock);
+	list_add(&mw->mw_list, &buf->rb_stale_mrs);
+	spin_unlock(&buf->rb_recovery_lock);
+
+	schedule_delayed_work(&buf->rb_recovery_worker, 0);
+}
+
 struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
@@ -837,8 +872,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 
 	buf->rb_max_requests = r_xprt->rx_data.max_requests;
 	buf->rb_bc_srv_max_requests = 0;
-	spin_lock_init(&buf->rb_lock);
 	atomic_set(&buf->rb_credits, 1);
+	spin_lock_init(&buf->rb_lock);
+	spin_lock_init(&buf->rb_recovery_lock);
+	INIT_LIST_HEAD(&buf->rb_stale_mrs);
+	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
+			  rpcrdma_mr_recovery_worker);
 
 	rc = ia->ri_ops->ro_init(r_xprt);
 	if (rc)
@@ -923,6 +962,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
 
+	cancel_delayed_work_sync(&buf->rb_recovery_worker);
+
 	while (!list_empty(&buf->rb_recv_bufs)) {
 		struct rpcrdma_rep *rep;
 
net/sunrpc/xprtrdma/xprt_rdma.h  +9 −4
@@ -245,7 +245,6 @@ struct rpcrdma_mw {
 		struct rpcrdma_fmr	fmr;
 		struct rpcrdma_frmr	frmr;
 	};
-	struct work_struct	mw_work;
 	struct rpcrdma_xprt	*mw_xprt;
 	struct list_head	mw_all;
 };
@@ -341,6 +340,10 @@ struct rpcrdma_buffer {
 	struct list_head	rb_allreqs;
 
 	u32			rb_bc_max_requests;
+
+	spinlock_t		rb_recovery_lock; /* protect rb_stale_mrs */
+	struct list_head	rb_stale_mrs;
+	struct delayed_work	rb_recovery_worker;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)

@@ -387,6 +390,8 @@ struct rpcrdma_stats {
 	unsigned long		bad_reply_count;
 	unsigned long		nomsg_call_count;
 	unsigned long		bcall_count;
+	unsigned long		mrs_recovered;
+	unsigned long		mrs_orphaned;
 };
 
 /*
@@ -400,6 +405,7 @@ struct rpcrdma_memreg_ops {
 					 struct rpcrdma_req *);
 	void		(*ro_unmap_safe)(struct rpcrdma_xprt *,
 					 struct rpcrdma_req *, bool);
+	void		(*ro_recover_mr)(struct rpcrdma_mw *);
 	int		(*ro_open)(struct rpcrdma_ia *,
 				   struct rpcrdma_ep *,
 				   struct rpcrdma_create_data_internal *);
@@ -477,6 +483,8 @@ void rpcrdma_buffer_put(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
+void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);
+
 struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
 					    size_t, gfp_t);
 void rpcrdma_free_regbuf(struct rpcrdma_ia *,
@@ -484,9 +492,6 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 
 int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
-int frwr_alloc_recovery_wq(void);
-void frwr_destroy_recovery_wq(void);
-
 int rpcrdma_alloc_wq(void);
 void rpcrdma_destroy_wq(void);