Commit 5f62412b authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Remove rpcrdma_memreg_ops



Clean up: Now that there is only FRWR, there is no need for a memory
registration switch. The indirect calls to the memreg operations can
be replaced with faster direct calls.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent ba69cd12
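
The change is easiest to see in miniature. The sketch below is illustrative only (generic names and placeholder arithmetic, not the xprtrdma code). With FRWR as the only remaining registration mode, a function-pointer table buys no dispatch flexibility, and every call through it pays for an indirect branch that a direct call avoids.

/* Illustrative sketch only: generic names, placeholder arithmetic. */
#include <stdio.h>

/* Before: each registration mode supplies an ops table, and every
 * caller dispatches through a function pointer.
 */
struct memreg_ops {
	size_t (*ro_maxpages)(size_t depth);
};

static size_t frwr_maxpages_impl(size_t depth)
{
	return depth * 256;		/* placeholder computation */
}

static const struct memreg_ops frwr_ops = {
	.ro_maxpages = frwr_maxpages_impl,
};

/* After: only one mode exists, so the table is dropped and the
 * function is called by name.
 */
size_t frwr_maxpages(size_t depth)
{
	return depth * 256;
}

int main(void)
{
	const struct memreg_ops *ops = &frwr_ops;

	printf("indirect: %zu\n", ops->ro_maxpages(4));	/* old style */
	printf("direct:   %zu\n", frwr_maxpages(4));	/* new style */
	return 0;
}

In the patch itself, call sites such as ia->ri_ops->ro_send(ia, req) become frwr_send(ia, req), and the rpcrdma_frwr_memreg_ops table is deleted outright.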
net/sunrpc/xprtrdma/frwr_ops.c  +84 −47
@@ -15,21 +15,21 @@
 /* Normal operation
  *
  * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
- * Work Request (frwr_op_map). When the RDMA operation is finished, this
+ * Work Request (frwr_map). When the RDMA operation is finished, this
  * Memory Region is invalidated using a LOCAL_INV Work Request
- * (frwr_op_unmap_sync).
+ * (frwr_unmap_sync).
  *
  * Typically these Work Requests are not signaled, and neither are RDMA
  * SEND Work Requests (with the exception of signaling occasionally to
  * prevent provider work queue overflows). This greatly reduces HCA
  * interrupt workload.
  *
- * As an optimization, frwr_op_unmap marks MRs INVALID before the
+ * As an optimization, frwr_unmap marks MRs INVALID before the
  * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
  * rb_mrs immediately so that no work (like managing a linked list
  * under a spinlock) is needed in the completion upcall.
  *
- * But this means that frwr_op_map() can occasionally encounter an MR
+ * But this means that frwr_map() can occasionally encounter an MR
  * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
  * ordering prevents a subsequent FAST_REG WR from executing against
  * that MR while it is still being invalidated.
@@ -57,14 +57,14 @@
  * FLUSHED_LI:	The MR was being invalidated when the QP entered ERROR
  *		state, and the pending WR was flushed.
  *
- * When frwr_op_map encounters FLUSHED and VALID MRs, they are recovered
+ * When frwr_map encounters FLUSHED and VALID MRs, they are recovered
  * with ib_dereg_mr and then are re-initialized. Because MR recovery
  * allocates fresh resources, it is deferred to a workqueue, and the
  * recovered MRs are placed back on the rb_mrs list when recovery is
- * complete. frwr_op_map allocates another MR for the current RPC while
+ * complete. frwr_map allocates another MR for the current RPC while
  * the broken MR is reset.
  *
- * To ensure that frwr_op_map doesn't encounter an MR that is marked
+ * To ensure that frwr_map doesn't encounter an MR that is marked
  * INVALID but that is about to be flushed due to a previous transport
  * disconnect, the transport connect worker attempts to drain all
  * pending send queue WRs before the transport is reconnected.
@@ -80,8 +80,13 @@
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif

-bool
-frwr_is_supported(struct rpcrdma_ia *ia)
+/**
+ * frwr_is_supported - Check if device supports FRWR
+ * @ia: interface adapter to check
+ *
+ * Returns true if device supports FRWR, otherwise false
+ */
+bool frwr_is_supported(struct rpcrdma_ia *ia)
 {
 	struct ib_device_attr *attrs = &ia->ri_device->attrs;

@@ -97,8 +102,12 @@ out_not_supported:
 	return false;
 }

-static void
-frwr_op_release_mr(struct rpcrdma_mr *mr)
+/**
+ * frwr_release_mr - Destroy one MR
+ * @mr: MR allocated by frwr_init_mr
+ *
+ */
+void frwr_release_mr(struct rpcrdma_mr *mr)
 {
 	int rc;

@@ -132,11 +141,19 @@ frwr_mr_recycle_worker(struct work_struct *work)
 	list_del(&mr->mr_all);
 	r_xprt->rx_stats.mrs_recycled++;
 	spin_unlock(&r_xprt->rx_buf.rb_mrlock);
-	frwr_op_release_mr(mr);
+
+	frwr_release_mr(mr);
 }

-static int
-frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
+/**
+ * frwr_init_mr - Initialize one MR
+ * @ia: interface adapter
+ * @mr: generic MR to prepare for FRWR
+ *
+ * Returns zero if successful. Otherwise a negative errno
+ * is returned.
+ */
+int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
 {
 	unsigned int depth = ia->ri_max_frwr_depth;
 	struct rpcrdma_frwr *frwr = &mr->frwr;
@@ -172,7 +189,13 @@ out_list_err:
 	return rc;
 }

-/* On success, sets:
+/**
+ * frwr_open - Prepare an endpoint for use with FRWR
+ * @ia: interface adapter this endpoint will use
+ * @ep: endpoint to prepare
+ * @cdata: transport parameters
+ *
+ * On success, sets:
  *	ep->rep_attr.cap.max_send_wr
  *	ep->rep_attr.cap.max_recv_wr
  *	cdata->max_requests
@@ -181,9 +204,10 @@ out_list_err:
  * And these FRWR-related fields:
  *	ia->ri_max_frwr_depth
  *	ia->ri_mrtype
+ *
+ * On failure, a negative errno is returned.
  */
-static int
-frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
+int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 	      struct rpcrdma_create_data_internal *cdata)
 {
 	struct ib_device_attr *attrs = &ia->ri_device->attrs;
@@ -258,11 +282,16 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 	return 0;
 }

-/* FRWR mode conveys a list of pages per chunk segment. The
+/**
+ * frwr_maxpages - Compute size of largest payload
+ * @r_xprt: transport
+ *
+ * Returns maximum size of an RPC message, in pages.
+ *
+ * FRWR mode conveys a list of pages per chunk segment. The
  * maximum length of that list is the FRWR page list depth.
  */
-static size_t
-frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
+size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

@@ -344,12 +373,24 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 	trace_xprtrdma_wc_li_wake(wc, frwr);
 }

-/* Post a REG_MR Work Request to register a memory region
+/**
+ * frwr_map - Register a memory region
+ * @r_xprt: controlling transport
+ * @seg: memory region co-ordinates
+ * @nsegs: number of segments remaining
+ * @writing: true when RDMA Write will be used
+ * @out: initialized MR
+ *
+ * Prepare a REG_MR Work Request to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
+ *
+ * Returns the next segment or a negative errno pointer.
+ * On success, the prepared MR is planted in @out.
  */
-static struct rpcrdma_mr_seg *
-frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
-	    int nsegs, bool writing, struct rpcrdma_mr **out)
+struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
+				struct rpcrdma_mr_seg *seg,
+				int nsegs, bool writing,
+				struct rpcrdma_mr **out)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
@@ -434,14 +475,18 @@ out_mapmr_err:
 	return ERR_PTR(-EIO);
 }

-/* Post Send WR containing the RPC Call message.
+/**
+ * frwr_send - post Send WR containing the RPC Call message
+ * @ia: interface adapter
+ * @req: Prepared RPC Call
  *
  * For FRMR, chain any FastReg WRs to the Send WR. Only a
  * single ib_post_send call is needed to register memory
  * and then post the Send WR.
+ *
+ * Returns the result of ib_post_send.
  */
-static int
-frwr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 {
 	struct ib_send_wr *post_wr;
 	struct rpcrdma_mr *mr;
@@ -468,10 +513,13 @@ frwr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 	return ib_post_send(ia->ri_id->qp, post_wr, NULL);
 }

-/* Handle a remotely invalidated mr on the @mrs list
+/**
+ * frwr_reminv - handle a remotely invalidated mr on the @mrs list
+ * @rep: Received reply
+ * @mrs: list of MRs to check
+ *
  */
-static void
-frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
+void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
 {
 	struct rpcrdma_mr *mr;

@@ -485,7 +533,10 @@ frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
 		}
 }

-/* Invalidate all memory regions that were registered for "req".
+/**
+ * frwr_unmap_sync - invalidate memory regions that were registered for @req
+ * @r_xprt: controlling transport
+ * @mrs: list of MRs to process
  *
  * Sleeps until it is safe for the host CPU to access the
  * previously mapped memory regions.
@@ -493,8 +544,7 @@ frwr_op_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
  * Caller ensures that @mrs is not empty before the call. This
  * function empties the list.
  */
-static void
-frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs)
 {
 	struct ib_send_wr *first, **prev, *last;
 	const struct ib_send_wr *bad_wr;
@@ -577,16 +627,3 @@ out_release:
 		rpcrdma_mr_recycle(mr);
 	}
 }
-
-const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
-	.ro_map				= frwr_op_map,
-	.ro_send			= frwr_op_send,
-	.ro_reminv			= frwr_op_reminv,
-	.ro_unmap_sync			= frwr_op_unmap_sync,
-	.ro_open			= frwr_op_open,
-	.ro_maxpages			= frwr_op_maxpages,
-	.ro_init_mr			= frwr_op_init_mr,
-	.ro_release_mr			= frwr_op_release_mr,
-	.ro_displayname			= "frwr",
-	.ro_send_w_inv_ok		= RPCRDMA_CMP_F_SND_W_INV_OK,
-};
net/sunrpc/xprtrdma/rpc_rdma.c  +5 −9
@@ -356,8 +356,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 		return nsegs;

 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   false, &mr);
+		seg = frwr_map(r_xprt, seg, nsegs, false, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
 		rpcrdma_mr_push(mr, &req->rl_registered);
@@ -414,8 +413,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,

 	nchunks = 0;
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   true, &mr);
+		seg = frwr_map(r_xprt, seg, nsegs, true, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
 		rpcrdma_mr_push(mr, &req->rl_registered);
@@ -472,8 +470,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,

 	nchunks = 0;
 	do {
-		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						   true, &mr);
+		seg = frwr_map(r_xprt, seg, nsegs, true, &mr);
 		if (IS_ERR(seg))
 			return PTR_ERR(seg);
 		rpcrdma_mr_push(mr, &req->rl_registered);
@@ -1262,8 +1259,7 @@ void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * RPC has relinquished all its Send Queue entries.
 	 */
 	if (!list_empty(&req->rl_registered))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
-						    &req->rl_registered);
+		frwr_unmap_sync(r_xprt, &req->rl_registered);

 	/* Ensure that any DMA mapped pages associated with
 	 * the Send of the RPC Call have been unmapped before
@@ -1292,7 +1288,7 @@ void rpcrdma_deferred_completion(struct work_struct *work)

 	trace_xprtrdma_defer_cmp(rep);
 	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
-		r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered);
+		frwr_reminv(rep, &req->rl_registered);
 	rpcrdma_release_rqst(r_xprt, req);
 	rpcrdma_complete_rqst(rep);
 }
net/sunrpc/xprtrdma/transport.c  +1 −1
@@ -399,7 +399,7 @@ xprt_setup_rdma(struct xprt_create *args)
 	INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
 			  xprt_rdma_connect_worker);

-	xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt);
+	xprt->max_payload = frwr_maxpages(new_xprt);
 	if (xprt->max_payload == 0)
 		goto out4;
 	xprt->max_payload <<= PAGE_SHIFT;
net/sunrpc/xprtrdma/verbs.c  +9 −13
@@ -289,10 +289,9 @@ disconnected:
 		break;
 	}

-	dprintk("RPC:       %s: %s:%s on %s/%s: %s\n", __func__,
+	dprintk("RPC:       %s: %s:%s on %s/frwr: %s\n", __func__,
 		rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
-		ia->ri_device->name, ia->ri_ops->ro_displayname,
-		rdma_event_msg(event->event));
+		ia->ri_device->name, rdma_event_msg(event->event));
 	return 0;
 }

@@ -392,10 +391,8 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt)

 	switch (xprt_rdma_memreg_strategy) {
 	case RPCRDMA_FRWR:
-		if (frwr_is_supported(ia)) {
-			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
+		if (frwr_is_supported(ia))
 			break;
-		}
 		/*FALLTHROUGH*/
 	default:
 		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
@@ -509,7 +506,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	}
 	ia->ri_max_send_sges = max_sge;

-	rc = ia->ri_ops->ro_open(ia, ep, cdata);
+	rc = frwr_open(ia, ep, cdata);
 	if (rc)
 		return rc;

@@ -567,7 +564,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	/* Prepare RDMA-CM private message */
 	pmsg->cp_magic = rpcrdma_cmp_magic;
 	pmsg->cp_version = RPCRDMA_CMP_VERSION;
-	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
+	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
 	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
 	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
 	ep->rep_remote_cma.private_data = pmsg;
@@ -991,7 +988,7 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
 		if (!mr)
 			break;

-		rc = ia->ri_ops->ro_init_mr(ia, mr);
+		rc = frwr_init_mr(ia, mr);
 		if (rc) {
 			kfree(mr);
 			break;
@@ -1171,7 +1168,6 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
 						   rx_buf);
-	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
 	struct rpcrdma_mr *mr;
 	unsigned int count;

@@ -1187,7 +1183,7 @@ rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
 		if (!list_empty(&mr->mr_list))
 			list_del(&mr->mr_list);

-		ia->ri_ops->ro_release_mr(mr);
+		frwr_release_mr(mr);
 		count++;
 		spin_lock(&buf->rb_mrlock);
 	}
@@ -1381,7 +1377,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
  *
  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
  * receiving the payload of RDMA RECV operations. During Long Calls
- * or Replies they may be registered externally via ro_map.
+ * or Replies they may be registered externally via frwr_map.
  */
 struct rpcrdma_regbuf *
 rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
@@ -1472,7 +1468,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 		--ep->rep_send_count;
 	}

-	rc = ia->ri_ops->ro_send(ia, req);
+	rc = frwr_send(ia, req);
 	trace_xprtrdma_post_send(req, rc);
 	if (rc)
 		return -ENOTCONN;
net/sunrpc/xprtrdma/xprt_rdma.h  +17 −31
@@ -66,7 +66,6 @@
  * Interface Adapter -- one per transport instance
  */
 struct rpcrdma_ia {
-	const struct rpcrdma_memreg_ops	*ri_ops;
 	struct ib_device	*ri_device;
 	struct rdma_cm_id 	*ri_id;
 	struct ib_pd		*ri_pd;
@@ -406,7 +405,6 @@ struct rpcrdma_buffer {
 	struct workqueue_struct *rb_completion_wq;
 	struct delayed_work	rb_refresh_worker;
 };
-#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)

 /* rb_flags */
 enum {
@@ -456,34 +454,6 @@ struct rpcrdma_stats {
 	unsigned long		bcall_count;
 };

-/*
- * Per-registration mode operations
- */
-struct rpcrdma_xprt;
-struct rpcrdma_memreg_ops {
-	struct rpcrdma_mr_seg *
-			(*ro_map)(struct rpcrdma_xprt *,
-				  struct rpcrdma_mr_seg *, int, bool,
-				  struct rpcrdma_mr **);
-	int		(*ro_send)(struct rpcrdma_ia *ia,
-				   struct rpcrdma_req *req);
-	void		(*ro_reminv)(struct rpcrdma_rep *rep,
-				     struct list_head *mrs);
-	void		(*ro_unmap_sync)(struct rpcrdma_xprt *,
-					 struct list_head *);
-	int		(*ro_open)(struct rpcrdma_ia *,
-				   struct rpcrdma_ep *,
-				   struct rpcrdma_create_data_internal *);
-	size_t		(*ro_maxpages)(struct rpcrdma_xprt *);
-	int		(*ro_init_mr)(struct rpcrdma_ia *,
-				      struct rpcrdma_mr *);
-	void		(*ro_release_mr)(struct rpcrdma_mr *mr);
-	const char	*ro_displayname;
-	const int	ro_send_w_inv_ok;
-};
-
-extern const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops;
-
 /*
  * RPCRDMA transport -- encapsulates the structures above for
  * integration with RPC.
@@ -535,7 +505,6 @@ extern unsigned int xprt_rdma_memreg_strategy;
 int rpcrdma_ia_open(struct rpcrdma_xprt *xprt);
 void rpcrdma_ia_remove(struct rpcrdma_ia *ia);
 void rpcrdma_ia_close(struct rpcrdma_ia *);
-bool frwr_is_supported(struct rpcrdma_ia *);

 /*
  * Endpoint calls - xprtrdma/verbs.c
@@ -601,6 +570,23 @@ rpcrdma_data_dir(bool writing)
 	return writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
 }

+/* Memory registration calls xprtrdma/frwr_ops.c
+ */
+bool frwr_is_supported(struct rpcrdma_ia *);
+int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
+	      struct rpcrdma_create_data_internal *cdata);
+int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr);
+void frwr_release_mr(struct rpcrdma_mr *mr);
+size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt);
+struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
+				struct rpcrdma_mr_seg *seg,
+				int nsegs, bool writing,
+				struct rpcrdma_mr **mr);
+int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
+void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
+void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt,
+		     struct list_head *mrs);
+
 /*
  * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
  */