Commit d8099fed authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker
Browse files

xprtrdma: Reduce context switching due to Local Invalidation



Since commit ba69cd12 ("xprtrdma: Remove support for FMR memory
registration"), FRWR is the only supported memory registration mode.

We can take advantage of the asynchronous nature of FRWR's LOCAL_INV
Work Requests to get rid of the completion wait by having the
LOCAL_INV completion handler take care of DMA unmapping MRs and
waking the upper layer RPC waiter.

This eliminates two context switches when local invalidation is
necessary. As a side benefit, we will no longer need the per-xprt
deferred completion work queue.

Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent 40088f0e
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -712,6 +712,7 @@ TRACE_EVENT(xprtrdma_wc_receive,
DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_fastreg);
DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li);
DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li_wake);
DEFINE_FRWR_DONE_EVENT(xprtrdma_wc_li_done);

TRACE_EVENT(xprtrdma_frwr_alloc,
	TP_PROTO(
+102 −1
Original line number Diff line number Diff line
@@ -542,7 +542,10 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * Sleeps until it is safe for the host CPU to access the previously mapped
 * memory regions.
 * memory regions. This guarantees that registered MRs are properly fenced
 * from the server before the RPC consumer accesses the data in them. It
 * also ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
@@ -616,3 +619,101 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
		rpcrdma_mr_recycle(mr);
	}
}

/**
 * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_frwr *frwr =
		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_li_done(wc, frwr);
	rpcrdma_complete_rqst(frwr->fr_req->rl_reply);
	__frwr_release_mr(wc, mr);
}

/**
 * frwr_unmap_async - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * This guarantees that registered MRs are properly fenced from the
 * server before the RPC consumer accesses the data in them. It also
 * ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *first, *last, **prev;
	const struct ib_send_wr *bad_wr;
	struct rpcrdma_frwr *frwr;
	struct rpcrdma_mr *mr;
	int rc;

	/* Chain the LOCAL_INV Work Requests and post them with
	 * a single ib_post_send() call.
	 */
	frwr = NULL;
	prev = &first;
	while (!list_empty(&req->rl_registered)) {
		mr = rpcrdma_mr_pop(&req->rl_registered);

		trace_xprtrdma_mr_localinv(mr);
		r_xprt->rx_stats.local_inv_needed++;

		frwr = &mr->frwr;
		frwr->fr_cqe.done = frwr_wc_localinv;
		frwr->fr_req = req;
		last = &frwr->fr_invwr;
		last->next = NULL;
		last->wr_cqe = &frwr->fr_cqe;
		last->sg_list = NULL;
		last->num_sge = 0;
		last->opcode = IB_WR_LOCAL_INV;
		last->send_flags = IB_SEND_SIGNALED;
		last->ex.invalidate_rkey = mr->mr_handle;

		*prev = last;
		prev = &last->next;
	}

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
	 * are complete. The last completion will wake up the
	 * RPC waiter.
	 */
	frwr->fr_cqe.done = frwr_wc_localinv_done;

	/* Transport disconnect drains the receive CQ before it
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	bad_wr = NULL;
	rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
	trace_xprtrdma_post_send(req, rc);
	if (!rc)
		return;

	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
	 */
	while (bad_wr) {
		frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
		mr = container_of(frwr, struct rpcrdma_mr, frwr);
		bad_wr = bad_wr->next;

		rpcrdma_mr_recycle(mr);
	}

	/* The final LOCAL_INV WR in the chain is supposed to
	 * do the wake. If it was never posted, the wake will
	 * not happen, so wake here in that case.
	 */
	rpcrdma_complete_rqst(req->rl_reply);
}
+30 −31
Original line number Diff line number Diff line
@@ -1268,24 +1268,15 @@ out_badheader:
	goto out;
}

void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	/* Invalidate and unmap the data payloads before waking
	 * the waiting application. This guarantees the memory
	 * regions are properly fenced from the server before the
	 * application accesses the data. It also ensures proper
	 * send flow control: waking the next RPC waits until this
	 * RPC has relinquished all its Send Queue entries.
	 */
	if (!list_empty(&req->rl_registered))
		frwr_unmap_sync(r_xprt, req);

/* Ensure that any DMA mapped pages associated with
 * the Send of the RPC Call have been unmapped before
 * allowing the RPC to complete. This protects argument
 * memory not controlled by the RPC client from being
 * re-used before we're done with it.
 */
static void rpcrdma_release_tx(struct rpcrdma_xprt *r_xprt,
			       struct rpcrdma_req *req)
{
	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		r_xprt->rx_stats.reply_waits_for_send++;
		out_of_line_wait_on_bit(&req->rl_flags,
@@ -1295,24 +1286,23 @@ void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
	}
}

/* Reply handling runs in the poll worker thread. Anything that
 * might wait is deferred to a separate workqueue.
/**
 * rpcrdma_release_rqst - Release hardware resources
 * @r_xprt: controlling transport instance
 * @req: request with resources to release
 *
 */
void rpcrdma_deferred_completion(struct work_struct *work)
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);
	struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	if (!list_empty(&req->rl_registered))
		frwr_unmap_sync(r_xprt, req);

	trace_xprtrdma_defer_cmp(rep);
	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
		frwr_reminv(rep, &req->rl_registered);
	rpcrdma_release_rqst(r_xprt, req);
	rpcrdma_complete_rqst(rep);
	rpcrdma_release_tx(r_xprt, req);
}

/* Process received RPC/RDMA messages.
/**
 * rpcrdma_reply_handler - Process received RPC/RDMA messages
 * @rep: Incoming rpcrdma_rep object to process
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
@@ -1374,7 +1364,16 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
	rep->rr_rqst = rqst;

	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
	queue_work(buf->rb_completion_wq, &rep->rr_work);

	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
		frwr_reminv(rep, &req->rl_registered);
	if (!list_empty(&req->rl_registered)) {
		frwr_unmap_async(r_xprt, req);
		/* LocalInv completion will complete the RPC */
	} else {
		rpcrdma_release_tx(r_xprt, req);
		rpcrdma_complete_rqst(rep);
	}
	return;

out_badversion:
+0 −17
Original line number Diff line number Diff line
@@ -89,14 +89,12 @@ static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 */
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	/* Flush Receives, then wait for deferred Reply work
	 * to complete.
	 */
	ib_drain_rq(ia->ri_id->qp);
	drain_workqueue(buf->rb_completion_wq);

	/* Deferred Reply processing might have scheduled
	 * local invalidations.
@@ -1056,7 +1054,6 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp)

	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
@@ -1117,15 +1114,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
	if (rc)
		goto out;

	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
						WQ_MEM_RECLAIM | WQ_HIGHPRI,
						0,
			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
	if (!buf->rb_completion_wq) {
		rc = -ENOMEM;
		goto out;
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
@@ -1199,11 +1187,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	if (buf->rb_completion_wq) {
		destroy_workqueue(buf->rb_completion_wq);
		buf->rb_completion_wq = NULL;
	}

	rpcrdma_sendctxs_destroy(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
+4 −4
Original line number Diff line number Diff line
@@ -202,10 +202,9 @@ struct rpcrdma_rep {
	bool			rr_temp;
	struct rpcrdma_regbuf	*rr_rdmabuf;
	struct rpcrdma_xprt	*rr_rxprt;
	struct work_struct	rr_work;
	struct rpc_rqst		*rr_rqst;
	struct xdr_buf		rr_hdrbuf;
	struct xdr_stream	rr_stream;
	struct rpc_rqst		*rr_rqst;
	struct list_head	rr_list;
	struct ib_recv_wr	rr_recv_wr;
};
@@ -240,10 +239,12 @@ struct rpcrdma_sendctx {
 * An external memory region is any buffer or page that is registered
 * on the fly (ie, not pre-registered).
 */
struct rpcrdma_req;
struct rpcrdma_frwr {
	struct ib_mr			*fr_mr;
	struct ib_cqe			fr_cqe;
	struct completion		fr_linv_done;
	struct rpcrdma_req		*fr_req;
	union {
		struct ib_reg_wr	fr_regwr;
		struct ib_send_wr	fr_invwr;
@@ -388,7 +389,6 @@ struct rpcrdma_buffer {
	u32			rb_bc_srv_max_requests;
	u32			rb_bc_max_requests;

	struct workqueue_struct *rb_completion_wq;
	struct delayed_work	rb_refresh_worker;
};

@@ -561,6 +561,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);

/*
 * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
@@ -585,7 +586,6 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
			  struct rpcrdma_req *req);
void rpcrdma_deferred_completion(struct work_struct *work);

static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
{