Commit e3d877f8 authored by Tom Burdick's avatar Tom Burdick Committed by Carles Cufi
Browse files

rtio: Userspace support



Add support for userspace with RTIO by making rtio and rtio_iodev
k_objects. As well as adding three syscalls for copying in submissions,
copying out completions, and starting tasks with submit.

For the small devices Zephyr typically runs on one of the most important
attributes tends to be low memory usage. To maintain the low footprint of
RTIO and its current executor implementations the rings are not shared with
userspace. Sharing the rings it turns out would require copying submissions
before working with them to avoid TOCTOU issues.

The API could still support shared rings in the future so that a
kernel thread could directly poll, copy, verify, and start the submitted
work. This would require a third executor implementation that maintains its
own copy of submissions similiar to how io_uring in Linux works.

Signed-off-by: default avatarTom Burdick <thomas.burdick@intel.com>
parent 21da8c27
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -97,6 +97,11 @@ if(CONFIG_USB_DEVICE_BOS)
  )
endif()

if(CONFIG_RTIO)
  zephyr_iterable_section(NAME rtio GROUP DATA_REGION ${XIP_ALIGN_WITH_INPUT} SUBALIGN 4)
  zephyr_iterable_section(NAME rtio_iodev GROUP DATA_REGION ${XIP_ALIGN_WITH_INPUT} SUBALIGN 4)
endif()

#if(CONFIG_USERSPACE)
#	_static_kernel_objects_end = .;
#endif()
+15 −0
Original line number Diff line number Diff line
@@ -142,6 +142,21 @@
	} GROUP_DATA_LINK_IN(RAMABLE_REGION, ROMABLE_REGION)
#endif /* CONFIG_USB_DEVICE_BOS */


#if defined(CONFIG_RTIO)
	SECTION_DATA_PROLOGUE(rtio,,SUBALIGN(4))
	{
		__rtio_start = .;
		*(".rtio.*")
		KEEP(*(SORT_BY_NAME(".rtio.*")))
		__rtio_end = .;
	} GROUP_DATA_LINK_IN(RAMABLE_REGION, ROMABLE_REGION)

	ITERABLE_SECTION_RAM(rtio, 4)
	ITERABLE_SECTION_RAM(rtio_iodev, 4)
#endif /* CONFIG_RTIO */


#ifdef CONFIG_USERSPACE
	_static_kernel_objects_end = .;
#endif
+248 −79
Original line number Diff line number Diff line
@@ -113,7 +113,7 @@ struct rtio_sqe {

	uint16_t flags; /**< Op Flags */

	struct rtio_iodev *iodev; /**< Device to operation on */
	const struct rtio_iodev *iodev; /**< Device to operation on */

	/**
	 * User provided pointer to data which is returned upon operation
@@ -302,10 +302,6 @@ struct rtio_iodev_sq {

/**
 * @brief An IO device with a function table for submitting requests
 *
 * This is required to be the first member of every iodev. There's a strong
 * possibility this will be extended with some common data fields (statistics)
 * in the future.
 */
struct rtio_iodev {
	/* Function pointer table */
@@ -313,8 +309,10 @@ struct rtio_iodev {

	/* Queue of RTIO contexts with requests */
	struct rtio_iodev_sq *iodev_sq;
};

	/* Data associated with this iodev */
	void *data;
};

/** An operation that does nothing and will complete immediately */
#define RTIO_OP_NOP 0
@@ -329,7 +327,7 @@ struct rtio_iodev {
 * @brief Prepare a nop (no op) submission
 */
static inline void rtio_sqe_prep_nop(struct rtio_sqe *sqe,
				struct rtio_iodev *iodev,
				const struct rtio_iodev *iodev,
				void *userdata)
{
	sqe->op = RTIO_OP_NOP;
@@ -341,7 +339,7 @@ static inline void rtio_sqe_prep_nop(struct rtio_sqe *sqe,
 * @brief Prepare a read op submission
 */
static inline void rtio_sqe_prep_read(struct rtio_sqe *sqe,
				      struct rtio_iodev *iodev,
				      const struct rtio_iodev *iodev,
				      int8_t prio,
				      uint8_t *buf,
				      uint32_t len,
@@ -359,7 +357,7 @@ static inline void rtio_sqe_prep_read(struct rtio_sqe *sqe,
 * @brief Prepare a write op submission
 */
static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
				       struct rtio_iodev *iodev,
				       const struct rtio_iodev *iodev,
				       int8_t prio,
				       uint8_t *buf,
				       uint32_t len,
@@ -380,7 +378,7 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
 * @param len Queue length, power of 2 required (2, 4, 8).
 */
#define RTIO_SQ_DEFINE(name, len)			\
	static RTIO_SPSC_DEFINE(name, struct rtio_sqe, len)
	RTIO_SPSC_DEFINE(name, struct rtio_sqe, len)

/**
 * @brief Statically define and initialize a fixed length completion queue.
@@ -389,7 +387,7 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
 * @param len Queue length, power of 2 required (2, 4, 8).
 */
#define RTIO_CQ_DEFINE(name, len)			\
	static RTIO_SPSC_DEFINE(name, struct rtio_cqe, len)
	RTIO_SPSC_DEFINE(name, struct rtio_cqe, len)


/**
@@ -399,7 +397,23 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
 * @param len Queue length, power of 2 required
 */
#define RTIO_IODEV_SQ_DEFINE(name, len) \
	static RTIO_SPSC_DEFINE(name, struct rtio_iodev_sqe, len)
	RTIO_SPSC_DEFINE(name, struct rtio_iodev_sqe, len)

/**
 * @brief Statically define and initialize an RTIO IODev
 *
 * @param name Name of the iodev
 * @param iodev_api Pointer to struct rtio_iodev_api
 * @param qsize Size of the submission queue, must be power of 2
 * @param iodev_data Data pointer
 */
#define RTIO_IODEV_DEFINE(name, iodev_api, qsize, iodev_data)                                      \
	static RTIO_IODEV_SQ_DEFINE(_iodev_sq_##name, qsize);                                      \
	const STRUCT_SECTION_ITERABLE(rtio_iodev, name) = {                                        \
		.api = (iodev_api),                                                                \
		.iodev_sq = (struct rtio_iodev_sq *const)&_iodev_sq_##name,                        \
		.data = (iodev_data),                                                              \
	}

/**
 * @brief Statically define and initialize an RTIO context
@@ -410,11 +424,13 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
 * @param cq_sz Size of the completion queue, must be power of 2
 */
#define RTIO_DEFINE(name, exec, sq_sz, cq_sz)	\
	IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (K_SEM_DEFINE(_submit_sem_##name, 0, K_SEM_MAX_LIMIT))) \
	IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, (K_SEM_DEFINE(_consume_sem_##name, 0, 1)))		   \
	RTIO_SQ_DEFINE(_sq_##name, sq_sz);							   \
	RTIO_CQ_DEFINE(_cq_##name, cq_sz);							   \
	static struct rtio name = {								   \
	IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM,							   \
		   (static K_SEM_DEFINE(_submit_sem_##name, 0, K_SEM_MAX_LIMIT)))		   \
	IF_ENABLED(CONFIG_RTIO_CONSUME_SEM,							   \
		   (static K_SEM_DEFINE(_consume_sem_##name, 0, 1)))				   \
	static RTIO_SQ_DEFINE(_sq_##name, sq_sz);						   \
	static RTIO_CQ_DEFINE(_cq_##name, cq_sz);						   \
	STRUCT_SECTION_ITERABLE(rtio, name) = {							   \
		.executor = (exec),                                                                \
		.xcqcnt = ATOMIC_INIT(0),                                                          \
		IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_sem = &_submit_sem_##name,))	   \
@@ -422,7 +438,7 @@ static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
		IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, (.consume_sem = &_consume_sem_##name,))	   \
		.sq = (struct rtio_sq *const)&_sq_##name,					   \
		.cq = (struct rtio_cq *const)&_cq_##name,                                          \
	}
	};

/**
 * @brief Set the executor of the rtio context
@@ -444,72 +460,51 @@ static inline void rtio_iodev_submit(const struct rtio_sqe *sqe, struct rtio *r)
}

/**
 * @brief Submit I/O requests to the underlying executor
 *
 * Submits the queue of requested IO operation chains to
 * the underlying executor. The underlying executor will decide
 * on which hardware and with what sort of parallelism the execution
 * of IO chains is performed.
 * @brief Count of acquirable submission queue events
 *
 * @param r RTIO context
 * @param wait_count Count of completions to wait for
 * If wait_count is for completions flag is set, the call will not
 * return until the desired number of completions are done. A wait count of
 * non-zero requires the caller be on a thread.
 *
 * @retval 0 On success
 * @return Count of acquirable submission queue events
 */
static inline int rtio_submit(struct rtio *r, uint32_t wait_count)
static inline uint32_t rtio_sqe_acquirable(struct rtio *r)
{
	int res;

	__ASSERT(r->executor != NULL, "expected rtio submit context to have an executor");
	return rtio_spsc_acquirable(r->sq);
}

#ifdef CONFIG_RTIO_SUBMIT_SEM
	/* TODO undefined behavior if another thread calls submit of course
/**
 * @brief Acquire a single submission queue event if available
 *
 * @param r RTIO context
 *
 * @retval sqe A valid submission queue event acquired from the submission queue
 * @retval NULL No subsmission queue event available
 */
	if (wait_count > 0) {
		__ASSERT(!k_is_in_isr(),
			 "expected rtio submit with wait count to be called from a thread");

		k_sem_reset(r->submit_sem);
		r->submit_count = wait_count;
static inline struct rtio_sqe *rtio_sqe_acquire(struct rtio *r)
{
	return rtio_spsc_acquire(r->sq);
}
#endif

	/* Enqueue all prepared submissions */
	rtio_spsc_produce_all(r->sq);

	/* Submit the queue to the executor which consumes submissions
	 * and produces completions through ISR chains or other means.
/**
 * @brief Produce all previously acquired sqe
 *
 * @param r RTIO context
 */
	res = r->executor->api->submit(r);
	if (res != 0) {
		return res;
static inline void rtio_sqe_produce_all(struct rtio *r)
{
	rtio_spsc_produce_all(r->sq);
}

	/* TODO could be nicer if we could suspend the thread and not
	 * wake up on each completion here.
	 */
#ifdef CONFIG_RTIO_SUBMIT_SEM

	if (wait_count > 0) {
		res = k_sem_take(r->submit_sem, K_FOREVER);
		__ASSERT(res == 0,
			 "semaphore was reset or timed out while waiting on completions!");
	}
#else
	while (rtio_spsc_consumable(r->cq) < wait_count) {
#ifdef CONFIG_BOARD_NATIVE_POSIX
		k_busy_wait(1);
#else
		k_yield();
#endif /* CONFIG_BOARD_NATIVE_POSIX */
/**
 * @brief Drop all previously acquired sqe
 *
 * @param r RTIO context
 */
static inline void rtio_sqe_drop_all(struct rtio *r)
{
	rtio_spsc_drop_all(r->sq);
}
#endif

	return res;
}

/**
 * @brief Consume a single completion queue event if available
@@ -560,6 +555,17 @@ static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r)
	return cqe;
}

/**
 * @brief Release all consumed completion queue events
 *
 * @param r RTIO context
 */
static inline void rtio_cqe_release_all(struct rtio *r)
{
	rtio_spsc_release_all(r->cq);
}


/**
 * @brief Inform the executor of a submission completion with success
 *
@@ -622,9 +628,170 @@ static inline void rtio_cqe_submit(struct rtio *r, int result, void *userdata)
#endif
}

/* TODO add rtio_sqe_suspend() for suspending a submission chain that must
 * wait on other in progress submissions or submission chains.
/**
 * Grant access to an RTIO context to a user thread
 */
static inline void rtio_access_grant(struct rtio *r, struct k_thread *t)
{
	k_object_access_grant(r, t);

#ifdef CONFIG_RTIO_SUBMIT_SEM
	k_object_access_grant(r->submit_sem, t);
#endif

#ifdef CONFIG_RTIO_CONSUME_SEM
	k_object_access_grant(r->consume_sem, t);
#endif
}

/**
 * @brief Copy an array of SQEs into the queue
 *
 * Useful if a batch of submissions is stored in ROM or
 * RTIO is used from user mode where a copy must be made.
 *
 * Partial copying is not done as chained SQEs need to be submitted
 * as a whole set.
 *
 * @param r RTIO context
 * @param sqes Pointer to an array of SQEs
 * @param sqe_count Count of sqes in array
 *
 * @retval 0 success
 * @retval -ENOMEM not enough room in the queue
 */
__syscall int rtio_sqe_copy_in(struct rtio *r,
			       const struct rtio_sqe *sqes,
			       size_t sqe_count);
static inline int z_impl_rtio_sqe_copy_in(struct rtio *r,
					  const struct rtio_sqe *sqes,
					  size_t sqe_count)
{
	struct rtio_sqe *sqe;
	uint32_t acquirable = rtio_sqe_acquirable(r);

	if (acquirable < sqe_count) {
		return -ENOMEM;
	}

	for (int i = 0; i < sqe_count; i++) {
		sqe = rtio_sqe_acquire(r);
		__ASSERT_NO_MSG(sqe != NULL);
		*sqe = sqes[i];
	}

	rtio_sqe_produce_all(r);

	return 0;
}

/**
 * @brief Copy an array of CQEs from the queue
 *
 * Copies from the RTIO context and its queue completion queue
 * events, waiting for the given time period to gather the number
 * of completions requested.
 *
 * @param r RTIO context
 * @param cqes Pointer to an array of SQEs
 * @param cqe_count Count of sqes in array
 * @param timeout Timeout to wait for each completion event. Total wait time is
 *                potentially timeout*cqe_count at maximum.
 *
 * @retval copy_count Count of copied CQEs (0 to cqe_count)
 */
__syscall int rtio_cqe_copy_out(struct rtio *r,
				struct rtio_cqe *cqes,
				size_t cqe_count,
				k_timeout_t timeout);
static inline int z_impl_rtio_cqe_copy_out(struct rtio *r,
					   struct rtio_cqe *cqes,
					   size_t cqe_count,
					   k_timeout_t timeout)
{
	size_t copied;
	struct rtio_cqe *cqe;

	for (copied = 0; copied < cqe_count; copied++) {
		cqe = rtio_cqe_consume_block(r);
		if (cqe == NULL) {
			break;
		}
		cqes[copied] = *cqe;
	}


	rtio_cqe_release_all(r);

	return copied;
}

/**
 * @brief Submit I/O requests to the underlying executor
 *
 * Submits the queue of submission queue events to the executor.
 * The executor will do the work of managing tasks representing each
 * submission chain, freeing submission queue events when done, and
 * producing completion queue events as submissions are completed.
 *
 * @param r RTIO context
 * @param wait_count Number of submissions to wait for completion of.
 *
 * @retval 0 On success
 */
__syscall int rtio_submit(struct rtio *r, uint32_t wait_count);

static inline int z_impl_rtio_submit(struct rtio *r, uint32_t wait_count)
{
	int res;

	__ASSERT(r->executor != NULL, "expected rtio submit context to have an executor");

#ifdef CONFIG_RTIO_SUBMIT_SEM
	/* TODO undefined behavior if another thread calls submit of course
	 */
	if (wait_count > 0) {
		__ASSERT(!k_is_in_isr(),
			 "expected rtio submit with wait count to be called from a thread");

		k_sem_reset(r->submit_sem);
		r->submit_count = wait_count;
	}
#endif

	/* Enqueue all prepared submissions */
	rtio_spsc_produce_all(r->sq);

	/* Submit the queue to the executor which consumes submissions
	 * and produces completions through ISR chains or other means.
	 */
	res = r->executor->api->submit(r);
	if (res != 0) {
		return res;
	}

	/* TODO could be nicer if we could suspend the thread and not
	 * wake up on each completion here.
	 */
#ifdef CONFIG_RTIO_SUBMIT_SEM

	if (wait_count > 0) {
		res = k_sem_take(r->submit_sem, K_FOREVER);
		__ASSERT(res == 0,
			 "semaphore was reset or timed out while waiting on completions!");
	}
#else
	while (rtio_spsc_consumable(r->cq) < wait_count) {
#ifdef CONFIG_BOARD_NATIVE_POSIX
		k_busy_wait(1);
#else
		k_yield();
#endif /* CONFIG_BOARD_NATIVE_POSIX */
	}
#endif

	return res;
}

/**
 * @}
@@ -634,4 +801,6 @@ static inline void rtio_cqe_submit(struct rtio *r, int result, void *userdata)
}
#endif

#include <syscalls/rtio.h>

#endif /* ZEPHYR_INCLUDE_RTIO_RTIO_H_ */
+38 −0
Original line number Diff line number Diff line
@@ -193,6 +193,18 @@ struct rtio_spsc {
		}                                                                                  \
	})

/**
 * @brief Drop all previously acquired elements
 *
 * This makes all previous acquired elements available to be acquired again
 *
 * @param spsc SPSC to drop all previously acquired elements or do nothing
 */
#define rtio_spsc_drop_all(spsc)		\
	do {					\
		(spsc)->_spsc.acquire = 0;	\
	} while (false)

/**
 * @brief Consume an element from the spsc
 *
@@ -223,6 +235,32 @@ struct rtio_spsc {
		}                                                                                  \
	})


/**
 * @brief Release all consumed elements
 *
 * @param spsc SPSC to release consumed elements or do nothing
 */
#define rtio_spsc_release_all(spsc)                                                                \
	({                                                                                         \
		if ((spsc)->_spsc.consume > 0) {                                                   \
			unsigned long consumed = (spsc)->_spsc.consume;                            \
			(spsc)->_spsc.consume = 0;                                                 \
			atomic_add(&(spsc)->_spsc.out, consumed);                                  \
		}                                                                                  \
	})

/**
 * @brief Count of acquirable in spsc
 *
 * @param spsc SPSC to get item count for
 */
#define rtio_spsc_acquirable(spsc)                                                                 \
	({                                                                                         \
		(((spsc)->_spsc.in + (spsc)->_spsc.acquire) - (spsc)->_spsc.out) -                 \
			rtio_spsc_size(spsc);                                                      \
	})

/**
 * @brief Count of consumables in spsc
 *
+3 −1
Original line number Diff line number Diff line
@@ -111,7 +111,9 @@ kobjects = OrderedDict([
    ("ztest_suite_node", ("CONFIG_ZTEST", True, False)),
    ("ztest_suite_stats", ("CONFIG_ZTEST", True, False)),
    ("ztest_unit_test", ("CONFIG_ZTEST_NEW_API", True, False)),
    ("ztest_test_rule", ("CONFIG_ZTEST_NEW_API", True, False))
    ("ztest_test_rule", ("CONFIG_ZTEST_NEW_API", True, False)),
    ("rtio", ("CONFIG_RTIO", False, False)),
    ("rtio_iodev", ("CONFIG_RTIO", False, False))
])

def kobject_to_enum(kobj):
Loading