Commit 3bf0fb6f authored by David Howells's avatar David Howells
Browse files

afs: Probe multiple fileservers simultaneously



Send probes to all the unprobed fileservers in a fileserver list on all
addresses simultaneously in an attempt to find out the fastest route whilst
not getting stuck for 20s on any server or address that we don't get a
reply from.

This alleviates the problem whereby attempting to access a new server can
take a long time because the rotation algorithm ends up rotating through
all servers and addresses until it finds one that responds.

Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
parent 18ac6185
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ kafs-y := \
	file.o \
	flock.o \
	fsclient.o \
	fs_probe.o \
	inode.o \
	main.o \
	misc.o \
@@ -29,8 +30,9 @@ kafs-y := \
	super.o \
	netdevices.o \
	vlclient.o \
	vl_rotate.o \
	vl_list.o \
	vl_probe.o \
	vl_rotate.o \
	volume.o \
	write.o \
	xattr.o \
+27 −13
Original line number Diff line number Diff line
@@ -303,6 +303,8 @@ void afs_merge_fs_addr4(struct afs_addr_list *alist, __be32 xdr, u16 port)
			sizeof(alist->addrs[0]) * (alist->nr_addrs - i));

	srx = &alist->addrs[i];
	srx->srx_family = AF_RXRPC;
	srx->transport_type = SOCK_DGRAM;
	srx->transport_len = sizeof(srx->transport.sin);
	srx->transport.sin.sin_family = AF_INET;
	srx->transport.sin.sin_port = htons(port);
@@ -341,6 +343,8 @@ void afs_merge_fs_addr6(struct afs_addr_list *alist, __be32 *xdr, u16 port)
			sizeof(alist->addrs[0]) * (alist->nr_addrs - i));

	srx = &alist->addrs[i];
	srx->srx_family = AF_RXRPC;
	srx->transport_type = SOCK_DGRAM;
	srx->transport_len = sizeof(srx->transport.sin6);
	srx->transport.sin6.sin6_family = AF_INET6;
	srx->transport.sin6.sin6_port = htons(port);
@@ -353,23 +357,32 @@ void afs_merge_fs_addr6(struct afs_addr_list *alist, __be32 *xdr, u16 port)
 */
bool afs_iterate_addresses(struct afs_addr_cursor *ac)
{
	_enter("%hu+%hd", ac->start, (short)ac->index);
	unsigned long set, failed;
	int index;

	if (!ac->alist)
		return false;

	set = ac->alist->responded;
	failed = ac->alist->failed;
	_enter("%lx-%lx-%lx,%d", set, failed, ac->tried, ac->index);

	ac->nr_iterations++;

	if (ac->begun) {
		ac->index++;
		if (ac->index == ac->alist->nr_addrs)
			ac->index = 0;
	set &= ~(failed | ac->tried);

		if (ac->index == ac->start)
	if (!set)
		return false;
	}

	ac->begun = true;
	index = READ_ONCE(ac->alist->preferred);
	if (test_bit(index, &set))
		goto selected;

	index = __ffs(set);

selected:
	ac->index = index;
	set_bit(index, &ac->tried);
	ac->responded = false;
	return true;
}
@@ -383,12 +396,13 @@ int afs_end_cursor(struct afs_addr_cursor *ac)

	alist = ac->alist;
	if (alist) {
		if (ac->responded && ac->index != ac->start)
			WRITE_ONCE(alist->index, ac->index);
		if (ac->responded &&
		    ac->index != alist->preferred &&
		    test_bit(ac->alist->preferred, &ac->tried))
			WRITE_ONCE(alist->preferred, ac->index);
		afs_put_addrlist(alist);
		ac->alist = NULL;
	}

	ac->alist = NULL;
	ac->begun = false;
	return ac->error;
}
+94 −35
Original line number Diff line number Diff line
@@ -122,6 +122,8 @@ bool afs_cm_incoming_call(struct afs_call *call)
{
	_enter("{%u, CB.OP %u}", call->service_id, call->operation_ID);

	call->epoch = rxrpc_kernel_get_epoch(call->net->socket, call->rxcall);

	switch (call->operation_ID) {
	case CBCallBack:
		call->type = &afs_SRXCBCallBack;
@@ -151,6 +153,91 @@ bool afs_cm_incoming_call(struct afs_call *call)
	}
}

/*
 * Record a probe to the cache manager from a server.
 */
static int afs_record_cm_probe(struct afs_call *call, struct afs_server *server)
{
	_enter("");

	if (test_bit(AFS_SERVER_FL_HAVE_EPOCH, &server->flags) &&
	    !test_bit(AFS_SERVER_FL_PROBING, &server->flags)) {
		if (server->cm_epoch == call->epoch)
			return 0;

		if (!server->probe.said_rebooted) {
			pr_notice("kAFS: FS rebooted %pU\n", &server->uuid);
			server->probe.said_rebooted = true;
		}
	}

	spin_lock(&server->probe_lock);

	if (!test_bit(AFS_SERVER_FL_HAVE_EPOCH, &server->flags)) {
		server->cm_epoch = call->epoch;
		server->probe.cm_epoch = call->epoch;
		goto out;
	}

	if (server->probe.cm_probed &&
	    call->epoch != server->probe.cm_epoch &&
	    !server->probe.said_inconsistent) {
		pr_notice("kAFS: FS endpoints inconsistent %pU\n",
			  &server->uuid);
		server->probe.said_inconsistent = true;
	}

	if (!server->probe.cm_probed || call->epoch == server->cm_epoch)
		server->probe.cm_epoch = server->cm_epoch;

out:
	server->probe.cm_probed = true;
	spin_unlock(&server->probe_lock);
	return 0;
}

/*
 * Find the server record by peer address and record a probe to the cache
 * manager from a server.
 */
static int afs_find_cm_server_by_peer(struct afs_call *call)
{
	struct sockaddr_rxrpc srx;
	struct afs_server *server;

	rxrpc_kernel_get_peer(call->net->socket, call->rxcall, &srx);

	server = afs_find_server(call->net, &srx);
	if (!server) {
		trace_afs_cm_no_server(call, &srx);
		return 0;
	}

	call->cm_server = server;
	return afs_record_cm_probe(call, server);
}

/*
 * Find the server record by server UUID and record a probe to the cache
 * manager from a server.
 */
static int afs_find_cm_server_by_uuid(struct afs_call *call,
				      struct afs_uuid *uuid)
{
	struct afs_server *server;

	rcu_read_lock();
	server = afs_find_server_by_uuid(call->net, call->request);
	rcu_read_unlock();
	if (!server) {
		trace_afs_cm_no_server_u(call, call->request);
		return 0;
	}

	call->cm_server = server;
	return afs_record_cm_probe(call, server);
}

/*
 * Clean up a cache manager call.
 */
@@ -187,7 +274,6 @@ static void SRXAFSCB_CallBack(struct work_struct *work)
static int afs_deliver_cb_callback(struct afs_call *call)
{
	struct afs_callback_break *cb;
	struct sockaddr_rxrpc srx;
	__be32 *bp;
	int ret, loop;

@@ -276,12 +362,7 @@ static int afs_deliver_cb_callback(struct afs_call *call)

	/* we'll need the file server record as that tells us which set of
	 * vnodes to operate upon */
	rxrpc_kernel_get_peer(call->net->socket, call->rxcall, &srx);
	call->cm_server = afs_find_server(call->net, &srx);
	if (!call->cm_server)
		trace_afs_cm_no_server(call, &srx);

	return afs_queue_call_work(call);
	return afs_find_cm_server_by_peer(call);
}

/*
@@ -305,13 +386,10 @@ static void SRXAFSCB_InitCallBackState(struct work_struct *work)
 */
static int afs_deliver_cb_init_call_back_state(struct afs_call *call)
{
	struct sockaddr_rxrpc srx;
	int ret;

	_enter("");

	rxrpc_kernel_get_peer(call->net->socket, call->rxcall, &srx);

	afs_extract_discard(call, 0);
	ret = afs_extract_data(call, false);
	if (ret < 0)
@@ -319,11 +397,7 @@ static int afs_deliver_cb_init_call_back_state(struct afs_call *call)

	/* we'll need the file server record as that tells us which set of
	 * vnodes to operate upon */
	call->cm_server = afs_find_server(call->net, &srx);
	if (!call->cm_server)
		trace_afs_cm_no_server(call, &srx);

	return afs_queue_call_work(call);
	return afs_find_cm_server_by_peer(call);
}

/*
@@ -384,13 +458,7 @@ static int afs_deliver_cb_init_call_back_state3(struct afs_call *call)

	/* we'll need the file server record as that tells us which set of
	 * vnodes to operate upon */
	rcu_read_lock();
	call->cm_server = afs_find_server_by_uuid(call->net, call->request);
	rcu_read_unlock();
	if (!call->cm_server)
		trace_afs_cm_no_server_u(call, call->request);

	return afs_queue_call_work(call);
	return afs_find_cm_server_by_uuid(call, call->request);
}

/*
@@ -422,8 +490,7 @@ static int afs_deliver_cb_probe(struct afs_call *call)

	if (!afs_check_call_state(call, AFS_CALL_SV_REPLYING))
		return afs_io_error(call, afs_io_error_cm_reply);

	return afs_queue_call_work(call);
	return afs_find_cm_server_by_peer(call);
}

/*
@@ -503,8 +570,7 @@ static int afs_deliver_cb_probe_uuid(struct afs_call *call)

	if (!afs_check_call_state(call, AFS_CALL_SV_REPLYING))
		return afs_io_error(call, afs_io_error_cm_reply);

	return afs_queue_call_work(call);
	return afs_find_cm_server_by_uuid(call, call->request);
}

/*
@@ -586,8 +652,7 @@ static int afs_deliver_cb_tell_me_about_yourself(struct afs_call *call)

	if (!afs_check_call_state(call, AFS_CALL_SV_REPLYING))
		return afs_io_error(call, afs_io_error_cm_reply);

	return afs_queue_call_work(call);
	return afs_find_cm_server_by_peer(call);
}

/*
@@ -596,7 +661,6 @@ static int afs_deliver_cb_tell_me_about_yourself(struct afs_call *call)
static int afs_deliver_yfs_cb_callback(struct afs_call *call)
{
	struct afs_callback_break *cb;
	struct sockaddr_rxrpc srx;
	struct yfs_xdr_YFSFid *bp;
	size_t size;
	int ret, loop;
@@ -664,10 +728,5 @@ static int afs_deliver_yfs_cb_callback(struct afs_call *call)
	/* We'll need the file server record as that tells us which set of
	 * vnodes to operate upon.
	 */
	rxrpc_kernel_get_peer(call->net->socket, call->rxcall, &srx);
	call->cm_server = afs_find_server(call->net, &srx);
	if (!call->cm_server)
		trace_afs_cm_no_server(call, &srx);

	return afs_queue_call_work(call);
	return afs_find_cm_server_by_peer(call);
}

fs/afs/fs_probe.c

0 → 100644
+270 −0
Original line number Diff line number Diff line
/* AFS fileserver probing
 *
 * Copyright (C) 2018 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public Licence
 * as published by the Free Software Foundation; either version
 * 2 of the Licence, or (at your option) any later version.
 */

#include <linux/sched.h>
#include <linux/slab.h>
#include "afs_fs.h"
#include "internal.h"
#include "protocol_yfs.h"

static bool afs_fs_probe_done(struct afs_server *server)
{
	if (!atomic_dec_and_test(&server->probe_outstanding))
		return false;

	wake_up_var(&server->probe_outstanding);
	clear_bit_unlock(AFS_SERVER_FL_PROBING, &server->flags);
	wake_up_bit(&server->flags, AFS_SERVER_FL_PROBING);
	return true;
}

/*
 * Process the result of probing a fileserver.  This is called after successful
 * or failed delivery of an FS.GetCapabilities operation.
 */
void afs_fileserver_probe_result(struct afs_call *call)
{
	struct afs_addr_list *alist = call->alist;
	struct afs_server *server = call->reply[0];
	unsigned int server_index = (long)call->reply[1];
	unsigned int index = call->addr_ix;
	unsigned int rtt = UINT_MAX;
	bool have_result = false;
	u64 _rtt;
	int ret = call->error;

	_enter("%pU,%u", &server->uuid, index);

	spin_lock(&server->probe_lock);

	switch (ret) {
	case 0:
		server->probe.error = 0;
		goto responded;
	case -ECONNABORTED:
		if (!server->probe.responded) {
			server->probe.abort_code = call->abort_code;
			server->probe.error = ret;
		}
		goto responded;
	case -ENOMEM:
	case -ENONET:
		server->probe.local_failure = true;
		afs_io_error(call, afs_io_error_fs_probe_fail);
		goto out;
	case -ECONNRESET: /* Responded, but call expired. */
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -ECONNREFUSED:
	case -ETIMEDOUT:
	case -ETIME:
	default:
		clear_bit(index, &alist->responded);
		set_bit(index, &alist->failed);
		if (!server->probe.responded &&
		    (server->probe.error == 0 ||
		     server->probe.error == -ETIMEDOUT ||
		     server->probe.error == -ETIME))
			server->probe.error = ret;
		afs_io_error(call, afs_io_error_fs_probe_fail);
		goto out;
	}

responded:
	set_bit(index, &alist->responded);
	clear_bit(index, &alist->failed);

	if (call->service_id == YFS_FS_SERVICE) {
		server->probe.is_yfs = true;
		set_bit(AFS_SERVER_FL_IS_YFS, &server->flags);
		alist->addrs[index].srx_service = call->service_id;
	} else {
		server->probe.not_yfs = true;
		if (!server->probe.is_yfs) {
			clear_bit(AFS_SERVER_FL_IS_YFS, &server->flags);
			alist->addrs[index].srx_service = call->service_id;
		}
	}

	/* Get the RTT and scale it to fit into a 32-bit value that represents
	 * over a minute of time so that we can access it with one instruction
	 * on a 32-bit system.
	 */
	_rtt = rxrpc_kernel_get_rtt(call->net->socket, call->rxcall);
	_rtt /= 64;
	rtt = (_rtt > UINT_MAX) ? UINT_MAX : _rtt;
	if (rtt < server->probe.rtt) {
		server->probe.rtt = rtt;
		alist->preferred = index;
		have_result = true;
	}

	smp_wmb(); /* Set rtt before responded. */
	server->probe.responded = true;
	set_bit(AFS_SERVER_FL_PROBED, &server->flags);
out:
	spin_unlock(&server->probe_lock);

	_debug("probe [%u][%u] %pISpc rtt=%u ret=%d",
	       server_index, index, &alist->addrs[index].transport,
	       (unsigned int)rtt, ret);

	have_result |= afs_fs_probe_done(server);
	if (have_result) {
		server->probe.have_result = true;
		wake_up_var(&server->probe.have_result);
		wake_up_all(&server->probe_wq);
	}
}

/*
 * Probe all of a fileserver's addresses to find out the best route and to
 * query its capabilities.
 */
static int afs_do_probe_fileserver(struct afs_net *net,
				   struct afs_server *server,
				   struct key *key,
				   unsigned int server_index)
{
	struct afs_addr_cursor ac = {
		.index = 0,
	};
	int ret;

	_enter("%pU", &server->uuid);

	read_lock(&server->fs_lock);
	ac.alist = rcu_dereference_protected(server->addresses,
					     lockdep_is_held(&server->fs_lock));
	read_unlock(&server->fs_lock);

	atomic_set(&server->probe_outstanding, ac.alist->nr_addrs);
	memset(&server->probe, 0, sizeof(server->probe));
	server->probe.rtt = UINT_MAX;

	for (ac.index = 0; ac.index < ac.alist->nr_addrs; ac.index++) {
		ret = afs_fs_get_capabilities(net, server, &ac, key, server_index,
					      true);
		if (ret != -EINPROGRESS) {
			afs_fs_probe_done(server);
			return ret;
		}
	}

	return 0;
}

/*
 * Send off probes to all unprobed servers.
 */
int afs_probe_fileservers(struct afs_net *net, struct key *key,
			  struct afs_server_list *list)
{
	struct afs_server *server;
	int i, ret;

	for (i = 0; i < list->nr_servers; i++) {
		server = list->servers[i].server;
		if (test_bit(AFS_SERVER_FL_PROBED, &server->flags))
			continue;

		if (!test_and_set_bit_lock(AFS_SERVER_FL_PROBING, &server->flags)) {
			ret = afs_do_probe_fileserver(net, server, key, i);
			if (ret)
				return ret;
		}
	}

	return 0;
}

/*
 * Wait for the first as-yet untried fileserver to respond.
 */
int afs_wait_for_fs_probes(struct afs_server_list *slist, unsigned long untried)
{
	struct wait_queue_entry *waits;
	struct afs_server *server;
	unsigned int rtt = UINT_MAX;
	bool have_responders = false;
	int pref = -1, i;

	_enter("%u,%lx", slist->nr_servers, untried);

	/* Only wait for servers that have a probe outstanding. */
	for (i = 0; i < slist->nr_servers; i++) {
		if (test_bit(i, &untried)) {
			server = slist->servers[i].server;
			if (!test_bit(AFS_SERVER_FL_PROBING, &server->flags))
				__clear_bit(i, &untried);
			if (server->probe.responded)
				have_responders = true;
		}
	}
	if (have_responders || !untried)
		return 0;

	waits = kmalloc(array_size(slist->nr_servers, sizeof(*waits)), GFP_KERNEL);
	if (!waits)
		return -ENOMEM;

	for (i = 0; i < slist->nr_servers; i++) {
		if (test_bit(i, &untried)) {
			server = slist->servers[i].server;
			init_waitqueue_entry(&waits[i], current);
			add_wait_queue(&server->probe_wq, &waits[i]);
		}
	}

	for (;;) {
		bool still_probing = false;

		set_current_state(TASK_INTERRUPTIBLE);
		for (i = 0; i < slist->nr_servers; i++) {
			if (test_bit(i, &untried)) {
				server = slist->servers[i].server;
				if (server->probe.responded)
					goto stop;
				if (test_bit(AFS_SERVER_FL_PROBING, &server->flags))
					still_probing = true;
			}
		}

		if (!still_probing || unlikely(signal_pending(current)))
			goto stop;
		schedule();
	}

stop:
	set_current_state(TASK_RUNNING);

	for (i = 0; i < slist->nr_servers; i++) {
		if (test_bit(i, &untried)) {
			server = slist->servers[i].server;
			if (server->probe.responded &&
			    server->probe.rtt < rtt) {
				pref = i;
				rtt = server->probe.rtt;
			}

			remove_wait_queue(&server->probe_wq, &waits[i]);
		}
	}

	kfree(waits);

	if (pref == -1 && signal_pending(current))
		return -ERESTARTSYS;

	if (pref >= 0)
		slist->preferred = pref;
	return 0;
}
+17 −10
Original line number Diff line number Diff line
@@ -2006,7 +2006,6 @@ int afs_fs_give_up_all_callbacks(struct afs_net *net,
 */
static int afs_deliver_fs_get_capabilities(struct afs_call *call)
{
	struct afs_server *server = call->reply[0];
	u32 count;
	int ret;

@@ -2042,15 +2041,18 @@ static int afs_deliver_fs_get_capabilities(struct afs_call *call)
		break;
	}

	if (call->service_id == YFS_FS_SERVICE)
		set_bit(AFS_SERVER_FL_IS_YFS, &server->flags);
	else
		clear_bit(AFS_SERVER_FL_IS_YFS, &server->flags);

	_leave(" = 0 [done]");
	return 0;
}

static void afs_destroy_fs_get_capabilities(struct afs_call *call)
{
	struct afs_server *server = call->reply[0];

	afs_put_server(call->net, server);
	afs_flat_call_destructor(call);
}

/*
 * FS.GetCapabilities operation type
 */
@@ -2058,7 +2060,8 @@ static const struct afs_call_type afs_RXFSGetCapabilities = {
	.name		= "FS.GetCapabilities",
	.op		= afs_FS_GetCapabilities,
	.deliver	= afs_deliver_fs_get_capabilities,
	.destructor	= afs_flat_call_destructor,
	.done		= afs_fileserver_probe_result,
	.destructor	= afs_destroy_fs_get_capabilities,
};

/*
@@ -2068,7 +2071,9 @@ static const struct afs_call_type afs_RXFSGetCapabilities = {
int afs_fs_get_capabilities(struct afs_net *net,
			    struct afs_server *server,
			    struct afs_addr_cursor *ac,
			    struct key *key)
			    struct key *key,
			    unsigned int server_index,
			    bool async)
{
	struct afs_call *call;
	__be32 *bp;
@@ -2080,8 +2085,10 @@ int afs_fs_get_capabilities(struct afs_net *net,
		return -ENOMEM;

	call->key = key;
	call->reply[0] = server;
	call->reply[0] = afs_get_server(server);
	call->reply[1] = (void *)(long)server_index;
	call->upgrade = true;
	call->want_reply_time = true;

	/* marshall the parameters */
	bp = call->request;
@@ -2089,7 +2096,7 @@ int afs_fs_get_capabilities(struct afs_net *net,

	/* Can't take a ref on server */
	trace_afs_make_fs_call(call, NULL);
	return afs_make_call(ac, call, GFP_NOFS, false);
	return afs_make_call(ac, call, GFP_NOFS, async);
}

/*
Loading