Commit 34c88174 authored by Uladzislau Rezki (Sony)'s avatar Uladzislau Rezki (Sony) Committed by Paul E. McKenney
Browse files

rcu: Support kfree_bulk() interface in kfree_rcu()



The kfree_rcu() logic can be improved further by using kfree_bulk()
interface along with "basic batching support" introduced earlier.

The are at least two advantages of using "bulk" interface:
- in case of large number of kfree_rcu() requests kfree_bulk()
  reduces the per-object overhead caused by calling kfree()
  per-object.

- reduces the number of cache-misses due to "pointer chasing"
  between objects which can be far spread between each other.

This approach defines a new kfree_rcu_bulk_data structure that
stores pointers in an array with a specific size. Number of entries
in that array depends on PAGE_SIZE making kfree_rcu_bulk_data
structure to be exactly one page.

Since it deals with "block-chain" technique there is an extra
need in dynamic allocation when a new block is required. Memory
is allocated with GFP_NOWAIT | __GFP_NOWARN flags, i.e. that
allows to skip direct reclaim under low memory condition to
prevent stalling and fails silently under high memory pressure.

The "emergency path" gets maintained when a system is run out of
memory. In that case objects are linked into regular list.

The "rcuperf" was run to analyze this change in terms of memory
consumption and kfree_bulk() throughput.

1) Testing on the Intel(R) Xeon(R) W-2135 CPU @ 3.70GHz, 12xCPUs
with following parameters:

kfree_loops=200000 kfree_alloc_num=1000 kfree_rcu_test=1 kfree_vary_obj_size=1
dev.2020.01.10a branch

Default / CONFIG_SLAB
53607352517 ns, loops: 200000, batches: 1885, memory footprint: 1248MB
53529637912 ns, loops: 200000, batches: 1921, memory footprint: 1193MB
53570175705 ns, loops: 200000, batches: 1929, memory footprint: 1250MB

Patch / CONFIG_SLAB
23981587315 ns, loops: 200000, batches: 810, memory footprint: 1219MB
23879375281 ns, loops: 200000, batches: 822, memory footprint: 1190MB
24086841707 ns, loops: 200000, batches: 794, memory footprint: 1380MB

Default / CONFIG_SLUB
51291025022 ns, loops: 200000, batches: 1713, memory footprint: 741MB
51278911477 ns, loops: 200000, batches: 1671, memory footprint: 719MB
51256183045 ns, loops: 200000, batches: 1719, memory footprint: 647MB

Patch / CONFIG_SLUB
50709919132 ns, loops: 200000, batches: 1618, memory footprint: 456MB
50736297452 ns, loops: 200000, batches: 1633, memory footprint: 507MB
50660403893 ns, loops: 200000, batches: 1628, memory footprint: 429MB

in case of CONFIG_SLAB there is double increase in performance and
slightly higher memory usage. As for CONFIG_SLUB, the performance
figures are better together with lower memory usage.

2) Testing on the HiKey-960, arm64, 8xCPUs with below parameters:

CONFIG_SLAB=y
kfree_loops=200000 kfree_alloc_num=1000 kfree_rcu_test=1

102898760401 ns, loops: 200000, batches: 5822, memory footprint: 158MB
89947009882  ns, loops: 200000, batches: 6715, memory footprint: 115MB

rcuperf shows approximately ~12% better throughput in case of
using "bulk" interface. The "drain logic" or its RCU callback
does the work faster that leads to better throughput.

Signed-off-by: default avatarUladzislau Rezki (Sony) <urezki@gmail.com>
Tested-by: default avatarJoel Fernandes (Google) <joel@joelfernandes.org>
Signed-off-by: default avatarPaul E. McKenney <paulmck@kernel.org>
parent bb6d3fb3
Loading
Loading
Loading
Loading
+169 −35
Original line number Diff line number Diff line
@@ -2689,22 +2689,47 @@ EXPORT_SYMBOL_GPL(call_rcu);
#define KFREE_DRAIN_JIFFIES (HZ / 50)
#define KFREE_N_BATCHES 2

/*
 * This macro defines how many entries the "records" array
 * will contain. It is based on the fact that the size of
 * kfree_rcu_bulk_data structure becomes exactly one page.
 */
#define KFREE_BULK_MAX_ENTR ((PAGE_SIZE / sizeof(void *)) - 3)

/**
 * struct kfree_rcu_bulk_data - single block to store kfree_rcu() pointers
 * @nr_records: Number of active pointers in the array
 * @records: Array of the kfree_rcu() pointers
 * @next: Next bulk object in the block chain
 * @head_free_debug: For debug, when CONFIG_DEBUG_OBJECTS_RCU_HEAD is set
 */
struct kfree_rcu_bulk_data {
	unsigned long nr_records;
	void *records[KFREE_BULK_MAX_ENTR];
	struct kfree_rcu_bulk_data *next;
	struct rcu_head *head_free_debug;
};

/**
 * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
 * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
 * @head_free: List of kfree_rcu() objects waiting for a grace period
 * @bhead_free: Bulk-List of kfree_rcu() objects waiting for a grace period
 * @krcp: Pointer to @kfree_rcu_cpu structure
 */

struct kfree_rcu_cpu_work {
	struct rcu_work rcu_work;
	struct rcu_head *head_free;
	struct kfree_rcu_bulk_data *bhead_free;
	struct kfree_rcu_cpu *krcp;
};

/**
 * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
 * @head: List of kfree_rcu() objects not yet waiting for a grace period
 * @bhead: Bulk-List of kfree_rcu() objects not yet waiting for a grace period
 * @bcached: Keeps at most one object for later reuse when build chain blocks
 * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
 * @lock: Synchronize access to this structure
 * @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES
@@ -2718,6 +2743,8 @@ struct kfree_rcu_cpu_work {
 */
struct kfree_rcu_cpu {
	struct rcu_head *head;
	struct kfree_rcu_bulk_data *bhead;
	struct kfree_rcu_bulk_data *bcached;
	struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
	spinlock_t lock;
	struct delayed_work monitor_work;
@@ -2727,14 +2754,24 @@ struct kfree_rcu_cpu {

static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);

static __always_inline void
debug_rcu_head_unqueue_bulk(struct rcu_head *head)
{
#ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
	for (; head; head = head->next)
		debug_rcu_head_unqueue(head);
#endif
}

/*
 * This function is invoked in workqueue context after a grace period.
 * It frees all the objects queued on ->head_free.
 * It frees all the objects queued on ->bhead_free or ->head_free.
 */
static void kfree_rcu_work(struct work_struct *work)
{
	unsigned long flags;
	struct rcu_head *head, *next;
	struct kfree_rcu_bulk_data *bhead, *bnext;
	struct kfree_rcu_cpu *krcp;
	struct kfree_rcu_cpu_work *krwp;

@@ -2744,22 +2781,41 @@ static void kfree_rcu_work(struct work_struct *work)
	spin_lock_irqsave(&krcp->lock, flags);
	head = krwp->head_free;
	krwp->head_free = NULL;
	bhead = krwp->bhead_free;
	krwp->bhead_free = NULL;
	spin_unlock_irqrestore(&krcp->lock, flags);

	// List "head" is now private, so traverse locklessly.
	/* "bhead" is now private, so traverse locklessly. */
	for (; bhead; bhead = bnext) {
		bnext = bhead->next;

		debug_rcu_head_unqueue_bulk(bhead->head_free_debug);

		rcu_lock_acquire(&rcu_callback_map);
		kfree_bulk(bhead->nr_records, bhead->records);
		rcu_lock_release(&rcu_callback_map);

		if (cmpxchg(&krcp->bcached, NULL, bhead))
			free_page((unsigned long) bhead);

		cond_resched_tasks_rcu_qs();
	}

	/*
	 * Emergency case only. It can happen under low memory
	 * condition when an allocation gets failed, so the "bulk"
	 * path can not be temporary maintained.
	 */
	for (; head; head = next) {
		unsigned long offset = (unsigned long)head->func;

		next = head->next;
		// Potentially optimize with kfree_bulk in future.
		debug_rcu_head_unqueue(head);
		rcu_lock_acquire(&rcu_callback_map);
		trace_rcu_invoke_kfree_callback(rcu_state.name, head, offset);

		if (!WARN_ON_ONCE(!__is_kfree_rcu_offset(offset))) {
			/* Could be optimized with kfree_bulk() in future. */
		if (!WARN_ON_ONCE(!__is_kfree_rcu_offset(offset)))
			kfree((void *)head - offset);
		}

		rcu_lock_release(&rcu_callback_map);
		cond_resched_tasks_rcu_qs();
@@ -2774,26 +2830,48 @@ static void kfree_rcu_work(struct work_struct *work)
 */
static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
{
	struct kfree_rcu_cpu_work *krwp;
	bool queued = false;
	int i;
	struct kfree_rcu_cpu_work *krwp = NULL;

	lockdep_assert_held(&krcp->lock);
	for (i = 0; i < KFREE_N_BATCHES; i++)
		if (!krcp->krw_arr[i].head_free) {

	for (i = 0; i < KFREE_N_BATCHES; i++) {
		krwp = &(krcp->krw_arr[i]);
			break;
		}

	// If a previous RCU batch is in progress, we cannot immediately
	// queue another one, so return false to tell caller to retry.
	if (!krwp)
		return false;
		/*
		 * Try to detach bhead or head and attach it over any
		 * available corresponding free channel. It can be that
		 * a previous RCU batch is in progress, it means that
		 * immediately to queue another one is not possible so
		 * return false to tell caller to retry.
		 */
		if ((krcp->bhead && !krwp->bhead_free) ||
				(krcp->head && !krwp->head_free)) {
			/* Channel 1. */
			if (!krwp->bhead_free) {
				krwp->bhead_free = krcp->bhead;
				krcp->bhead = NULL;
			}

			/* Channel 2. */
			if (!krwp->head_free) {
				krwp->head_free = krcp->head;
				krcp->head = NULL;
	INIT_RCU_WORK(&krwp->rcu_work, kfree_rcu_work);
			}

			/*
			 * One work is per one batch, so there are two "free channels",
			 * "bhead_free" and "head_free" the batch can handle. It can be
			 * that the work is in the pending state when two channels have
			 * been detached following each other, one by one.
			 */
			queue_rcu_work(system_wq, &krwp->rcu_work);
	return true;
			queued = true;
		}
	}

	return queued;
}

static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
@@ -2830,19 +2908,65 @@ static void kfree_rcu_monitor(struct work_struct *work)
		spin_unlock_irqrestore(&krcp->lock, flags);
}

static inline bool
kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
	struct rcu_head *head, rcu_callback_t func)
{
	struct kfree_rcu_bulk_data *bnode;

	if (unlikely(!krcp->initialized))
		return false;

	lockdep_assert_held(&krcp->lock);

	/* Check if a new block is required. */
	if (!krcp->bhead ||
			krcp->bhead->nr_records == KFREE_BULK_MAX_ENTR) {
		bnode = xchg(&krcp->bcached, NULL);
		if (!bnode) {
			WARN_ON_ONCE(sizeof(struct kfree_rcu_bulk_data) > PAGE_SIZE);

			bnode = (struct kfree_rcu_bulk_data *)
				__get_free_page(GFP_NOWAIT | __GFP_NOWARN);
		}

		/* Switch to emergency path. */
		if (unlikely(!bnode))
			return false;

		/* Initialize the new block. */
		bnode->nr_records = 0;
		bnode->next = krcp->bhead;
		bnode->head_free_debug = NULL;

		/* Attach it to the head. */
		krcp->bhead = bnode;
	}

#ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
	head->func = func;
	head->next = krcp->bhead->head_free_debug;
	krcp->bhead->head_free_debug = head;
#endif

	/* Finally insert. */
	krcp->bhead->records[krcp->bhead->nr_records++] =
		(void *) head - (unsigned long) func;

	return true;
}

/*
 * Queue a request for lazy invocation of kfree() after a grace period.
 * Queue a request for lazy invocation of kfree_bulk()/kfree() after a grace
 * period. Please note there are two paths are maintained, one is the main one
 * that uses kfree_bulk() interface and second one is emergency one, that is
 * used only when the main path can not be maintained temporary, due to memory
 * pressure.
 *
 * Each kfree_call_rcu() request is added to a batch. The batch will be drained
 * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch
 * will be kfree'd in workqueue context. This allows us to:
 *
 * 1.	Batch requests together to reduce the number of grace periods during
 *	heavy kfree_rcu() load.
 *
 * 2.	It makes it possible to use kfree_bulk() on a large number of
 *	kfree_rcu() requests thus reducing cache misses and the per-object
 *	overhead of kfree().
 * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch will
 * be free'd in workqueue context. This allows us to: batch requests together to
 * reduce the number of grace periods during heavy kfree_rcu() load.
 */
void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
{
@@ -2861,9 +2985,16 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
			  __func__, head);
		goto unlock_return;
	}

	/*
	 * Under high memory pressure GFP_NOWAIT can fail,
	 * in that case the emergency path is maintained.
	 */
	if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
		head->func = func;
		head->next = krcp->head;
		krcp->head = head;
	}

	// Set timer to drain after KFREE_DRAIN_JIFFIES.
	if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING &&
@@ -3769,8 +3900,11 @@ static void __init kfree_rcu_batch_init(void)
		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);

		spin_lock_init(&krcp->lock);
		for (i = 0; i < KFREE_N_BATCHES; i++)
		for (i = 0; i < KFREE_N_BATCHES; i++) {
			INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
			krcp->krw_arr[i].krcp = krcp;
		}

		INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
		krcp->initialized = true;
	}