Commit 7beaf243 authored by Srinivas Eeda's avatar Srinivas Eeda Committed by Joel Becker
Browse files

ocfs2 fix o2dlm dlm run purgelist (rev 3)



This patch fixes two problems in dlm_run_purgelist

1. If a lockres is found to be in use, dlm_run_purgelist keeps trying to purge
the same lockres instead of trying the next lockres.

2. When a lockres is found unused, dlm_run_purgelist releases lockres spinlock
before setting DLM_LOCK_RES_DROPPING_REF and calls dlm_purge_lockres.
spinlock is reacquired but in this window lockres can get reused. This leads
to BUG.

This patch modifies dlm_run_purgelist to skip lockres if it's in use and purge
 next lockres. It also sets DLM_LOCK_RES_DROPPING_REF before releasing the
lockres spinlock protecting it from getting reused.

Signed-off-by: default avatarSrinivas Eeda <srinivas.eeda@oracle.com>
Acked-by: default avatarSunil Mushran <sunil.mushran@oracle.com>
Cc: stable@kernel.org
Signed-off-by: default avatarJoel Becker <joel.becker@oracle.com>
parent 6d98c3cc
Loading
Loading
Loading
Loading
+34 −46
Original line number Original line Diff line number Diff line
@@ -152,45 +152,25 @@ void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
	spin_unlock(&dlm->spinlock);
	spin_unlock(&dlm->spinlock);
}
}


static int dlm_purge_lockres(struct dlm_ctxt *dlm,
static void dlm_purge_lockres(struct dlm_ctxt *dlm,
			     struct dlm_lock_resource *res)
			     struct dlm_lock_resource *res)
{
{
	int master;
	int master;
	int ret = 0;
	int ret = 0;


	spin_lock(&res->spinlock);
	assert_spin_locked(&dlm->spinlock);
	if (!__dlm_lockres_unused(res)) {
	assert_spin_locked(&res->spinlock);
		mlog(0, "%s:%.*s: tried to purge but not unused\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
		spin_unlock(&res->spinlock);
		BUG();
	}

	if (res->state & DLM_LOCK_RES_MIGRATING) {
		mlog(0, "%s:%.*s: Delay dropref as this lockres is "
		     "being remastered\n", dlm->name, res->lockname.len,
		     res->lockname.name);
		/* Re-add the lockres to the end of the purge list */
		if (!list_empty(&res->purge)) {
			list_del_init(&res->purge);
			list_add_tail(&res->purge, &dlm->purge_list);
		}
		spin_unlock(&res->spinlock);
		return 0;
	}


	master = (res->owner == dlm->node_num);
	master = (res->owner == dlm->node_num);


	if (!master)
		res->state |= DLM_LOCK_RES_DROPPING_REF;
	spin_unlock(&res->spinlock);


	mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
	mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
	     res->lockname.name, master);
	     res->lockname.name, master);


	if (!master) {
	if (!master) {
		res->state |= DLM_LOCK_RES_DROPPING_REF;
		/* drop spinlock...  retake below */
		/* drop spinlock...  retake below */
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
		spin_unlock(&dlm->spinlock);


		spin_lock(&res->spinlock);
		spin_lock(&res->spinlock);
@@ -208,31 +188,35 @@ static int dlm_purge_lockres(struct dlm_ctxt *dlm,
		mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
		mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
		     dlm->name, res->lockname.len, res->lockname.name, ret);
		     dlm->name, res->lockname.len, res->lockname.name, ret);
		spin_lock(&dlm->spinlock);
		spin_lock(&dlm->spinlock);
		spin_lock(&res->spinlock);
	}
	}


	spin_lock(&res->spinlock);
	if (!list_empty(&res->purge)) {
	if (!list_empty(&res->purge)) {
		mlog(0, "removing lockres %.*s:%p from purgelist, "
		mlog(0, "removing lockres %.*s:%p from purgelist, "
		     "master = %d\n", res->lockname.len, res->lockname.name,
		     "master = %d\n", res->lockname.len, res->lockname.name,
		     res, master);
		     res, master);
		list_del_init(&res->purge);
		list_del_init(&res->purge);
		spin_unlock(&res->spinlock);
		dlm_lockres_put(res);
		dlm_lockres_put(res);
		dlm->purge_count--;
		dlm->purge_count--;
	} else
	}
		spin_unlock(&res->spinlock);

	if (!__dlm_lockres_unused(res)) {
		mlog(ML_ERROR, "found lockres %s:%.*s: in use after deref\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		__dlm_print_one_lock_resource(res);
		BUG();
	}


	__dlm_unhash_lockres(res);
	__dlm_unhash_lockres(res);


	/* lockres is not in the hash now.  drop the flag and wake up
	/* lockres is not in the hash now.  drop the flag and wake up
	 * any processes waiting in dlm_get_lock_resource. */
	 * any processes waiting in dlm_get_lock_resource. */
	if (!master) {
	if (!master) {
		spin_lock(&res->spinlock);
		res->state &= ~DLM_LOCK_RES_DROPPING_REF;
		res->state &= ~DLM_LOCK_RES_DROPPING_REF;
		spin_unlock(&res->spinlock);
		spin_unlock(&res->spinlock);
		wake_up(&res->wq);
		wake_up(&res->wq);
	}
	} else
	return 0;
		spin_unlock(&res->spinlock);
}
}


static void dlm_run_purge_list(struct dlm_ctxt *dlm,
static void dlm_run_purge_list(struct dlm_ctxt *dlm,
@@ -251,17 +235,7 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
		lockres = list_entry(dlm->purge_list.next,
		lockres = list_entry(dlm->purge_list.next,
				     struct dlm_lock_resource, purge);
				     struct dlm_lock_resource, purge);


		/* Status of the lockres *might* change so double
		 * check. If the lockres is unused, holding the dlm
		 * spinlock will prevent people from getting and more
		 * refs on it -- there's no need to keep the lockres
		 * spinlock. */
		spin_lock(&lockres->spinlock);
		spin_lock(&lockres->spinlock);
		unused = __dlm_lockres_unused(lockres);
		spin_unlock(&lockres->spinlock);

		if (!unused)
			continue;


		purge_jiffies = lockres->last_used +
		purge_jiffies = lockres->last_used +
			msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
			msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
@@ -273,15 +247,29 @@ static void dlm_run_purge_list(struct dlm_ctxt *dlm,
			 * in tail order, we can stop at the first
			 * in tail order, we can stop at the first
			 * unpurgable resource -- anyone added after
			 * unpurgable resource -- anyone added after
			 * him will have a greater last_used value */
			 * him will have a greater last_used value */
			spin_unlock(&lockres->spinlock);
			break;
			break;
		}
		}


		/* Status of the lockres *might* change so double
		 * check. If the lockres is unused, holding the dlm
		 * spinlock will prevent people from getting and more
		 * refs on it. */
		unused = __dlm_lockres_unused(lockres);
		if (!unused ||
		    (lockres->state & DLM_LOCK_RES_MIGRATING)) {
			mlog(0, "lockres %s:%.*s: is in use or "
			     "being remastered, used %d, state %d\n",
			     dlm->name, lockres->lockname.len,
			     lockres->lockname.name, !unused, lockres->state);
			list_move_tail(&dlm->purge_list, &lockres->purge);
			spin_unlock(&lockres->spinlock);
			continue;
		}

		dlm_lockres_get(lockres);
		dlm_lockres_get(lockres);


		/* This may drop and reacquire the dlm spinlock if it
		dlm_purge_lockres(dlm, lockres);
		 * has to do migration. */
		if (dlm_purge_lockres(dlm, lockres))
			BUG();


		dlm_lockres_put(lockres);
		dlm_lockres_put(lockres);