Commit 06ed695f authored by Maria Matejka, committed by Maria Matejka
Browse files

Worker: Delayed flushing of workers blocked on congested queue

parent 58c0bd37
Loading
Loading
Loading
Loading
+5 −1
Original line number Diff line number Diff line
#ifndef _BIRD_ATOMIC_H_
#define _BIRD_ATOMIC_H_

#if HAVE_ATOMIC
//#if HAVE_ATOMIC
#if 0
#include <stdatomic.h>
#else

@@ -16,6 +17,9 @@
/* Fallback fetch-and-subtract mapped onto the legacy GCC __sync builtins.
 * NOTE(review): the explicit memory-order argument is accepted for API
 * compatibility but ignored — __sync_fetch_and_sub is always a full barrier. */
#define atomic_fetch_sub(ptr, val) __sync_fetch_and_sub((ptr), (val))
#define atomic_fetch_sub_explicit(ptr, val, memory) __sync_fetch_and_sub((ptr), (val))

/* Atomic exchange via __sync_lock_test_and_set. NOTE(review): GCC documents
 * this builtin as an *acquire* barrier only (not a full seq_cst exchange),
 * and some targets only support storing the value 1 — confirm both
 * limitations are acceptable for every call site using these macros. */
#define atomic_exchange(ptr, val) __sync_lock_test_and_set((ptr), (val))
#define atomic_exchange_explicit(ptr, val, memory) atomic_exchange((ptr), (val))

#define atomic_compare_exchange_weak(ptr, desired, wanted) ({ \
    typeof(desired) _desptr = desired; /* save the pointer */ \
    typeof(*_desptr) _old = __sync_val_compare_and_swap(ptr, *_desptr, wanted); /* do the exchange */ \
+30 −7
Original line number Diff line number Diff line
@@ -57,10 +57,9 @@ static struct worker_queue {
  _Atomic uint max_workers;	/* Maximum count of workers incl. sleeping */
  uint queue_size;		/* How many items can be in queue before blocking */
  _Atomic uint stop;		/* Stop requests */
  union {
  _Atomic uint blocked;		/* How many workers are blocked by full queue */
  _Atomic uint postponed;	/* How many available sem_post's have been postponed */
  _Atomic u64 lock;		/* Simple spinlock */
    sem_t mutex;		/* Mutex instead of the spinlock */
  };
  list pending;			/* Tasks pending */
#ifdef DEBUG_STATELOG
  _Atomic u64 statelog_pos;	/* Current position in statelog */
@@ -894,6 +893,9 @@ worker_loop(void *_data UNUSED)
    {
      WDBG("Worker will wait\n");
      if (!prepended)
	if (atomic_load_explicit(&wq->blocked, memory_order_relaxed))
	  atomic_fetch_add_explicit(&wq->postponed, 1, memory_order_relaxed);
	else
	  SEM_POST(&wq->available);

      WORKER_YIELD();
@@ -905,6 +907,9 @@ worker_loop(void *_data UNUSED)
    {
      WDBG("Worker won't wait\n");
      if (!prepended)
	if (atomic_load_explicit(&wq->blocked, memory_order_relaxed))
	  atomic_fetch_add_explicit(&wq->postponed, 1, memory_order_relaxed);
	else
	  SEM_POST(&wq->available);
    }

@@ -915,8 +920,21 @@ worker_loop(void *_data UNUSED)
      /* Retrieve that task */
      struct task *t = HEAD(wq->pending);
      rem_node(&t->n);

      /* Check list for emptiness */
      int empty = EMPTY_LIST(wq->pending);

      /* No more operations on worker queue */
      WQ_UNLOCK();

      /* Flush the postponed available semaphores */
      if (empty && atomic_load_explicit(&wq->postponed, memory_order_relaxed))
      {
	uint postponed = atomic_exchange_explicit(&wq->postponed, 0, memory_order_relaxed);
	for (uint i=0; i<postponed; i++)
	  SEM_POST(&wq->available);
      }

      /* Store the old flags and domain */
      struct domain *d = t->domain;
      enum task_flags tf = t->flags;
@@ -1154,6 +1172,9 @@ static void
task_push_block(struct task *t)
{
  WDBG("Blocking until a worker is available\n");

  atomic_fetch_add_explicit(&wq->blocked, 1, memory_order_relaxed);

  WQ_LOCK();

  /* Idempotency. */
@@ -1180,6 +1201,8 @@ task_push_block(struct task *t)

  /* Wait until somebody picks the task up */
  SEM_WAIT(&wq->available);

  atomic_fetch_sub_explicit(&wq->blocked, 1, memory_order_relaxed);
  WORKER_CONTINUE();
}

@@ -1196,7 +1219,7 @@ task_push(struct task *t)
    return;

  /* Is there an available worker right now? */
  if (SEM_TRYWAIT(&wq->available))
  if ((atomic_load(&wq->blocked) == 0) && SEM_TRYWAIT(&wq->available))
    return task_push_available(t);
  else
    return task_push_block(t);