Commit 21f0ffaf authored by Trond Myklebust's avatar Trond Myklebust Committed by Trond Myklebust
Browse files

SUNRPC: Add basic load balancing to the transport switch



For now, just count the queue length. It is less accurate than counting
number of bytes queued, but easier to implement.

Signed-off-by: default avatarTrond Myklebust <trond.myklebust@primarydata.com>
parent 44942b4e
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -238,6 +238,7 @@ struct rpc_xprt {
	/*
	 * Send stuff
	 */
	atomic_long_t		queuelen;
	spinlock_t		transport_lock;	/* lock transport info */
	spinlock_t		reserve_lock;	/* lock slot table */
	spinlock_t		queue_lock;	/* send/receive queue lock */
+2 −0
Original line number Diff line number Diff line
@@ -15,6 +15,8 @@ struct rpc_xprt_switch {
	struct kref		xps_kref;

	unsigned int		xps_nxprts;
	unsigned int		xps_nactive;
	atomic_long_t		xps_queuelen;
	struct list_head	xps_xprt_list;

	struct net *		xps_net;
+37 −3
Original line number Diff line number Diff line
@@ -968,12 +968,46 @@ out:
}
EXPORT_SYMBOL_GPL(rpc_bind_new_program);

static struct rpc_xprt *
rpc_task_get_xprt(struct rpc_clnt *clnt)
{
	struct rpc_xprt_switch *xps;
	struct rpc_xprt *xprt= xprt_iter_get_next(&clnt->cl_xpi);

	if (!xprt)
		return NULL;
	rcu_read_lock();
	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
	atomic_long_inc(&xps->xps_queuelen);
	rcu_read_unlock();
	atomic_long_inc(&xprt->queuelen);

	return xprt;
}

static void
rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
{
	struct rpc_xprt_switch *xps;

	atomic_long_dec(&xprt->queuelen);
	rcu_read_lock();
	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
	atomic_long_dec(&xps->xps_queuelen);
	rcu_read_unlock();

	xprt_put(xprt);
}

void rpc_task_release_transport(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	if (xprt) {
		task->tk_xprt = NULL;
		if (task->tk_client)
			rpc_task_release_xprt(task->tk_client, xprt);
		else
			xprt_put(xprt);
	}
}
@@ -983,6 +1017,7 @@ void rpc_task_release_client(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;

	rpc_task_release_transport(task);
	if (clnt != NULL) {
		/* Remove from client task list */
		spin_lock(&clnt->cl_lock);
@@ -992,14 +1027,13 @@ void rpc_task_release_client(struct rpc_task *task)

		rpc_release_client(clnt);
	}
	rpc_task_release_transport(task);
}

static
void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
{
	if (!task->tk_xprt)
		task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
		task->tk_xprt = rpc_task_get_xprt(clnt);
}

static
+19 −1
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps,
	if (xps->xps_nxprts == 0)
		xps->xps_net = xprt->xprt_net;
	xps->xps_nxprts++;
	xps->xps_nactive++;
}

/**
@@ -62,6 +63,7 @@ static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps,
{
	if (unlikely(xprt == NULL))
		return;
	xps->xps_nactive--;
	xps->xps_nxprts--;
	if (xps->xps_nxprts == 0)
		xps->xps_net = NULL;
@@ -317,8 +319,24 @@ struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head,
static
struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi)
{
	return xprt_iter_next_entry_multiple(xpi,
	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
	struct rpc_xprt *xprt;
	unsigned long xprt_queuelen;
	unsigned long xps_queuelen;
	unsigned long xps_avglen;

	do {
		xprt = xprt_iter_next_entry_multiple(xpi,
			xprt_switch_find_next_entry_roundrobin);
		if (xprt == NULL)
			break;
		xprt_queuelen = atomic_long_read(&xprt->queuelen);
		if (xprt_queuelen <= 2)
			break;
		xps_queuelen = atomic_long_read(&xps->xps_queuelen);
		xps_avglen = DIV_ROUND_UP(xps_queuelen, xps->xps_nactive);
	} while (xprt_queuelen > xps_avglen);
	return xprt;
}

static