Commit 3f9c7e76 authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Backchannel can use GFP_KERNEL allocations



The Receive handler runs in process context, and can therefore use
on-demand GFP_KERNEL allocations instead of pre-allocation.

This makes the xprtrdma backchannel independent of the number of
backchannel session slots provisioned by the Upper Layer Protocol
(see the sketch after the commit metadata below).

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent d2832af3
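
To make the change concrete, here is a minimal userspace sketch of the
allocation pattern the diff below adopts: take an entry from a free list
when one is available, otherwise allocate on demand, bounded by a hard cap
so a remote peer cannot exhaust local memory. This is an illustration, not
kernel code: bc_pool, bc_slot, bc_get, and BC_MAX_SLOTS are hypothetical
stand-ins for the structures in the diff, and malloc() stands in for the
GFP_KERNEL allocation, which is safe only because the caller runs in
process context and may sleep.

#include <pthread.h>
#include <stdlib.h>

#define BC_MAX_SLOTS 32			/* stands in for RPCRDMA_BACKWARD_WRS */

struct bc_slot {
	struct bc_slot *next;		/* free-list linkage */
	char buf[1024];			/* stands in for the rqst send buffer */
};

struct bc_pool {
	pthread_mutex_t lock;		/* stands in for xprt->bc_pa_lock */
	struct bc_slot *free_list;	/* stands in for xprt->bc_pa_list */
	unsigned int alloc_count;	/* stands in for xprt->bc_alloc_count */
};

/* Fast path: pop a previously allocated slot off the free list.
 * Slow path: allocate a fresh slot on demand, refusing once the cap
 * is reached.
 */
static struct bc_slot *bc_get(struct bc_pool *pool)
{
	struct bc_slot *slot;

	pthread_mutex_lock(&pool->lock);
	slot = pool->free_list;
	if (slot)
		pool->free_list = slot->next;
	pthread_mutex_unlock(&pool->lock);
	if (slot)
		return slot;

	/* The cap check is advisory (performed outside the lock), as in
	 * the kernel code: its only job is to bound how much memory a
	 * remote peer can force us to allocate.
	 */
	if (pool->alloc_count >= BC_MAX_SLOTS)
		return NULL;		/* caller drops the call */

	/* Process context: a blocking allocation is permitted here. */
	slot = malloc(sizeof(*slot));
	if (slot)
		pool->alloc_count++;
	return slot;
}

After a reply is sent, a slot would be returned to the free list (as
xprt_rdma_bc_free_rqst does in the diff) rather than freed, so steady-state
backchannel traffic stops allocating once its working set is built.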
net/sunrpc/xprtrdma/backchannel.c +40 −64
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -19,35 +19,6 @@
 
 #undef RPCRDMA_BACKCHANNEL_DEBUG
 
-static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
-				 unsigned int count)
-{
-	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-	struct rpcrdma_req *req;
-	struct rpc_rqst *rqst;
-	unsigned int i;
-
-	for (i = 0; i < (count << 1); i++) {
-		size_t size;
-
-		size = min_t(size_t, r_xprt->rx_data.inline_rsize, PAGE_SIZE);
-		req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
-		if (!req)
-			return -ENOMEM;
-		rqst = &req->rl_slot;
-
-		rqst->rq_xprt = xprt;
-		INIT_LIST_HEAD(&rqst->rq_bc_list);
-		__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
-		spin_lock(&xprt->bc_pa_lock);
-		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
-		spin_unlock(&xprt->bc_pa_lock);
-		xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf),
-			     size);
-	}
-	return 0;
-}
-
 /**
  * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
  * @xprt: transport associated with these backchannel resources
@@ -58,34 +29,10 @@ static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
 int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-	int rc;
-
-	/* The backchannel reply path returns each rpc_rqst to the
-	 * bc_pa_list _after_ the reply is sent. If the server is
-	 * faster than the client, it can send another backward
-	 * direction request before the rpc_rqst is returned to the
-	 * list. The client rejects the request in this case.
-	 *
-	 * Twice as many rpc_rqsts are prepared to ensure there is
-	 * always an rpc_rqst available as soon as a reply is sent.
-	 */
-	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
-		goto out_err;
-
-	rc = rpcrdma_bc_setup_reqs(r_xprt, reqs);
-	if (rc)
-		goto out_free;
 
-	r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
+	r_xprt->rx_buf.rb_bc_srv_max_requests = RPCRDMA_BACKWARD_WRS >> 1;
 	trace_xprtrdma_cb_setup(r_xprt, reqs);
 	return 0;
-
-out_free:
-	xprt_rdma_bc_destroy(xprt, reqs);
-
-out_err:
-	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
-	return -ENOMEM;
 }
 
 /**
@@ -213,6 +160,43 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
 	spin_unlock(&xprt->bc_pa_lock);
 }
 
+static struct rpc_rqst *rpcrdma_bc_rqst_get(struct rpcrdma_xprt *r_xprt)
+{
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+	struct rpcrdma_req *req;
+	struct rpc_rqst *rqst;
+	size_t size;
+
+	spin_lock(&xprt->bc_pa_lock);
+	rqst = list_first_entry_or_null(&xprt->bc_pa_list, struct rpc_rqst,
+					rq_bc_pa_list);
+	if (!rqst)
+		goto create_req;
+	list_del(&rqst->rq_bc_pa_list);
+	spin_unlock(&xprt->bc_pa_lock);
+	return rqst;
+
+create_req:
+	spin_unlock(&xprt->bc_pa_lock);
+
+	/* Set a limit to prevent a remote from overrunning our resources.
+	 */
+	if (xprt->bc_alloc_count >= RPCRDMA_BACKWARD_WRS)
+		return NULL;
+
+	size = min_t(size_t, r_xprt->rx_data.inline_rsize, PAGE_SIZE);
+	req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
+	if (!req)
+		return NULL;
+
+	xprt->bc_alloc_count++;
+	rqst = &req->rl_slot;
+	rqst->rq_xprt = xprt;
+	__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+	xdr_buf_init(&rqst->rq_snd_buf, rdmab_data(req->rl_sendbuf), size);
+	return rqst;
+}
+
 /**
  * rpcrdma_bc_receive_call - Handle a backward direction call
  * @r_xprt: transport receiving the call
@@ -244,18 +228,10 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 	pr_info("RPC:       %s: %*ph\n", __func__, size, p);
 #endif
 
-	/* Grab a free bc rqst */
-	spin_lock(&xprt->bc_pa_lock);
-	if (list_empty(&xprt->bc_pa_list)) {
-		spin_unlock(&xprt->bc_pa_lock);
+	rqst = rpcrdma_bc_rqst_get(r_xprt);
+	if (!rqst)
 		goto out_overflow;
-	}
-	rqst = list_first_entry(&xprt->bc_pa_list,
-				struct rpc_rqst, rq_bc_pa_list);
-	list_del(&rqst->rq_bc_pa_list);
-	spin_unlock(&xprt->bc_pa_lock);
 
 	/* Prepare rqst */
 	rqst->rq_reply_bytes_recvd = 0;
 	rqst->rq_xid = *p;