Commit 3177b619 authored by Maria Matejka's avatar Maria Matejka
Browse files

Hopefully working workers. Not tested yet.

parent a37de78f
Loading
Loading
Loading
Loading
+20 −28
Original line number Diff line number Diff line
@@ -10,47 +10,39 @@

struct config;

struct worker_queue;
extern struct worker_queue *main_queue;

enum task_state {
  TS_DIRECT = 0,
  TS_PENDING = 1,
  TS_INPROGRESS = 2,
  TS_SENDMORE = 3,
  TS_SENDING = 4,
struct domain;
struct limiter;

enum task_flags {
  /* These flags can be set by the user */
  TF_EXCLUSIVE = 0x1,		/* Lock the domain exclusively */
  TF_PUBLIC_MASK = 0xff,	/* Flags are masked by this value on task push */
  /* These flags are private for worker queue */
  TF_PREPENDED = 0x100,		/* Task is the first in domain blocked-queue */
} PACKED;

struct task {
  node n;				/* Init this to zero. */
  enum task_state state;		/* Init this to TS_DIRECT. */
  void (*sender)(struct task *);	/* This will be called to push more tasks */
  void (*receiver)(struct task *);	/* This will be called to execute the task */
  enum task_flags flags;		/* Task flags */
  struct domain *domain;		/* Task's primary domain */
  void (*execute)(struct task *);	/* This will be called to execute the task */
};

/* Fixed-size worker queue. Must be run from the main thread.
 *
 * Returns the worker queue pointer.
 */
struct worker_queue *worker_queue_new(void);
/* Initialize the worker queue. Run once and never more. */
void worker_queue_init(void);

/* Set the right number of workers in worker queue.
 * @wq: worker queue
 * @prefork: how many workers shall run
/* Update configuration for worker queue
 * @c: new config
 */
void worker_queue_update(struct worker_queue *wq, struct config *c);
void worker_queue_update(struct config *c);

/* Push some work to the queue.
 * @wq: queue to push to
 * @t: task to push
 *
 * Returns 1 if the direct path was available and another work may be pushed,
 * otherwise 0 is returned and the t->sender callback will be called.
 *
 * Sender callback should always dispose of the task or reuse it.
 * Receiver callback should dispose of the task if its state is TS_DIRECT.
 * The execute callback should dispose of the task.
 * May block if no worker is available to pick the task.
 */
int worker_push(struct worker_queue *wq, struct task *t);
void task_push(struct task *t);

struct io_ping_handle;
/* Init a handle for main thread wakeup. Must be run from the main thread.
+2 −4
Original line number Diff line number Diff line
@@ -93,8 +93,6 @@ drop_gid(gid_t gid)
 *	Reading the Configuration
 */

static struct worker_queue *queue;

#ifdef PATH_IPROUTE_DIR

static inline void
@@ -187,7 +185,7 @@ int
sysdep_commit(struct config *new, struct config *old UNUSED)
{
  log_switch(0, &new->logfiles, new->syslog_name);
  worker_queue_update(queue, new);
  worker_queue_update(new);
  return 0;
}

@@ -909,7 +907,7 @@ main(int argc, char **argv)

  main_thread_init();

  queue = worker_queue_new();
  worker_queue_init();

  write_pid_file();

+187 −91
Original line number Diff line number Diff line
@@ -12,6 +12,12 @@
#include <pthread.h>
#include <unistd.h>

static inline void SEM_INIT(sem_t *s, uint val)
{
  if (sem_init(s, 0, val) < 0)
    bug("sem_init() failed: %m");
}

static inline void SEM_WAIT(sem_t *s)
{
  while (sem_wait(s) < 0) {
@@ -47,97 +53,155 @@ static inline void SEM_POST(sem_t *s)

static _Thread_local struct timeloop worker_timeloop;

struct worker_queue {
static struct worker_queue {
  sem_t waiting;		/* Workers wait on this semaphore to get work */
  sem_t stopped;		/* Posted on worker stopped */
  sem_t available;		/* How many workers are currently free */
  pthread_spinlock_t lock;	/* Lock for the following values */
  list pending;			/* Pending tasks */
  list sendmore;		/* Pending sendmore requests */
  int available;		/* How many workers are waiting */
  list pending;			/* Tasks pending */
  uint running;			/* How many workers are running */
  uint prefork;			/* Default count of workers */
  uint stop;			/* Stop requests */
} wq_, *wq = &wq_;

struct domain {
  node n;
  pthread_rwlock_t lock;
  list blocked;			/* These tasks are blocked by the rwlock */
};

void
domain_init(struct domain *d)
{
  int e = pthread_rwlock_init(&d->lock, NULL);
  if (e != 0)
    die("Domain init failed: %M", e);

  init_list(&d->blocked);
}

int
domain_trylock_read(struct domain *d)
{
  int e = pthread_rwlock_tryrdlock(&d->lock);
  switch (e)
  {
    case 0:
      return 1;
    case EBUSY:
      return 0;
    default:
      bug("pthread_rwlock_tryrdlock() returned %m");
  }
}

int
domain_trylock_write(struct domain *d)
{
  int e = pthread_rwlock_trywrlock(&d->lock);
  switch (e)
  {
    case 0:
      return 1;
    case EBUSY:
      return 0;
    default:
      bug("pthread_rwlock_tryrdlock() returned %m");
  }
}

void domain_unlock_read(struct domain *d)
{ pthread_rwlock_unlock(&d->lock); }

void domain_unlock_write(struct domain *d)
{ pthread_rwlock_unlock(&d->lock); }


extern _Thread_local struct timeloop *timeloop_current;

static void *
worker_loop(void *_wq)
worker_loop(void *_data UNUSED)
{
  struct worker_queue *wq = _wq;
  
  /* Overall thread initialization */
  times_init(&worker_timeloop);
  timeloop_current = &worker_timeloop;

  debug("Worker started for worker queue %p\n", wq);
  debug("Worker started\n");
 
  /* Run the loop */
  while (1) {
    WQ_LOCK;
    wq->available++;
    WQ_UNLOCK;
    SEM_POST(&wq->available);
    SEM_WAIT(&wq->waiting);
    SEM_WAIT(&wq->available);
    
    WQ_LOCK;
    /* Is there a request to stop? */
    if (wq->stop)
    /* Is there a pending task? */
    if (!EMPTY_LIST(wq->pending))
    {
      wq->stop--;
      WQ_UNLOCK;
      break;
    }

    if (!EMPTY_LIST(wq->pending)) {
      /* Get first pending task out of the list */
      /* Retrieve that task */
      struct task *t = HEAD(wq->pending);
      rem_node(&t->n);
      WQ_UNLOCK;

      /* Execute the task */
      ASSERT(t->state == TS_PENDING);
      t->state = TS_INPROGRESS;
      t->receiver(t);
      t->state = TS_SENDMORE;

      /* Order more tasks */
      WQ_LOCKED add_tail(&wq->sendmore, &t->n);
      /* Does the task need a lock? */
      if (!t->domain)
	/* No. Just run it. */
	t->execute(t);
      else
	/* Yes. And is it available? */
	if (t->flags & TF_EXCLUSIVE ?
	    domain_trylock_write(t->domain) :
	    domain_trylock_read(t->domain))
	{
	  /* Yes. Run it! */
	  t->execute(t);

      /* We have added an item into a queue */
      SEM_POST(&wq->waiting);
      continue;
	  /* And unlock to let others to the domain */
	  t->flags & TF_EXCLUSIVE ?
	    domain_unlock_write(t->domain) :
	    domain_unlock_read(t->domain);
	}

    if (!EMPTY_LIST(wq->sendmore)) {
      /* Get first sendmore task */
      struct task *t = HEAD(wq->sendmore);
      rem_node(&t->n);
      WQ_UNLOCK;

      /* Ask for more work */
      ASSERT(t->state == TS_SENDMORE);
      t->state = TS_SENDING;
      t->sender(t);

      continue;
	else
	{
	  /* Unavailable. Store this task into the blocked list */
	  WQ_LOCKED
	    if (t->flags & TF_PREPENDED)
	      add_head(&t->domain->blocked, &t->n);
	    else
	      add_tail(&t->domain->blocked, &t->n);
	}

    WQ_UNLOCK;
    }
    else
    {
      /* There must be a request to stop then */
      ASSERT(wq->stop > 0);

      /* Requested to stop */
      debug("Worker stopping\n");
      wq->stop--;
      wq->running--;
      WQ_UNLOCK;

      /* Notify the stop requestor */
      SEM_POST(&wq->stopped);

      /* Finished */
      return NULL;
    }
  }
  
  bug("This shall never happen");
}

/* Start a thread */
static int
worker_start(struct worker_queue *wq)
worker_start(void)
{
  /* Run the thread */
  pthread_t id;
  int e = pthread_create(&id, NULL, worker_loop, wq);
  int e = pthread_create(&id, NULL, worker_loop, NULL);
  if ((wq->prefork == 0) && (e < 0))
    bug("Failed to start a worker: %m");

  if (e < 0)
    return e;

@@ -146,73 +210,105 @@ worker_start(struct worker_queue *wq)
  if (e < 0)
    bug("pthread_detach() failed: %m");

  WQ_LOCKED wq->running++;
  WQ_LOCKED
    wq->running++;
  return 0;
}

/* Stop a number of threads */
/* Start a thread */
static void
workers_start(uint count)
{
  uint i;
  for (i=0; i<count; i++)
    if (worker_start() != 0)
      break;

  WQ_LOCKED
    wq->prefork += i;

  /* If started only partially, log a warning */
  if (i < count)
    log(L_WARN "Failed to start a worker (%u of %u): %m", i, count);
}

/* Stop a number of threads. */
static void
worker_stop(struct worker_queue *wq, uint count)
workers_stop(uint count)
{
  WQ_LOCKED wq->stop += count;
  WQ_LOCKED
    wq->stop += count;

  for (uint i=0; i<count; i++)
    SEM_POST(&wq->waiting);

  for (uint i=0; i<count; i++)
    SEM_WAIT(&wq->stopped);

  WQ_LOCKED
    wq->prefork -= count;
}

struct worker_queue *
worker_queue_new(void)
void
worker_queue_init(void)
{
  struct worker_queue *wq = mb_allocz(&root_pool, sizeof(struct worker_queue));

  if (sem_init(&wq->waiting, 0, 0) < 0)
    bug("sem_init() failed: %m");
  if (sem_init(&wq->stopped, 0, 0) < 0)
    bug("sem_init() failed: %m");
  SEM_INIT(&wq->waiting, 0);
  SEM_INIT(&wq->stopped, 0);

  pthread_spin_init(&wq->lock, 0);

  init_list(&wq->pending);
  init_list(&wq->sendmore);

  return wq;
}

void
worker_queue_update(struct worker_queue *wq, struct config *c)
worker_queue_update(struct config *c)
{
  while (c->workers > wq->prefork)
  {
    if (worker_start(wq) == 0)
      wq->prefork++;
    else if (wq->prefork)
      return log(L_WARN "Failed to start a worker: %m");
    else
      bug("Failed to start a worker: %m");
  }

  if (c->workers < wq->prefork)
    worker_stop(wq, wq->prefork - c->workers);
  if (c->workers > wq->prefork)
    workers_start(c->workers - wq->prefork);
  else if (c->workers < wq->prefork)
    workers_stop(wq->prefork - c->workers);
  else /* c->workers == wq->prefork */
    debug("Worker count kept the same");
}

int
worker_push(struct worker_queue *wq, struct task *t)
void
task_push(struct task *t)
{
  ASSERT(t->state == TS_DIRECT);
  ASSERT(t->sender);
  ASSERT(t->receiver);
  ASSERT(t->execute);
  t->flags &= TF_PUBLIC_MASK;
  
  WQ_LOCK;
  t->state = TS_PENDING;
  /* We have a pending task */
  WQ_LOCKED
    add_tail(&wq->pending, &t->n);
  int direct = wq->available-- > 0;
  WQ_UNLOCK;

  /* Is there an available worker right now? */
  if (SEM_TRYWAIT(&wq->available))
  {
    /* Then we have a task for it. */
    SEM_POST(&wq->waiting);
  return direct;
    SEM_POST(&wq->available);
    return;
  }
  else
  {
    /* No available worker. We're going to sleep.
     * Anyway, the task still exists in the queue. */
    SEM_POST(&wq->waiting);

    /* Let's start another worker to keep the number of active workers. */
    if (worker_start() != 0)
      die("Failed to start a temporary worker: %m");

    /* Order one worker stop */
    WQ_LOCKED
      wq->stop++;
    SEM_POST(&wq->waiting);

    /* And wait until it really stops to continue */
    SEM_WAIT(&wq->stopped);
    return;
  }
}

struct io_ping_handle {