Commit b0caaed4 authored by Luis Ubieda's avatar Luis Ubieda Committed by Daniel DeGrasse
Browse files

rtio: workq: Restructure workqueue as a threads pool with a queue



Based on a discussion around P4WQ limitations for our application, it
was determined that the RTIO workqueue required the ability to use
additional threads from the pool in spite of work items blocked.
Since this is not covered, nor desired for the P4WQ, then remove this
dependency and re-implement it in a way that covers also this use-case.

Signed-off-by: default avatarLuis Ubieda <luisf@croxel.com>
parent ee44416d
Loading
Loading
Loading
Loading
+6 −4
Original line number Diff line number Diff line
@@ -3,8 +3,6 @@

config RTIO_WORKQ
	bool "RTIO Work-queues service to process Sync operations"
	select SCHED_DEADLINE
	select P4WQ_INIT_STAGE_EARLY
	select RTIO_CONSUME_SEM
	help
	  Enable RTIO Work-queues to allow processing synchronous operations
@@ -12,10 +10,14 @@ config RTIO_WORKQ

if RTIO_WORKQ

config RTIO_WORKQ_PRIO_MED
	int "Medium Thread priority of RTIO Work-queues"
config RTIO_WORKQ_THREADS_POOL_PRIO
	int "Priority of RTIO Workqueue Threads Pool"
	default MAIN_THREAD_PRIORITY

config RTIO_WORKQ_THREADS_POOL_STACK_SIZE
	int "Priority of RTIO Workqueue Threads Pool"
	default 1024

config RTIO_WORKQ_STACK_SIZE
	int "Thread stack-size of RTIO Workqueues"
	default 2048
+46 −52
Original line number Diff line number Diff line
/*
 * Copyright (c) 2024 Croxel Inc.
 * Copyright (c) 2025 Croxel Inc.
 *
 * SPDX-License-Identifier: Apache-2.0
 */
@@ -7,37 +8,15 @@
#include <zephyr/rtio/work.h>
#include <zephyr/kernel.h>

#define RTIO_WORKQ_PRIO_MED		CONFIG_RTIO_WORKQ_PRIO_MED
#define RTIO_WORKQ_PRIO_HIGH		RTIO_WORKQ_PRIO_MED - 1
#define RTIO_WORKQ_PRIO_LOW		RTIO_WORKQ_PRIO_MED + 1

K_MEM_SLAB_DEFINE_STATIC(rtio_work_items_slab,
			 sizeof(struct rtio_work_req),
			 CONFIG_RTIO_WORKQ_POOL_ITEMS,
			 4);

static void rtio_work_req_done_handler(struct k_p4wq_work *work)
{
	struct rtio_work_req *req = CONTAINER_OF(work,
						 struct rtio_work_req,
						 work);
	k_mem_slab_free(&rtio_work_items_slab, req);
}

K_P4WQ_DEFINE_WITH_DONE_HANDLER(rtio_workq,
static K_THREAD_STACK_ARRAY_DEFINE(rtio_workq_threads_stack,
				   CONFIG_RTIO_WORKQ_THREADS_POOL,
	      CONFIG_RTIO_WORKQ_STACK_SIZE,
		  rtio_work_req_done_handler);

static void rtio_work_handler(struct k_p4wq_work *work)
{
	struct rtio_work_req *req = CONTAINER_OF(work,
						 struct rtio_work_req,
						 work);
	struct rtio_iodev_sqe *iodev_sqe = req->iodev_sqe;

	req->handler(iodev_sqe);
}
				   CONFIG_RTIO_WORKQ_THREADS_POOL_STACK_SIZE);
static struct k_thread rtio_work_threads[CONFIG_RTIO_WORKQ_THREADS_POOL];
static K_QUEUE_DEFINE(rtio_workq);

struct rtio_work_req *rtio_work_req_alloc(void)
{
@@ -49,12 +28,6 @@ struct rtio_work_req *rtio_work_req_alloc(void)
		return NULL;
	}

	/** Initialize work item before using it as it comes
	 * from a Memory slab (no-init region).
	 */
	req->work.thread = NULL;
	(void)k_sem_init(&req->work.done_sem, 1, 1);

	return req;
}

@@ -71,31 +44,52 @@ void rtio_work_req_submit(struct rtio_work_req *req,
		return;
	}

	struct k_p4wq_work *work = &req->work;
	struct rtio_sqe *sqe = &iodev_sqe->sqe;

	/** Link the relevant info so that we can get it on the k_p4wq_work work item.
	 */
	req->iodev_sqe = iodev_sqe;
	req->handler = handler;

	/** Set the required information to handle the action */
	work->handler = rtio_work_handler;
	work->deadline = 0;
	/** For now we're simply treating this as a FIFO queue. It may be
	 * desirable to expand this to handle queue ordering based on RTIO
	 * SQE priority.
	 */
	k_queue_append(&rtio_workq, req);
}

	if (sqe->prio == RTIO_PRIO_LOW) {
		work->priority = RTIO_WORKQ_PRIO_LOW;
	} else if (sqe->prio == RTIO_PRIO_HIGH) {
		work->priority = RTIO_WORKQ_PRIO_HIGH;
	} else {
		work->priority = RTIO_WORKQ_PRIO_MED;
uint32_t rtio_work_req_used_count_get(void)
{
	return k_mem_slab_num_used_get(&rtio_work_items_slab);
}

	/** Decoupling action: Let the P4WQ execute the action. */
	k_p4wq_submit(&rtio_workq, work);
static void rtio_workq_thread_fn(void *arg1, void *arg2, void *arg3)
{
	ARG_UNUSED(arg1);
	ARG_UNUSED(arg2);
	ARG_UNUSED(arg3);

	while (true) {
		struct rtio_work_req *req = k_queue_get(&rtio_workq, K_FOREVER);

		if (req != NULL) {
			req->handler(req->iodev_sqe);

			k_mem_slab_free(&rtio_work_items_slab, req);
		}
	}
}

uint32_t rtio_work_req_used_count_get(void)
static int static_init(void)
{
	return k_mem_slab_num_used_get(&rtio_work_items_slab);
	for (size_t i = 0 ; i < ARRAY_SIZE(rtio_work_threads) ; i++) {
		k_thread_create(&rtio_work_threads[i],
				rtio_workq_threads_stack[i],
				CONFIG_RTIO_WORKQ_THREADS_POOL_STACK_SIZE,
				rtio_workq_thread_fn,
				NULL, NULL, NULL,
				CONFIG_RTIO_WORKQ_THREADS_POOL_PRIO,
				0,
				K_NO_WAIT);
	}

	return 0;
}

SYS_INIT(static_init, POST_KERNEL, 1);
+0 −1
Original line number Diff line number Diff line
CONFIG_RTIO=y
CONFIG_RTIO_WORKQ=y
CONFIG_RTIO_WORKQ_THREADS_POOL=3
CONFIG_RTIO_WORKQ_PRIO_MED=3
CONFIG_ZTEST=y
CONFIG_ZTEST_THREAD_PRIORITY=8
CONFIG_MP_MAX_NUM_CPUS=1