Commit e0639dc5 authored by Olga Kornievskaia's avatar Olga Kornievskaia Committed by J. Bruce Fields
Browse files

NFSD introduce async copy feature



Upon receiving a request for async copy, create a new kthread.  If we
get asynchronous request, make sure to copy the needed arguments/state
from the stack before starting the copy. Then start the thread and reply
back to the client indicating copy is asynchronous.

nfsd_copy_file_range() will copy in a loop over the total number of
bytes is needed to copy. In case a failure happens in the middle, we
ignore the error and return how much we copied so far. Once done
creating a workitem for the callback workqueue and send CB_OFFLOAD with
the results.

The lifetime of the copy stateid is bound to the vfs copy. This way we
don't need to keep the nfsd_net structure for the callback.  We could
keep it around longer so that an OFFLOAD_STATUS that came late would
still get results, but clients should be able to deal without that.

We handle OFFLOAD_CANCEL by sending a signal to the copy thread and
calling kthread_stop.

A client should cancel any ongoing copies before calling DESTROY_CLIENT;
if not, we return a CLIENT_BUSY error.

If the client is destroyed for some other reason (lease expiration, or
server shutdown), we must clean up any ongoing copies ourselves.

Signed-off-by: default avatarOlga Kornievskaia <kolga@netapp.com>
[colin.king@canonical.com: fix leak in error case]
[bfields@fieldses.org: remove signalling, merge patches]
Signed-off-by: default avatarJ. Bruce Fields <bfields@redhat.com>
parent 885e2bf3
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -123,6 +123,14 @@ struct nfsd_net {

	wait_queue_head_t ntf_wq;
	atomic_t ntf_refcnt;

	/*
	 * clientid and stateid data for construction of net unique COPY
	 * stateids.
	 */
	u32		s2s_cp_cl_id;
	struct idr	s2s_cp_stateids;
	spinlock_t	s2s_cp_lock;
};

/* Simple check to find out if a given net was properly initialized */
+242 −19
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@
#include <linux/file.h>
#include <linux/falloc.h>
#include <linux/slab.h>
#include <linux/kthread.h>

#include "idmap.h"
#include "cache.h"
@@ -1089,37 +1090,236 @@ out:
	return status;
}

void nfs4_put_copy(struct nfsd4_copy *copy)
{
	if (!refcount_dec_and_test(&copy->refcount))
		return;
	kfree(copy);
}

static bool
check_and_set_stop_copy(struct nfsd4_copy *copy)
{
	bool value;

	spin_lock(&copy->cp_clp->async_lock);
	value = copy->stopped;
	if (!copy->stopped)
		copy->stopped = true;
	spin_unlock(&copy->cp_clp->async_lock);
	return value;
}

static void nfsd4_stop_copy(struct nfsd4_copy *copy)
{
	/* only 1 thread should stop the copy */
	if (!check_and_set_stop_copy(copy))
		kthread_stop(copy->copy_task);
	nfs4_put_copy(copy);
}

static struct nfsd4_copy *nfsd4_get_copy(struct nfs4_client *clp)
{
	struct nfsd4_copy *copy = NULL;

	spin_lock(&clp->async_lock);
	if (!list_empty(&clp->async_copies)) {
		copy = list_first_entry(&clp->async_copies, struct nfsd4_copy,
					copies);
		refcount_inc(&copy->refcount);
	}
	spin_unlock(&clp->async_lock);
	return copy;
}

void nfsd4_shutdown_copy(struct nfs4_client *clp)
{
	struct nfsd4_copy *copy;

	while ((copy = nfsd4_get_copy(clp)) != NULL)
		nfsd4_stop_copy(copy);
}

static void nfsd4_cb_offload_release(struct nfsd4_callback *cb)
{
	struct nfsd4_copy *copy = container_of(cb, struct nfsd4_copy, cp_cb);

	nfs4_put_copy(copy);
}

static int nfsd4_cb_offload_done(struct nfsd4_callback *cb,
				 struct rpc_task *task)
{
	return 1;
}

static const struct nfsd4_callback_ops nfsd4_cb_offload_ops = {
	.release = nfsd4_cb_offload_release,
	.done = nfsd4_cb_offload_done
};

static void nfsd4_init_copy_res(struct nfsd4_copy *copy, bool sync)
{
	copy->cp_res.wr_stable_how = NFS_UNSTABLE;
	copy->cp_synchronous = sync;
	gen_boot_verifier(&copy->cp_res.wr_verifier, copy->cp_clp->net);
}

static ssize_t _nfsd_copy_file_range(struct nfsd4_copy *copy)
{
	ssize_t bytes_copied = 0;
	size_t bytes_total = copy->cp_count;
	u64 src_pos = copy->cp_src_pos;
	u64 dst_pos = copy->cp_dst_pos;

	do {
		if (kthread_should_stop())
			break;
		bytes_copied = nfsd_copy_file_range(copy->file_src, src_pos,
				copy->file_dst, dst_pos, bytes_total);
		if (bytes_copied <= 0)
			break;
		bytes_total -= bytes_copied;
		copy->cp_res.wr_bytes_written += bytes_copied;
		src_pos += bytes_copied;
		dst_pos += bytes_copied;
	} while (bytes_total > 0 && !copy->cp_synchronous);
	return bytes_copied;
}

static __be32 nfsd4_do_copy(struct nfsd4_copy *copy, bool sync)
{
	__be32 status;
	ssize_t bytes;

	bytes = _nfsd_copy_file_range(copy);
	/* for async copy, we ignore the error, client can always retry
	 * to get the error
	 */
	if (bytes < 0 && !copy->cp_res.wr_bytes_written)
		status = nfserrno(bytes);
	else {
		nfsd4_init_copy_res(copy, sync);
		status = nfs_ok;
	}

	fput(copy->file_src);
	fput(copy->file_dst);
	return status;
}

static void dup_copy_fields(struct nfsd4_copy *src, struct nfsd4_copy *dst)
{
	dst->cp_src_pos = src->cp_src_pos;
	dst->cp_dst_pos = src->cp_dst_pos;
	dst->cp_count = src->cp_count;
	dst->cp_synchronous = src->cp_synchronous;
	memcpy(&dst->cp_res, &src->cp_res, sizeof(src->cp_res));
	memcpy(&dst->fh, &src->fh, sizeof(src->fh));
	dst->cp_clp = src->cp_clp;
	dst->file_dst = get_file(src->file_dst);
	dst->file_src = get_file(src->file_src);
	memcpy(&dst->cp_stateid, &src->cp_stateid, sizeof(src->cp_stateid));
}

static void cleanup_async_copy(struct nfsd4_copy *copy)
{
	nfs4_free_cp_state(copy);
	fput(copy->file_dst);
	fput(copy->file_src);
	spin_lock(&copy->cp_clp->async_lock);
	list_del(&copy->copies);
	spin_unlock(&copy->cp_clp->async_lock);
	nfs4_put_copy(copy);
}

static int nfsd4_do_async_copy(void *data)
{
	struct nfsd4_copy *copy = (struct nfsd4_copy *)data;
	struct nfsd4_copy *cb_copy;

	copy->nfserr = nfsd4_do_copy(copy, 0);
	cb_copy = kzalloc(sizeof(struct nfsd4_copy), GFP_KERNEL);
	if (!cb_copy)
		goto out;
	memcpy(&cb_copy->cp_res, &copy->cp_res, sizeof(copy->cp_res));
	cb_copy->cp_clp = copy->cp_clp;
	cb_copy->nfserr = copy->nfserr;
	memcpy(&cb_copy->fh, &copy->fh, sizeof(copy->fh));
	nfsd4_init_cb(&cb_copy->cp_cb, cb_copy->cp_clp,
			&nfsd4_cb_offload_ops, NFSPROC4_CLNT_CB_OFFLOAD);
	nfsd4_run_cb(&cb_copy->cp_cb);
out:
	cleanup_async_copy(copy);
	return 0;
}

static __be32
nfsd4_copy(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
		union nfsd4_op_u *u)
{
	struct nfsd4_copy *copy = &u->copy;
	struct file *src, *dst;
	__be32 status;
	ssize_t bytes;
	struct nfsd4_copy *async_copy = NULL;

	status = nfsd4_verify_copy(rqstp, cstate, &copy->cp_src_stateid, &src,
				   &copy->cp_dst_stateid, &dst);
	status = nfsd4_verify_copy(rqstp, cstate, &copy->cp_src_stateid,
				   &copy->file_src, &copy->cp_dst_stateid,
				   &copy->file_dst);
	if (status)
		goto out;

	bytes = nfsd_copy_file_range(src, copy->cp_src_pos,
			dst, copy->cp_dst_pos, copy->cp_count);
	copy->cp_clp = cstate->clp;
	memcpy(&copy->fh, &cstate->current_fh.fh_handle,
		sizeof(struct knfsd_fh));
	if (!copy->cp_synchronous) {
		struct nfsd_net *nn = net_generic(SVC_NET(rqstp), nfsd_net_id);

	if (bytes < 0)
		status = nfserrno(bytes);
	else {
		copy->cp_res.wr_bytes_written = bytes;
		copy->cp_res.wr_stable_how = NFS_UNSTABLE;
		copy->cp_synchronous = 1;
		gen_boot_verifier(&copy->cp_res.wr_verifier, SVC_NET(rqstp));
		status = nfs_ok;
		status = nfserrno(-ENOMEM);
		async_copy = kzalloc(sizeof(struct nfsd4_copy), GFP_KERNEL);
		if (!async_copy)
			goto out;
		if (!nfs4_init_cp_state(nn, copy)) {
			kfree(async_copy);
			goto out;
		}

	fput(src);
	fput(dst);
		refcount_set(&async_copy->refcount, 1);
		memcpy(&copy->cp_res.cb_stateid, &copy->cp_stateid,
			sizeof(copy->cp_stateid));
		dup_copy_fields(copy, async_copy);
		async_copy->copy_task = kthread_create(nfsd4_do_async_copy,
				async_copy, "%s", "copy thread");
		if (IS_ERR(async_copy->copy_task))
			goto out_err;
		spin_lock(&async_copy->cp_clp->async_lock);
		list_add(&async_copy->copies,
				&async_copy->cp_clp->async_copies);
		spin_unlock(&async_copy->cp_clp->async_lock);
		wake_up_process(async_copy->copy_task);
		status = nfs_ok;
	} else
		status = nfsd4_do_copy(copy, 1);
out:
	return status;
out_err:
	cleanup_async_copy(async_copy);
	goto out;
}

struct nfsd4_copy *
find_async_copy(struct nfs4_client *clp, stateid_t *stateid)
{
	struct nfsd4_copy *copy;

	spin_lock(&clp->async_lock);
	list_for_each_entry(copy, &clp->async_copies, copies) {
		if (memcmp(&copy->cp_stateid, stateid, NFS4_STATEID_SIZE))
			continue;
		refcount_inc(&copy->refcount);
		spin_unlock(&clp->async_lock);
		return copy;
	}
	spin_unlock(&clp->async_lock);
	return NULL;
}

static __be32
@@ -1127,7 +1327,18 @@ nfsd4_offload_cancel(struct svc_rqst *rqstp,
		     struct nfsd4_compound_state *cstate,
		     union nfsd4_op_u *u)
{
	return 0;
	struct nfsd4_offload_status *os = &u->offload_status;
	__be32 status = 0;
	struct nfsd4_copy *copy;
	struct nfs4_client *clp = cstate->clp;

	copy = find_async_copy(clp, &os->stateid);
	if (copy)
		nfsd4_stop_copy(copy);
	else
		status = nfserr_bad_stateid;

	return status;
}

static __be32
@@ -1157,7 +1368,19 @@ nfsd4_offload_status(struct svc_rqst *rqstp,
		     struct nfsd4_compound_state *cstate,
		     union nfsd4_op_u *u)
{
	return nfserr_notsupp;
	struct nfsd4_offload_status *os = &u->offload_status;
	__be32 status = 0;
	struct nfsd4_copy *copy;
	struct nfs4_client *clp = cstate->clp;

	copy = find_async_copy(clp, &os->stateid);
	if (copy) {
		os->count = copy->cp_res.wr_bytes_written;
		nfs4_put_copy(copy);
	} else
		status = nfserr_bad_stateid;

	return status;
}

static __be32
+37 −1
Original line number Diff line number Diff line
@@ -713,6 +713,36 @@ out_free:
	return NULL;
}

/*
 * Create a unique stateid_t to represent each COPY.
 */
int nfs4_init_cp_state(struct nfsd_net *nn, struct nfsd4_copy *copy)
{
	int new_id;

	idr_preload(GFP_KERNEL);
	spin_lock(&nn->s2s_cp_lock);
	new_id = idr_alloc_cyclic(&nn->s2s_cp_stateids, copy, 0, 0, GFP_NOWAIT);
	spin_unlock(&nn->s2s_cp_lock);
	idr_preload_end();
	if (new_id < 0)
		return 0;
	copy->cp_stateid.si_opaque.so_id = new_id;
	copy->cp_stateid.si_opaque.so_clid.cl_boot = nn->boot_time;
	copy->cp_stateid.si_opaque.so_clid.cl_id = nn->s2s_cp_cl_id;
	return 1;
}

void nfs4_free_cp_state(struct nfsd4_copy *copy)
{
	struct nfsd_net *nn;

	nn = net_generic(copy->cp_clp->net, nfsd_net_id);
	spin_lock(&nn->s2s_cp_lock);
	idr_remove(&nn->s2s_cp_stateids, copy->cp_stateid.si_opaque.so_id);
	spin_unlock(&nn->s2s_cp_lock);
}

static struct nfs4_ol_stateid * nfs4_alloc_open_stateid(struct nfs4_client *clp)
{
	struct nfs4_stid *stid;
@@ -1827,6 +1857,8 @@ static struct nfs4_client *alloc_client(struct xdr_netobj name)
#ifdef CONFIG_NFSD_PNFS
	INIT_LIST_HEAD(&clp->cl_lo_states);
#endif
	INIT_LIST_HEAD(&clp->async_copies);
	spin_lock_init(&clp->async_lock);
	spin_lock_init(&clp->cl_lock);
	rpc_init_wait_queue(&clp->cl_cb_waitq, "Backchannel slot table");
	return clp;
@@ -1942,6 +1974,7 @@ __destroy_client(struct nfs4_client *clp)
		}
	}
	nfsd4_return_all_client_layouts(clp);
	nfsd4_shutdown_copy(clp);
	nfsd4_shutdown_callback(clp);
	if (clp->cl_cb_conn.cb_xprt)
		svc_xprt_put(clp->cl_cb_conn.cb_xprt);
@@ -2475,7 +2508,8 @@ static bool client_has_state(struct nfs4_client *clp)
		|| !list_empty(&clp->cl_lo_states)
#endif
		|| !list_empty(&clp->cl_delegations)
		|| !list_empty(&clp->cl_sessions);
		|| !list_empty(&clp->cl_sessions)
		|| !list_empty(&clp->async_copies);
}

__be32
@@ -7161,6 +7195,8 @@ static int nfs4_state_create_net(struct net *net)
	INIT_LIST_HEAD(&nn->close_lru);
	INIT_LIST_HEAD(&nn->del_recall_lru);
	spin_lock_init(&nn->client_lock);
	spin_lock_init(&nn->s2s_cp_lock);
	idr_init(&nn->s2s_cp_stateids);

	spin_lock_init(&nn->blocked_locks_lock);
	INIT_LIST_HEAD(&nn->blocked_locks_lru);
+17 −4
Original line number Diff line number Diff line
@@ -4231,15 +4231,27 @@ nfsd4_encode_layoutreturn(struct nfsd4_compoundres *resp, __be32 nfserr,
#endif /* CONFIG_NFSD_PNFS */

static __be32
nfsd42_encode_write_res(struct nfsd4_compoundres *resp, struct nfsd42_write_res *write)
nfsd42_encode_write_res(struct nfsd4_compoundres *resp,
		struct nfsd42_write_res *write, bool sync)
{
	__be32 *p;

	p = xdr_reserve_space(&resp->xdr, 4 + 8 + 4 + NFS4_VERIFIER_SIZE);
	p = xdr_reserve_space(&resp->xdr, 4);
	if (!p)
		return nfserr_resource;

	if (sync)
		*p++ = cpu_to_be32(0);
	else {
		__be32 nfserr;
		*p++ = cpu_to_be32(1);
		nfserr = nfsd4_encode_stateid(&resp->xdr, &write->cb_stateid);
		if (nfserr)
			return nfserr;
	}
	p = xdr_reserve_space(&resp->xdr, 8 + 4 + NFS4_VERIFIER_SIZE);
	if (!p)
		return nfserr_resource;

	p = xdr_encode_hyper(p, write->wr_bytes_written);
	*p++ = cpu_to_be32(write->wr_stable_how);
	p = xdr_encode_opaque_fixed(p, write->wr_verifier.data,
@@ -4253,7 +4265,8 @@ nfsd4_encode_copy(struct nfsd4_compoundres *resp, __be32 nfserr,
{
	__be32 *p;

	nfserr = nfsd42_encode_write_res(resp, &copy->cp_res);
	nfserr = nfsd42_encode_write_res(resp, &copy->cp_res,
			copy->cp_synchronous);
	if (nfserr)
		return nfserr;

+1 −0
Original line number Diff line number Diff line
@@ -1242,6 +1242,7 @@ static __net_init int nfsd_init_net(struct net *net)
	nn->somebody_reclaimed = false;
	nn->clverifier_counter = prandom_u32();
	nn->clientid_counter = prandom_u32();
	nn->s2s_cp_cl_id = nn->clientid_counter++;

	atomic_set(&nn->ntf_refcnt, 0);
	init_waitqueue_head(&nn->ntf_wq);
Loading