Commit 80dd4c42 authored by Niu Yawei's avatar Niu Yawei Committed by Greg Kroah-Hartman
Browse files

staging/lustre/ptlrpc: track unreplied requests



The request xid was used to make sure that the ost object timestamps
are updated properly by out-of-order setattr/punch/write requests.
However, this mechanism is broken by the multiple rcvd slot feature,
which deferred the xid assignment from request packing to request
sending.

This patch moves the xid assignment back to request packing, and
changes the way the lowest unreplied xid is found: instead of scanning
the sending & delayed lists, we now scan a dedicated list of unreplied
requests.

This patch also skips packing the known replied XID into connect and
disconnect requests, so that we can make sure the known replied XID
only ever increases, on both the server and the client side.

Signed-off-by: default avatarNiu Yawei <yawei.niu@intel.com>
Reviewed-on: http://review.whamcloud.com/16759
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-5951


Reviewed-by: default avatarGregoire Pichon <gregoire.pichon@bull.net>
Reviewed-by: default avatarAlex Zhuravlev <alexey.zhuravlev@intel.com>
Signed-off-by: default avatarOleg Drokin <green@linuxhacker.ru>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent f5f4c80e
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -185,6 +185,11 @@ struct obd_import {
	struct list_head	       *imp_replay_cursor;
	/** @} */

	/** List of not replied requests */
	struct list_head	imp_unreplied_list;
	/** Known maximal replied XID */
	__u64			imp_known_replied_xid;

	/** obd device for this import */
	struct obd_device	*imp_obd;

+3 −0
Original line number Diff line number Diff line
@@ -596,6 +596,8 @@ struct ptlrpc_cli_req {
	union ptlrpc_async_args		 cr_async_args;
	/** Opaq data for replay and commit callbacks. */
	void				*cr_cb_data;
	/** Link to the imp->imp_unreplied_list */
	struct list_head		 cr_unreplied_list;
	/**
	 * Commit callback, called when request is committed and about to be
	 * freed.
@@ -635,6 +637,7 @@ struct ptlrpc_cli_req {
#define rq_interpret_reply	rq_cli.cr_reply_interp
#define rq_async_args		rq_cli.cr_async_args
#define rq_cb_data		rq_cli.cr_cb_data
#define rq_unreplied_list	rq_cli.cr_unreplied_list
#define rq_commit_cb		rq_cli.cr_commit_cb
#define rq_replay_cb		rq_cli.cr_replay_cb

+2 −0
Original line number Diff line number Diff line
@@ -907,6 +907,8 @@ struct obd_import *class_new_import(struct obd_device *obd)
	INIT_LIST_HEAD(&imp->imp_sending_list);
	INIT_LIST_HEAD(&imp->imp_delayed_list);
	INIT_LIST_HEAD(&imp->imp_committed_list);
	INIT_LIST_HEAD(&imp->imp_unreplied_list);
	imp->imp_known_replied_xid = 0;
	imp->imp_replay_cursor = &imp->imp_committed_list;
	spin_lock_init(&imp->imp_lock);
	imp->imp_last_success_conn = 0;
+81 −31
Original line number Diff line number Diff line
@@ -652,6 +652,42 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
	spin_unlock(&pool->prp_lock);
}

/**
 * Insert \a req into its import's unreplied-requests list, keeping the
 * list sorted by XID in ascending order.
 *
 * Caller must hold \a imp->imp_lock, and \a req must not already be
 * linked on the list.
 */
void ptlrpc_add_unreplied(struct ptlrpc_request *req)
{
	struct obd_import	*imp = req->rq_import;
	struct list_head	*tmp;
	struct ptlrpc_request	*iter;

	assert_spin_locked(&imp->imp_lock);
	LASSERT(list_empty(&req->rq_unreplied_list));

	/* unreplied list is sorted by xid in ascending order */
	list_for_each_prev(tmp, &imp->imp_unreplied_list) {
		iter = list_entry(tmp, struct ptlrpc_request,
				  rq_unreplied_list);

		/* duplicate XIDs on one import would be a bug */
		LASSERT(req->rq_xid != iter->rq_xid);
		if (req->rq_xid < iter->rq_xid)
			continue;
		/* scanning tail-to-head: first entry with a smaller xid
		 * found, insert req right after it
		 */
		list_add(&req->rq_unreplied_list, &iter->rq_unreplied_list);
		return;
	}
	/* no entry with a smaller xid: req becomes the new list head */
	list_add(&req->rq_unreplied_list, &imp->imp_unreplied_list);
}

/**
 * Assign the next transfer XID to \a req and link it on the import's
 * unreplied list in a single step, so that the lowest unreplied XID is
 * always known.  Caller must hold req->rq_import->imp_lock.
 */
void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req)
{
	req->rq_xid = ptlrpc_next_xid();
	ptlrpc_add_unreplied(req);
}

static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
{
	spin_lock(&req->rq_import->imp_lock);
	ptlrpc_assign_next_xid_nolock(req);
	spin_unlock(&req->rq_import->imp_lock);
}

int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
			     __u32 version, int opcode, char **bufs,
			     struct ptlrpc_cli_ctx *ctx)
@@ -701,6 +737,7 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
	ptlrpc_at_set_req_timeout(request);

	lustre_msg_set_opc(request->rq_reqmsg, opcode);
	ptlrpc_assign_next_xid(request);

	/* Let's setup deadline for req/reply/bulk unlink for opcode. */
	if (cfs_fail_val == opcode) {
@@ -1230,6 +1267,24 @@ static void ptlrpc_save_versions(struct ptlrpc_request *req)
	       versions[0], versions[1]);
}

/**
 * Return the highest XID known to have been replied to on \a imp: one
 * less than the lowest XID still on the unreplied list (which is kept
 * sorted ascending), or 0 if nothing is outstanding.  The cached
 * imp_known_replied_xid is bumped if the computed value is larger.
 *
 * Caller must hold imp->imp_lock.
 */
__u64 ptlrpc_known_replied_xid(struct obd_import *imp)
{
	struct ptlrpc_request *first;
	__u64 known;

	assert_spin_locked(&imp->imp_lock);

	if (list_empty(&imp->imp_unreplied_list))
		return 0;

	/* head of the list carries the lowest unreplied XID */
	first = list_entry(imp->imp_unreplied_list.next,
			   struct ptlrpc_request, rq_unreplied_list);
	LASSERTF(first->rq_xid >= 1, "XID:%llu\n", first->rq_xid);

	known = first->rq_xid - 1;
	if (imp->imp_known_replied_xid < known)
		imp->imp_known_replied_xid = known;

	return known;
}

/**
 * Callback function called when client receives RPC reply for \a req.
 * Returns 0 on success or error code.
@@ -1317,6 +1372,11 @@ static int after_reply(struct ptlrpc_request *req)
		else
			req->rq_sent = now + req->rq_nr_resend;

		/* Resend for EINPROGRESS will use a new XID */
		spin_lock(&imp->imp_lock);
		list_del_init(&req->rq_unreplied_list);
		spin_unlock(&imp->imp_lock);

		return 0;
	}

@@ -1430,8 +1490,7 @@ static int after_reply(struct ptlrpc_request *req)
static int ptlrpc_send_new_req(struct ptlrpc_request *req)
{
	struct obd_import *imp = req->rq_import;
	struct list_head *tmp;
	u64 min_xid = ~0ULL;
	u64 min_xid = 0;
	int rc;

	LASSERT(req->rq_phase == RQ_PHASE_NEW);
@@ -1451,17 +1510,8 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)

	spin_lock(&imp->imp_lock);

	/*
	 * the very first time we assign XID. it's important to assign XID
	 * and put it on the list atomically, so that the lowest assigned
	 * XID is always known. this is vital for multislot last_rcvd
	 */
	if (req->rq_send_state == LUSTRE_IMP_REPLAY) {
	LASSERT(req->rq_xid);
	} else {
		LASSERT(!req->rq_xid);
		req->rq_xid = ptlrpc_next_xid();
	}
	LASSERT(!list_empty(&req->rq_unreplied_list));

	if (!req->rq_generation_set)
		req->rq_import_generation = imp->imp_generation;
@@ -1493,25 +1543,23 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
	list_add_tail(&req->rq_list, &imp->imp_sending_list);
	atomic_inc(&req->rq_import->imp_inflight);

	/* find the lowest unreplied XID */
	list_for_each(tmp, &imp->imp_delayed_list) {
		struct ptlrpc_request *r;

		r = list_entry(tmp, struct ptlrpc_request, rq_list);
		if (r->rq_xid < min_xid)
			min_xid = r->rq_xid;
	}
	list_for_each(tmp, &imp->imp_sending_list) {
		struct ptlrpc_request *r;

		r = list_entry(tmp, struct ptlrpc_request, rq_list);
		if (r->rq_xid < min_xid)
			min_xid = r->rq_xid;
	}
	/* find the known replied XID from the unreplied list, CONNECT
	 * and DISCONNECT requests are skipped to make the sanity check
	 * on server side happy. see process_req_last_xid().
	 *
	 * For CONNECT: Because replay requests have lower XID, it'll
	 * break the sanity check if CONNECT bump the exp_last_xid on
	 * server.
	 *
	 * For DISCONNECT: Since client will abort inflight RPC before
	 * sending DISCONNECT, DISCONNECT may carry an XID which higher
	 * than the inflight RPC.
	 */
	if (!ptlrpc_req_is_connect(req) && !ptlrpc_req_is_disconnect(req))
		min_xid = ptlrpc_known_replied_xid(imp);
	spin_unlock(&imp->imp_lock);

	if (likely(min_xid != ~0ULL))
		lustre_msg_set_last_xid(req->rq_reqmsg, min_xid - 1);
	lustre_msg_set_last_xid(req->rq_reqmsg, min_xid);

	lustre_msg_set_status(req->rq_reqmsg, current_pid());

@@ -1956,6 +2004,7 @@ interpret:
			list_del_init(&req->rq_list);
			atomic_dec(&imp->imp_inflight);
		}
		list_del_init(&req->rq_unreplied_list);
		spin_unlock(&imp->imp_lock);

		atomic_dec(&set->set_remaining);
@@ -2353,6 +2402,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
		if (!locked)
			spin_lock(&request->rq_import->imp_lock);
		list_del_init(&request->rq_replay_list);
		list_del_init(&request->rq_unreplied_list);
		if (!locked)
			spin_unlock(&request->rq_import->imp_lock);
	}
@@ -3060,7 +3110,7 @@ void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)

	LASSERT(bd);

	if (!req->rq_resend || req->rq_nr_resend) {
	if (!req->rq_resend) {
		/* this request has a new xid, just use it as bulk matchbits */
		req->rq_mbits = req->rq_xid;

+34 −0
Original line number Diff line number Diff line
@@ -903,6 +903,39 @@ static int ptlrpc_connect_set_flags(struct obd_import *imp,
	return 0;
}

/**
 * Add all replay requests back to the unreplied list before replay
 * starts, so that the known replied XID only ever increases, even
 * while requests are being replayed.
 */
static void ptlrpc_prepare_replay(struct obd_import *imp)
{
	struct ptlrpc_request *req;

	/* only applies when the import is entering REPLAY state and
	 * this is not a resend of a replay
	 */
	if (imp->imp_state != LUSTRE_IMP_REPLAY ||
	    imp->imp_resend_replay)
		return;

	/*
	 * If the server was restarted during replay, the requests may
	 * have been added to the unreplied list in a former replay.
	 */
	spin_lock(&imp->imp_lock);

	list_for_each_entry(req, &imp->imp_committed_list, rq_replay_list) {
		if (list_empty(&req->rq_unreplied_list))
			ptlrpc_add_unreplied(req);
	}

	list_for_each_entry(req, &imp->imp_replay_list, rq_replay_list) {
		if (list_empty(&req->rq_unreplied_list))
			ptlrpc_add_unreplied(req);
	}

	/* refresh the cached known-replied XID from the rebuilt list */
	imp->imp_known_replied_xid = ptlrpc_known_replied_xid(imp);
	spin_unlock(&imp->imp_lock);
}

/**
 * interpret_reply callback for connect RPCs.
 * Looks into returned status of connect operation and decides
@@ -1154,6 +1187,7 @@ static int ptlrpc_connect_interpret(const struct lu_env *env,
	}

finish:
	ptlrpc_prepare_replay(imp);
	rc = ptlrpc_import_recovery_state_machine(imp);
	if (rc == -ENOTCONN) {
		CDEBUG(D_HA, "evicted/aborted by %s@%s during recovery; invalidating and reconnecting\n",
Loading