Commit c544577d authored by Trond Myklebust
Browse files

SUNRPC: Clean up transport write space handling



Treat socket write space handling in the same way we now treat transport
congestion: by denying the XPRT_LOCK until the transport signals that it
has free buffer space.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
parent 36bd7de9
Loading
Loading
Loading
Loading
+0 −1
Original line number Original line Diff line number Diff line
@@ -84,7 +84,6 @@ struct svc_xprt {
	struct sockaddr_storage	xpt_remote;	/* remote peer's address */
	struct sockaddr_storage	xpt_remote;	/* remote peer's address */
	size_t			xpt_remotelen;	/* length of address */
	size_t			xpt_remotelen;	/* length of address */
	char			xpt_remotebuf[INET6_ADDRSTRLEN + 10];
	char			xpt_remotebuf[INET6_ADDRSTRLEN + 10];
	struct rpc_wait_queue	xpt_bc_pending;	/* backchannel wait queue */
	struct list_head	xpt_users;	/* callbacks on free */
	struct list_head	xpt_users;	/* callbacks on free */


	struct net		*xpt_net;
	struct net		*xpt_net;
+3 −2
Original line number Original line Diff line number Diff line
@@ -387,8 +387,8 @@ int xprt_load_transport(const char *);
void			xprt_set_retrans_timeout_def(struct rpc_task *task);
void			xprt_set_retrans_timeout_def(struct rpc_task *task);
void			xprt_set_retrans_timeout_rtt(struct rpc_task *task);
void			xprt_set_retrans_timeout_rtt(struct rpc_task *task);
void			xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status);
void			xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status);
void			xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action);
void			xprt_wait_for_buffer_space(struct rpc_xprt *xprt);
void			xprt_write_space(struct rpc_xprt *xprt);
bool			xprt_write_space(struct rpc_xprt *xprt);
void			xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result);
void			xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result);
struct rpc_rqst *	xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid);
struct rpc_rqst *	xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid);
void			xprt_update_rtt(struct rpc_task *task);
void			xprt_update_rtt(struct rpc_task *task);
@@ -416,6 +416,7 @@ void xprt_unlock_connect(struct rpc_xprt *, void *);
#define XPRT_CLOSING		(6)
#define XPRT_CLOSING		(6)
#define XPRT_CONGESTED		(9)
#define XPRT_CONGESTED		(9)
#define XPRT_CWND_WAIT		(10)
#define XPRT_CWND_WAIT		(10)
#define XPRT_WRITE_SPACE	(11)


static inline void xprt_set_connected(struct rpc_xprt *xprt)
static inline void xprt_set_connected(struct rpc_xprt *xprt)
{
{
+10 −18
Original line number Original line Diff line number Diff line
@@ -1964,14 +1964,15 @@ call_transmit(struct rpc_task *task)
{
{
	dprint_status(task);
	dprint_status(task);


	task->tk_action = call_transmit_status;
	task->tk_status = 0;
	if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
		return;

		if (!xprt_prepare_transmit(task))
		if (!xprt_prepare_transmit(task))
			return;
			return;
		xprt_transmit(task);
		xprt_transmit(task);
	}
	}
	task->tk_action = call_transmit_status;
	xprt_end_transmit(task);
}


/*
/*
 * 5a.	Handle cleanup after a transmission
 * 5a.	Handle cleanup after a transmission
@@ -1986,7 +1987,6 @@ call_transmit_status(struct rpc_task *task)
	 * test first.
	 * test first.
	 */
	 */
	if (task->tk_status == 0) {
	if (task->tk_status == 0) {
		xprt_end_transmit(task);
		xprt_request_wait_receive(task);
		xprt_request_wait_receive(task);
		return;
		return;
	}
	}
@@ -1994,15 +1994,8 @@ call_transmit_status(struct rpc_task *task)
	switch (task->tk_status) {
	switch (task->tk_status) {
	default:
	default:
		dprint_status(task);
		dprint_status(task);
		xprt_end_transmit(task);
		break;
	case -EBADSLT:
		xprt_end_transmit(task);
		task->tk_action = call_transmit;
		task->tk_status = 0;
		break;
		break;
	case -EBADMSG:
	case -EBADMSG:
		xprt_end_transmit(task);
		task->tk_status = 0;
		task->tk_status = 0;
		task->tk_action = call_encode;
		task->tk_action = call_encode;
		break;
		break;
@@ -2015,6 +2008,7 @@ call_transmit_status(struct rpc_task *task)
	case -ENOBUFS:
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		rpc_delay(task, HZ>>2);
		/* fall through */
		/* fall through */
	case -EBADSLT:
	case -EAGAIN:
	case -EAGAIN:
		task->tk_action = call_transmit;
		task->tk_action = call_transmit;
		task->tk_status = 0;
		task->tk_status = 0;
@@ -2026,7 +2020,6 @@ call_transmit_status(struct rpc_task *task)
	case -ENETUNREACH:
	case -ENETUNREACH:
	case -EPERM:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task)) {
		if (RPC_IS_SOFTCONN(task)) {
			xprt_end_transmit(task);
			if (!task->tk_msg.rpc_proc->p_proc)
			if (!task->tk_msg.rpc_proc->p_proc)
				trace_xprt_ping(task->tk_xprt,
				trace_xprt_ping(task->tk_xprt,
						task->tk_status);
						task->tk_status);
@@ -2069,9 +2062,6 @@ call_bc_transmit(struct rpc_task *task)


	xprt_transmit(task);
	xprt_transmit(task);


	if (task->tk_status == -EAGAIN)
		goto out_retry;

	xprt_end_transmit(task);
	xprt_end_transmit(task);
	dprint_status(task);
	dprint_status(task);
	switch (task->tk_status) {
	switch (task->tk_status) {
@@ -2087,6 +2077,8 @@ call_bc_transmit(struct rpc_task *task)
	case -ENOTCONN:
	case -ENOTCONN:
	case -EPIPE:
	case -EPIPE:
		break;
		break;
	case -EAGAIN:
		goto out_retry;
	case -ETIMEDOUT:
	case -ETIMEDOUT:
		/*
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * Problem reaching the server.  Disconnect and let the
+0 −2
Original line number Original line Diff line number Diff line
@@ -171,7 +171,6 @@ void svc_xprt_init(struct net *net, struct svc_xprt_class *xcl,
	mutex_init(&xprt->xpt_mutex);
	mutex_init(&xprt->xpt_mutex);
	spin_lock_init(&xprt->xpt_lock);
	spin_lock_init(&xprt->xpt_lock);
	set_bit(XPT_BUSY, &xprt->xpt_flags);
	set_bit(XPT_BUSY, &xprt->xpt_flags);
	rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending");
	xprt->xpt_net = get_net(net);
	xprt->xpt_net = get_net(net);
	strcpy(xprt->xpt_remotebuf, "uninitialized");
	strcpy(xprt->xpt_remotebuf, "uninitialized");
}
}
@@ -895,7 +894,6 @@ int svc_send(struct svc_rqst *rqstp)
	else
	else
		len = xprt->xpt_ops->xpo_sendto(rqstp);
		len = xprt->xpt_ops->xpo_sendto(rqstp);
	mutex_unlock(&xprt->xpt_mutex);
	mutex_unlock(&xprt->xpt_mutex);
	rpc_wake_up(&xprt->xpt_bc_pending);
	trace_svc_send(rqstp, len);
	trace_svc_send(rqstp, len);
	svc_xprt_release(rqstp);
	svc_xprt_release(rqstp);


+47 −30
Original line number Original line Diff line number Diff line
@@ -169,6 +169,17 @@ out:
}
}
EXPORT_SYMBOL_GPL(xprt_load_transport);
EXPORT_SYMBOL_GPL(xprt_load_transport);


/*
 * xprt_clear_locked - drop the transport send lock held by snd_task.
 *
 * Clears the owning task, then releases XPRT_LOCKED unless the transport
 * is waiting to be closed, in which case the cleanup work is scheduled
 * instead and XPRT_LOCKED is left set for the worker to handle.
 */
static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		/* Barriers order the snd_task store against clearing the
		 * lock bit — presumably pairing with the test_and_set_bit()
		 * in the lock acquirers; confirm against the full source. */
		smp_mb__before_atomic();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_atomic();
	} else
		/* Close pending: hand the lock off to the cleanup worker. */
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/**
/**
 * xprt_reserve_xprt - serialize write access to transports
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 * @task: task that is requesting access to the transport
@@ -188,10 +199,14 @@ int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
			return 1;
			return 1;
		goto out_sleep;
		goto out_sleep;
	}
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	xprt->snd_task = task;
	xprt->snd_task = task;


	return 1;
	return 1;


out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n",
	dprintk("RPC: %5u failed to lock transport %p\n",
			task->tk_pid, xprt);
			task->tk_pid, xprt);
@@ -208,17 +223,6 @@ out_sleep:
}
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);


static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		smp_mb__before_atomic();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_atomic();
	} else
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

static bool
static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
{
@@ -267,10 +271,13 @@ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
		xprt->snd_task = task;
		xprt->snd_task = task;
		return 1;
		return 1;
	}
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (!xprt_need_congestion_window_wait(xprt)) {
	if (!xprt_need_congestion_window_wait(xprt)) {
		xprt->snd_task = task;
		xprt->snd_task = task;
		return 1;
		return 1;
	}
	}
out_unlock:
	xprt_clear_locked(xprt);
	xprt_clear_locked(xprt);
out_sleep:
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
@@ -309,10 +316,12 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
		return;

	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
				__xprt_lock_write_func, xprt))
		return;
		return;
out_unlock:
	xprt_clear_locked(xprt);
	xprt_clear_locked(xprt);
}
}


@@ -320,6 +329,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (xprt_need_congestion_window_wait(xprt))
	if (xprt_need_congestion_window_wait(xprt))
		goto out_unlock;
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
@@ -510,39 +521,46 @@ EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);


/**
/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 * @xprt: transport
 * @action: function pointer to be executed after wait
 *
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 * an incomplete RPC call transmission.
 */
 */
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
{
	struct rpc_rqst *req = task->tk_rqstp;
	set_bit(XPRT_WRITE_SPACE, &xprt->state);
	struct rpc_xprt *xprt = req->rq_xprt;

	task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
	rpc_sleep_on(&xprt->pending, task, action);
}
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);


/*
 * xprt_clear_write_space_locked - clear XPRT_WRITE_SPACE and restart sends.
 *
 * Atomically clears XPRT_WRITE_SPACE; if it was set, kicks
 * __xprt_lock_write_next() so the next queued task can try for the
 * transport lock. Returns true iff the bit was previously set.
 * Callers in this patch (xprt_write_space, xprt_disconnect_done) hold
 * xprt->transport_lock, hence the "_locked" suffix.
 */
static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
		__xprt_lock_write_next(xprt);
		dprintk("RPC:       write space: waking waiting task on "
				"xprt %p\n", xprt);
		return true;
	}
	return false;
}

/**
/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 * @xprt: transport with waiting tasks
 *
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
 */
void xprt_write_space(struct rpc_xprt *xprt)
bool xprt_write_space(struct rpc_xprt *xprt)
{
{
	bool ret;

	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
		return false;
	spin_lock_bh(&xprt->transport_lock);
	spin_lock_bh(&xprt->transport_lock);
	if (xprt->snd_task) {
	ret = xprt_clear_write_space_locked(xprt);
		dprintk("RPC:       write space: waking waiting task on "
				"xprt %p\n", xprt);
		rpc_wake_up_queued_task_on_wq(xprtiod_workqueue,
				&xprt->pending, xprt->snd_task);
	}
	spin_unlock_bh(&xprt->transport_lock);
	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}
}
EXPORT_SYMBOL_GPL(xprt_write_space);
EXPORT_SYMBOL_GPL(xprt_write_space);


@@ -653,6 +671,7 @@ void xprt_disconnect_done(struct rpc_xprt *xprt)
	dprintk("RPC:       disconnected transport %p\n", xprt);
	dprintk("RPC:       disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->transport_lock);
	spin_lock_bh(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_clear_connected(xprt);
	xprt_clear_write_space_locked(xprt);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
	spin_unlock_bh(&xprt->transport_lock);
}
}
@@ -1326,9 +1345,7 @@ xprt_transmit(struct rpc_task *task)
			if (!xprt_request_data_received(task) ||
			if (!xprt_request_data_received(task) ||
			    test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			    test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
				continue;
				continue;
		} else if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
		} else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			rpc_wake_up_queued_task(&xprt->pending, task);
		else
			task->tk_status = status;
			task->tk_status = status;
		break;
		break;
	}
	}
Loading