Commit cf9946cd authored by Trond Myklebust's avatar Trond Myklebust
Browse files

SUNRPC: Refactor the transport request pinning



We are going to need to pin for both send and receive.

Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent 4cd34e7c
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
@@ -142,8 +142,7 @@ struct rpc_task_setup {
#define RPC_TASK_ACTIVE		2
#define RPC_TASK_NEED_XMIT	3
#define RPC_TASK_NEED_RECV	4
#define RPC_TASK_MSG_RECV	5
#define RPC_TASK_MSG_RECV_WAIT	6
#define RPC_TASK_MSG_PIN_WAIT	5

#define RPC_IS_RUNNING(t)	test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
#define rpc_set_running(t)	set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
+1 −0
Original line number Diff line number Diff line
@@ -103,6 +103,7 @@ struct rpc_rqst {
						/* A cookie used to track the
						   state of the transport
						   connection */
	atomic_t		rq_pin;
	
	/*
	 * Partial send handling
+23 −20
Original line number Diff line number Diff line
@@ -847,16 +847,22 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

static bool
xprt_is_pinned_rqst(struct rpc_rqst *req)
{
	return atomic_read(&req->rq_pin) != 0;
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding the xprt transport lock.
 * so should be holding the xprt receive lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
	set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
	atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);

@@ -864,31 +870,22 @@ EXPORT_SYMBOL_GPL(xprt_pin_rqst);
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller should be holding the xprt transport lock.
 * Caller should be holding the xprt receive lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
	struct rpc_task *task = req->rq_task;

	clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate);
	if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
		wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
	if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
		atomic_dec(&req->rq_pin);
		return;
	}
	if (atomic_dec_and_test(&req->rq_pin))
		wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
__must_hold(&req->rq_xprt->recv_lock)
{
	struct rpc_task *task = req->rq_task;

	if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
		spin_unlock(&req->rq_xprt->recv_lock);
		set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
		wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
				TASK_UNINTERRUPTIBLE);
		clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
		spin_lock(&req->rq_xprt->recv_lock);
	}
	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

/**
@@ -1388,7 +1385,13 @@ void xprt_release(struct rpc_task *task)
	spin_lock(&xprt->recv_lock);
	if (!list_empty(&req->rq_list)) {
		list_del_init(&req->rq_list);
		if (xprt_is_pinned_rqst(req)) {
			set_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate);
			spin_unlock(&xprt->recv_lock);
			xprt_wait_on_pinned_rqst(req);
			spin_lock(&xprt->recv_lock);
			clear_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate);
		}
	}
	spin_unlock(&xprt->recv_lock);
	spin_lock_bh(&xprt->transport_lock);