Commit d41c182e authored by Tom Burdick's avatar Tom Burdick Committed by Stephanos Ioannidis
Browse files

rtio: Make MPSC faster on non-smp systems



On non-smp systems where multiple cores aren't in play atomics aren't
really necessary and volatile can be used in stead.

Additionally marks the push function as ALWAYS_INLINE as I saw at times
it was not being inlined.

MPSC operation speed is crucial to the performance of rtio, these changes
provided a 30% throughput improvmement in the throughput test.

Signed-off-by: default avatarTom Burdick <thomas.burdick@intel.com>
parent fc32f1c0
Loading
Loading
Loading
Loading
+47 −15
Original line number Diff line number Diff line
@@ -5,12 +5,12 @@
 * SPDX-License-Identifier: Apache-2.0
 */


#ifndef ZEPHYR_RTIO_MPSC_H_
#define ZEPHYR_RTIO_MPSC_H_

#include <stdint.h>
#include <stdbool.h>
#include <zephyr/toolchain/common.h>
#include <zephyr/sys/atomic.h>
#include <zephyr/kernel.h>

@@ -25,6 +25,40 @@ extern "C" {
 * @{
 */

/*
 * On single core systems atomics are unnecessary
 * and cause a lot of unnecessary cache invalidation
 *
 * Using volatile to at least ensure memory is read/written
 * by the compiler generated op codes is enough.
 *
 * On SMP atomics *must* be used to ensure the pointers
 * are updated in the correct order and the values are
 * updated core caches correctly.
 */
#if defined(CONFIG_SMP)

typedef atomic_ptr_t mpsc_ptr_t;

#define mpsc_ptr_get(ptr)	   atomic_ptr_get(&(ptr))
#define mpsc_ptr_set(ptr, val)	   atomic_ptr_set(&(ptr), val)
#define mpsc_ptr_set_get(ptr, val) atomic_ptr_set(&(ptr), val)

#else

typedef struct rtio_mpsc_node *mpsc_ptr_t;

#define mpsc_ptr_get(ptr)      ptr
#define mpsc_ptr_set(ptr, val) ptr = val
#define mpsc_ptr_set_get(ptr, val)                                                                 \
	({                                                                                         \
		mpsc_ptr_t tmp = ptr;                                                              \
		ptr = val;                                                                         \
		tmp;                                                                               \
	})

#endif

/**
 * @file rtio_mpsc.h
 *
@@ -45,19 +79,18 @@ extern "C" {
 * @brief Queue member
 */
struct rtio_mpsc_node {
	atomic_ptr_t next;
	mpsc_ptr_t next;
};

/**
 * @brief MPSC Queue
 */
struct rtio_mpsc {
	atomic_ptr_t head;
	mpsc_ptr_t head;
	struct rtio_mpsc_node *tail;
	struct rtio_mpsc_node stub;
};


/**
 * @brief Static initializer for a mpsc queue
 *
@@ -81,9 +114,9 @@ struct rtio_mpsc {
 */
static inline void rtio_mpsc_init(struct rtio_mpsc *q)
{
	atomic_ptr_set(&q->head, &q->stub);
	mpsc_ptr_set(q->head, &q->stub);
	q->tail = &q->stub;
	atomic_ptr_set(&q->stub.next, NULL);
	mpsc_ptr_set(q->stub.next, NULL);
}

/**
@@ -92,16 +125,16 @@ static inline void rtio_mpsc_init(struct rtio_mpsc *q)
 * @param q Queue to push the node to
 * @param n Node to push into the queue
 */
static inline void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n)
static ALWAYS_INLINE void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n)
{
	struct rtio_mpsc_node *prev;
	int key;

	atomic_ptr_set(&n->next, NULL);
	mpsc_ptr_set(n->next, NULL);

	key = arch_irq_lock();
	prev = (struct rtio_mpsc_node *)atomic_ptr_set(&q->head, n);
	atomic_ptr_set(&prev->next, n);
	prev = (struct rtio_mpsc_node *)mpsc_ptr_set_get(q->head, n);
	mpsc_ptr_set(prev->next, n);
	arch_irq_unlock(key);
}

@@ -115,7 +148,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
{
	struct rtio_mpsc_node *head;
	struct rtio_mpsc_node *tail = q->tail;
	struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next);
	struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next);

	/* Skip over the stub/sentinel */
	if (tail == &q->stub) {
@@ -125,7 +158,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)

		q->tail = next;
		tail = next;
		next = (struct rtio_mpsc_node *)atomic_ptr_get(&next->next);
		next = (struct rtio_mpsc_node *)mpsc_ptr_get(next->next);
	}

	/* If next is non-NULL then a valid node is found, return it */
@@ -134,7 +167,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
		return tail;
	}

	head = (struct rtio_mpsc_node *)atomic_ptr_get(&q->head);
	head = (struct rtio_mpsc_node *)mpsc_ptr_get(q->head);

	/* If next is NULL, and the tail != HEAD then the queue has pending
	 * updates that can't yet be accessed.
@@ -145,7 +178,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)

	rtio_mpsc_push(q, &q->stub);

	next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next);
	next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next);

	if (next != NULL) {
		q->tail = next;
@@ -163,5 +196,4 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
}
#endif


#endif /* ZEPHYR_RTIO_MPSC_H_ */
+7 −6
Original line number Diff line number Diff line
@@ -26,14 +26,15 @@ static struct rtio_mpsc_node push_pop_nodes[2];
ZTEST(rtio_mpsc, test_push_pop)
{

	struct rtio_mpsc_node *node, *head, *stub, *next, *tail;
	mpsc_ptr_t node, head;
	struct rtio_mpsc_node *stub, *next, *tail;

	rtio_mpsc_init(&push_pop_q);

	head = atomic_ptr_get(&push_pop_q.head);
	head = mpsc_ptr_get(push_pop_q.head);
	tail = push_pop_q.tail;
	stub = &push_pop_q.stub;
	next = atomic_ptr_get(&stub->next);
	next = stub->next;

	zassert_equal(head, stub, "Head should point at stub");
	zassert_equal(tail, stub, "Tail should point at stub");
@@ -44,12 +45,12 @@ ZTEST(rtio_mpsc, test_push_pop)

	rtio_mpsc_push(&push_pop_q, &push_pop_nodes[0]);

	head = atomic_ptr_get(&push_pop_q.head);
	head = mpsc_ptr_get(push_pop_q.head);

	zassert_equal(head, &push_pop_nodes[0], "Queue head should point at push_pop_node");
	next = atomic_ptr_get(&push_pop_nodes[0].next);
	next = mpsc_ptr_get(push_pop_nodes[0].next);
	zassert_is_null(next, NULL, "push_pop_node next should point at null");
	next = atomic_ptr_get(&push_pop_q.stub.next);
	next = mpsc_ptr_get(push_pop_q.stub.next);
	zassert_equal(next, &push_pop_nodes[0], "Queue stub should point at push_pop_node");
	tail = push_pop_q.tail;
	stub = &push_pop_q.stub;