Commit e4b10328 authored by Tom Burdick's avatar Tom Burdick Committed by Stephanos Ioannidis
Browse files

rtio: Use mpsc for submission and completion queue



Rather than the rings, which weren't shared between userspace and kernel
space in Zephyr like they are in Linux with io_uring, use atomic mpsc
queues for submission and completion queues.

Most importantly this removes a potential head of line blocker in the
submission queue as the sqe would be held until a task is completed.

As additional bonuses this avoids some additional locks and restrictions
about what can be submitted and where. It also removes the need for
two executors as all chains/transactions are done concurrently.

Lastly this opens up the possibility for a common pool of sqe's to
allocate from potentially saving lots of memory.

Signed-off-by: default avatarTom Burdick <thomas.burdick@intel.com>
parent 92f0b549
Loading
Loading
Loading
Loading
+37 −31
Original line number Diff line number Diff line
@@ -21,7 +21,6 @@ LOG_MODULE_REGISTER(spi_sam);
#include <zephyr/drivers/pinctrl.h>
#include <zephyr/drivers/clock_control/atmel_sam_pmc.h>
#include <zephyr/rtio/rtio.h>
#include <zephyr/rtio/rtio_executor_simple.h>
#include <zephyr/sys/__assert.h>
#include <zephyr/sys/util.h>
#include <soc.h>
@@ -55,8 +54,8 @@ struct spi_sam_data {
#ifdef CONFIG_SPI_RTIO
	struct rtio *r; /* context for thread calls */
	struct rtio_iodev iodev;
	struct rtio_iodev_sqe *iodev_sqe;
	struct rtio_sqe *sqe;
	struct rtio_iodev_sqe *txn_head;
	struct rtio_iodev_sqe *txn_curr;
	struct spi_dt_spec dt_spec;
#endif

@@ -304,7 +303,7 @@ static void dma_callback(const struct device *dma_dev, void *user_data,
	struct spi_sam_data *drv_data = dev->data;

#ifdef CONFIG_SPI_RTIO
	if (drv_data->iodev_sqe != NULL) {
	if (drv_data->txn_head != NULL) {
		spi_sam_iodev_complete(dev, status);
		return;
	}
@@ -323,7 +322,7 @@ static int spi_sam_dma_txrx(const struct device *dev,
	const struct spi_sam_config *drv_cfg = dev->config;
	struct spi_sam_data *drv_data = dev->data;
#ifdef CONFIG_SPI_RTIO
	bool blocking = drv_data->iodev_sqe == NULL;
	bool blocking = drv_data->txn_head == NULL;
#else
	bool blocking = true;
#endif
@@ -648,12 +647,13 @@ static bool spi_sam_is_regular(const struct spi_buf_set *tx_bufs,
#else

static void spi_sam_iodev_complete(const struct device *dev, int status);
static void spi_sam_iodev_next(const struct device *dev, bool completion);

static void spi_sam_iodev_start(const struct device *dev)
{
	const struct spi_sam_config *cfg = dev->config;
	struct spi_sam_data *data = dev->data;
	struct rtio_sqe *sqe = data->sqe;
	struct rtio_sqe *sqe = &data->txn_curr->sqe;
	int ret = 0;

	switch (sqe->op) {
@@ -671,9 +671,10 @@ static void spi_sam_iodev_start(const struct device *dev)
		break;
	default:
		LOG_ERR("Invalid op code %d for submission %p\n", sqe->op, (void *)sqe);
		rtio_iodev_sqe_err(data->iodev_sqe, -EINVAL);
		data->iodev_sqe = NULL;
		data->sqe = NULL;
		struct rtio_iodev_sqe *txn_head = data->txn_head;

		spi_sam_iodev_next(dev, true);
		rtio_iodev_sqe_err(txn_head, -EINVAL);
		ret = 0;
	}
	if (ret == 0) {
@@ -687,7 +688,7 @@ static void spi_sam_iodev_next(const struct device *dev, bool completion)

	k_spinlock_key_t key  = spi_spin_lock(dev);

	if (!completion && data->iodev_sqe != NULL) {
	if (!completion && data->txn_curr != NULL) {
		spi_spin_unlock(dev, key);
		return;
	}
@@ -697,17 +698,17 @@ static void spi_sam_iodev_next(const struct device *dev, bool completion)
	if (next != NULL) {
		struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q);

		data->iodev_sqe = next_sqe;
		data->sqe = (struct rtio_sqe *)next_sqe->sqe;
		data->txn_head = next_sqe;
		data->txn_curr = next_sqe;
	} else {
		data->iodev_sqe = NULL;
		data->sqe = NULL;
		data->txn_head = NULL;
		data->txn_curr = NULL;
	}

	spi_spin_unlock(dev, key);

	if (data->iodev_sqe != NULL) {
		struct spi_dt_spec *spi_dt_spec = data->sqe->iodev->data;
	if (data->txn_curr != NULL) {
		struct spi_dt_spec *spi_dt_spec = data->txn_curr->sqe.iodev->data;
		struct spi_config *spi_cfg = &spi_dt_spec->config;

		spi_sam_configure(dev, spi_cfg);
@@ -720,15 +721,15 @@ static void spi_sam_iodev_complete(const struct device *dev, int status)
{
	struct spi_sam_data *data = dev->data;

	if (data->sqe->flags & RTIO_SQE_TRANSACTION) {
		data->sqe = rtio_spsc_next(data->iodev_sqe->r->sq, data->sqe);
	if (data->txn_curr->sqe.flags & RTIO_SQE_TRANSACTION) {
		data->txn_curr = rtio_txn_next(data->txn_curr);
		spi_sam_iodev_start(dev);
	} else {
		struct rtio_iodev_sqe *iodev_sqe = data->iodev_sqe;
		struct rtio_iodev_sqe *txn_head = data->txn_head;

		spi_context_cs_control(&data->ctx, false);
		spi_sam_iodev_next(dev, true);
		rtio_iodev_sqe_ok(iodev_sqe, status);
		rtio_iodev_sqe_ok(txn_head, status);
	}
}

@@ -760,20 +761,27 @@ static int spi_sam_transceive(const struct device *dev,

	dt_spec->config = *config;

	sqe = spi_rtio_copy(data->r, &data->iodev, tx_bufs, rx_bufs);
	if (sqe == NULL) {
		err = -ENOMEM;
	int ret = spi_rtio_copy(data->r, &data->iodev, tx_bufs, rx_bufs, &sqe);

	if (ret < 0) {
		err = ret;
		goto done;
	}

	/* Submit request and wait */
	rtio_submit(data->r, 1);
	rtio_submit(data->r, ret);

	while (ret > 0) {
		cqe = rtio_cqe_consume(data->r);

		if (cqe->result < 0) {
			err = cqe->result;
		}

		rtio_cqe_release(data->r, cqe);

	rtio_cqe_release(data->r);
		ret--;
	}
#else
	const struct spi_sam_config *cfg = dev->config;

@@ -905,10 +913,8 @@ static const struct spi_driver_api spi_sam_driver_api = {
		COND_CODE_1(SPI_SAM_USE_DMA(n), (SPI_DMA_INIT(n)), ())				\
	}

#define SPI_SAM_RTIO_DEFINE(n)									\
	RTIO_EXECUTOR_SIMPLE_DEFINE(spi_sam_exec_##n);						\
	RTIO_DEFINE(spi_sam_rtio_##n, (struct rtio_executor *)&spi_sam_exec_##n,		\
		    CONFIG_SPI_SAM_RTIO_SQ_SIZE, 1)
#define SPI_SAM_RTIO_DEFINE(n) RTIO_DEFINE(spi_sam_rtio_##n, CONFIG_SPI_SAM_RTIO_SQ_SIZE,	\
					   CONFIG_SPI_SAM_RTIO_SQ_SIZE)

#define SPI_SAM_DEVICE_INIT(n)									\
	PINCTRL_DT_INST_DEFINE(n);								\
+24 −14
Original line number Diff line number Diff line
@@ -904,7 +904,7 @@ __deprecated static inline int spi_write_async(const struct device *dev,
 */
static inline void spi_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
{
	const struct spi_dt_spec *dt_spec = iodev_sqe->sqe->iodev->data;
	const struct spi_dt_spec *dt_spec = iodev_sqe->sqe.iodev->data;
	const struct device *dev = dt_spec->bus;
	const struct spi_driver_api *api = (const struct spi_driver_api *)dev->api;

@@ -944,19 +944,22 @@ static inline bool spi_is_ready_iodev(const struct rtio_iodev *spi_iodev)
/**
 * @brief Copy the tx_bufs and rx_bufs into a set of RTIO requests
 *
 * @param r RTIO context
 * @param tx_bufs Transmit buffer set
 * @param rx_bufs Receive buffer set
 * @param r rtio context
 * @param iodev iodev to transceive with
 * @param tx_bufs transmit buffer set
 * @param rx_bufs receive buffer set
 * @param sqe[out] Last sqe submitted, NULL if not enough memory
 *
 * @retval sqe Last submission in the queue added
 * @retval NULL Not enough memory in the context to copy the requests
 * @retval Number of submission queue entries
 * @retval -ENOMEM out of memory
 */
static inline struct rtio_sqe *spi_rtio_copy(struct rtio *r,
static inline int spi_rtio_copy(struct rtio *r,
				struct rtio_iodev *iodev,
				const struct spi_buf_set *tx_bufs,
					     const struct spi_buf_set *rx_bufs)
				const struct spi_buf_set *rx_bufs,
				struct rtio_sqe **last_sqe)
{
	struct rtio_sqe *sqe = NULL;
	int ret = 0;
	size_t tx_count = tx_bufs ? tx_bufs->count : 0;
	size_t rx_count = rx_bufs ? rx_bufs->count : 0;

@@ -964,6 +967,8 @@ static inline struct rtio_sqe *spi_rtio_copy(struct rtio *r,
	uint32_t rx = 0, rx_len = 0;
	uint8_t *tx_buf, *rx_buf;

	struct rtio_sqe *sqe = NULL;

	if (tx < tx_count) {
		tx_buf = tx_bufs->buffers[tx].buf;
		tx_len = tx_bufs->buffers[tx].len;
@@ -985,10 +990,13 @@ static inline struct rtio_sqe *spi_rtio_copy(struct rtio *r,
		sqe = rtio_sqe_acquire(r);

		if (sqe == NULL) {
			rtio_spsc_drop_all(r->sq);
			return NULL;
			ret = -ENOMEM;
			rtio_sqe_drop_all(r);
			goto out;
		}

		ret++;

		/* If tx/rx len are same, we can do a simple transceive */
		if (tx_len == rx_len) {
			if (tx_buf == NULL) {
@@ -1084,9 +1092,11 @@ static inline struct rtio_sqe *spi_rtio_copy(struct rtio *r,

	if (sqe != NULL) {
		sqe->flags = 0;
		*last_sqe = sqe;
	}

	return sqe;
out:
	return ret;
}

#endif /* CONFIG_SPI_RTIO */
+2 −0
Original line number Diff line number Diff line
@@ -115,6 +115,8 @@
#if defined(CONFIG_RTIO)
	ITERABLE_SECTION_RAM(rtio, 4)
	ITERABLE_SECTION_RAM(rtio_iodev, 4)
	ITERABLE_SECTION_RAM(rtio_sqe_pool, 4)
	ITERABLE_SECTION_RAM(rtio_cqe_pool, 4)
#endif /* CONFIG_RTIO */


+210 −215

File changed.

Preview size limit exceeded, changes collapsed.

+0 −120
Original line number Diff line number Diff line
/*
 * Copyright (c) 2022 Intel Corporation.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
#ifndef ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_CONCURRENT_H_
#define ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_CONCURRENT_H_

#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @brief RTIO Concurrent Executor
 *
 * Provides a concurrent executor with a pointer overhead per task and a
 * 2 word overhead over the simple executor to know the order of tasks (fifo).
 *
 * @defgroup rtio_executor_concurrent RTIO concurrent Executor
 * @ingroup rtio
 * @{
 */


/**
 * @brief Submit to the concurrent executor
 *
 * @param r RTIO context to submit
 *
 * @retval 0 always succeeds
 */
int rtio_concurrent_submit(struct rtio *r);

/**
 * @brief Report a SQE has completed successfully
 *
 * @param sqe RTIO IODev SQE to report success
 * @param result Result of the SQE
 */
void rtio_concurrent_ok(struct rtio_iodev_sqe *sqe, int result);

/**
 * @brief Report a SQE has completed with error
 *
 * @param sqe RTIO IODev SQE to report success
 * @param result Result of the SQE
 */
void rtio_concurrent_err(struct rtio_iodev_sqe *sqe, int result);

/**
 * @brief Concurrent Executor
 *
 * Notably all values are effectively owned by each task with the exception
 * of task_in and task_out.
 */
struct rtio_concurrent_executor {
	struct rtio_executor ctx;

	/* Lock around the queues */
	struct k_spinlock lock;

	/* Task ring position and count */
	uint16_t task_in, task_out, task_mask;

	/* First pending sqe to start when a task becomes available */
	struct rtio_sqe *last_sqe;

	/* Array of task statuses */
	uint8_t *task_status;

	/* Array of struct rtio_iodev_sqe *'s one per task' */
	struct rtio_iodev_sqe *task_cur;
};

/**
 * @cond INTERNAL_HIDDEN
 */
static const struct rtio_executor_api z_rtio_concurrent_api = {
	.submit = rtio_concurrent_submit,
	.ok = rtio_concurrent_ok,
	.err = rtio_concurrent_err
};

/**
 * @endcond INTERNAL_HIDDEN
 */


/**
 * @brief Statically define and initialie a concurrent executor
 *
 * @param name Symbol name, must be unique in the context in which its used
 * @param concurrency Allowed concurrency (number of concurrent tasks).
 */
#define RTIO_EXECUTOR_CONCURRENT_DEFINE(name, concurrency)                                         \
	static struct rtio_iodev_sqe _task_cur_##name[(concurrency)];                              \
	uint8_t _task_status_##name[(concurrency)];                                                \
	static struct rtio_concurrent_executor name = {                                            \
		.ctx = { .api = &z_rtio_concurrent_api },                                          \
		.task_in = 0,                                                                      \
		.task_out = 0,                                                                     \
		.task_mask = (concurrency)-1,                                                      \
		.last_sqe = NULL,                                                                  \
		.task_status = _task_status_##name,                                                \
		.task_cur = _task_cur_##name,                                                      \
	};

/**
 * @}
 */

#ifdef __cplusplus
}
#endif


#endif /* ZEPHYR_INCLUDE_RTIO_RTIO_EXECUTOR_CONCURRENT_H_ */
Loading