Commit 19a0424e authored by Maria Matejka's avatar Maria Matejka
Browse files

Multithreading: Added the task/worker subsystem.

parent 0e3d4149
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -45,6 +45,9 @@ struct config {
  u32 latency_limit;			/* Events with longer duration are logged (us) */
  u32 watchdog_warning;			/* I/O loop watchdog limit for warning (us) */
  u32 watchdog_timeout;			/* Watchdog timeout (in seconds, 0 = disabled) */
  uint workers;				/* How many workers should run by default */
  uint max_workers;			/* How many workers should run at maximum */
  uint queue_size;			/* How many items can be in the global queue */
  char *err_msg;			/* Parser error message */
  int err_lino;				/* Line containing error */
  int err_chno;				/* Character where the parser stopped */
+17 −0
Original line number Diff line number Diff line
@@ -619,6 +619,23 @@ include "tablename.conf";;
	<cf/protocol/ times, and the <cf/iso long ms/ format for <cf/base/ and
	<cf/log/ times.

	<tag><label id="opt-worker">worker <m/number/ [ max <m/number/]</tag>
	Set number of workers to run BIRD. The first number states how many concurrent
	threads are allowed to run, plus one if you run BFD (it has its own thread).
	It's recommended to set this to at most the number of your CPU cores.
	By default, this is 4.

	The max number is a maximal number of threads allowed to exist (plus one for BFD).
	Setting this to a too low value may lead to deadlocks in the route propagation task chain.
	By default, this is 8; if this clause is omitted and the worker count
	is more than 4, this is twice the worker count.

	<tag><label id="opt-queue-size">queue size <m/number/</tag>
	Set how many tasks can wait in the task queue for workers. Higher value means
	more memory consumed with lesser thread switching overhead. Zero means no buffer
	at all. By default, this is 64 and you probably don't want to change it unless
	you are fine-tuning your route server performance.

	<tag><label id="opt-table"><m/nettype/ table <m/name/ [sorted]</tag>
	Create a new routing table. The default routing tables <cf/master4/ and
	<cf/master6/ are created implicitly, other routing tables have to be
+36 −0
Original line number Diff line number Diff line
@@ -137,6 +137,26 @@ init_list(list *l)
  l->tail = &l->head_node;
}

/**
 * add_head_list - concatenate two lists
 * @to: destination list
 * @l: source list
 *
 * This function prepends all elements of the list @l to
 * the list @to in constant time.
 */
LIST_INLINE void
add_head_list(list *to, list *l)
{
  /* Join the lists */
  to->head->prev = l->tail;
  l->tail->next = to->head;

  /* Fix the header */
  to->head = l->head;
  to->head->prev = &to->head_node;
}

/**
 * add_tail_list - concatenate two lists
 * @to: destination list
@@ -169,3 +189,19 @@ list_length(list *l)

  return len;
}

/* Move all nodes from one list to another */
LIST_INLINE void
move_list(list *dest, list *src)
{
  dest->head = src->head;
  dest->null = NULL;
  dest->tail = src->tail;

  dest->head->prev = &dest->head_node;
  dest->tail->next = &dest->tail_node;

  src->head = &src->tail_node;
  src->null = NULL;
  src->tail = &src->head_node;
}

lib/locked.h

0 → 100644
+17 −0
Original line number Diff line number Diff line
/*
 *	BIRD Library -- Locked data structures
 *
 *	(c) 2019 Maria Matejka <mq@ucw.cz>
 *
 *	Can be freely distributed and used under the terms of the GNU GPL.
 */

#ifndef _BIRD_LOCKED_H_
#define _BIRD_LOCKED_H_

/* Worker / Thread ID */
#define NOWORKER (~((u64) 0))
extern _Thread_local u64 worker_id;

#endif

lib/worker.h

0 → 100644
+99 −0
Original line number Diff line number Diff line
/*
 *	BIRD Library -- Worker threads
 *
 *	(c) 2019 Maria Matejka <mq@ucw.cz>
 *
 *	Can be freely distributed and used under the terms of the GNU GPL.
 */

#ifndef _BIRD_WORKER_H_
#define _BIRD_WORKER_H_

#include "lib/birdlib.h"
#include "lib/resource.h"
#include "lib/atomic.h"
#include "lib/locked.h"

struct config;

extern _Thread_local linpool *task_pool;

struct semaphore *semaphore_new(pool *p, uint n);
void semaphore_wait(struct semaphore *s);
void semaphore_post(struct semaphore *s);

struct domain *domain_new(pool *p);
void domain_read_lock(struct domain *);
void domain_read_unlock(struct domain *);
void domain_write_lock(struct domain *);
void domain_write_unlock(struct domain *);

extern _Thread_local struct domain *worker_domain;
extern _Thread_local enum task_flags worker_task_flags;

void domain_assert_write_locked(struct domain *);
void domain_assert_read_locked(struct domain *);
void domain_assert_unlocked(struct domain *);

enum task_flags {
  /* These flags can be set by the user */
  TF_EXCLUSIVE = 0x1,		/* Lock the domain exclusively */
  TF_TAIL = 0x2,		/* This is the last task produced by current task */
  TF_IDEMPOTENT = 0x4,		/* Do nothing if task already pushed */
  TF_PUBLIC_MASK = 0xff,	/* Flags are masked by this value on task push */
  /* These flags are private for worker queue */
  TF_PREPENDED = 0x100,		/* Task is waiting for the first free worker */
  TF_ENQUEUED = 0x200,		/* Task is in queue */
} PACKED;

struct task {
  node n;				/* Init this to zero. */
  _Atomic enum task_flags flags;	/* Task flags */
  struct domain *domain;		/* Task's primary domain */
  void (*execute)(struct task *);	/* This will be called to execute the task */
};

/* Always initialize the task by task_init() */
static inline void task_init(struct task *t, enum task_flags tf, struct domain *domain, void (*execute)(struct task *))
{
  ASSERT(t);
  ASSERT(execute);
  *t = (struct task) {
    .n = { },
    .flags = ATOMIC_VAR_INIT(tf & TF_PUBLIC_MASK),
    .domain = domain,
    .execute = execute,
  };
}

/* Initialize the worker queue. Run once and never more. */
void worker_queue_init(void);

/* Flush and cleanup the worker queue. Run only in tests. */
void worker_queue_destroy(void);

/* Update configuration for worker queue
 * @c: new config
 */
void worker_queue_update(const struct config *c);

/* Push some work to the queue.
 * @t: task to push
 *
 * The execute callback should dispose of the task.
 * May block if no worker is available to pick the task.
 */
void task_push(struct task *t);

struct io_ping_handle;
/* Init a handle for main thread wakeup. Must be run from the main thread.
 * @hook: run this when ping is received in IO (main) thread.
 *
 * Returns the io ping handle.
 */
struct io_ping_handle *io_ping_new(void (*hook)(struct io_ping_handle *));

/* Issue the ping. Run from a worker thread. */
void io_ping(struct io_ping_handle *);

#endif
Loading