Commit 497a7e53 authored by Maria Matejka's avatar Maria Matejka
Browse files

Workers: Fast path in rwlocks now uses atomic operations instead of global spinlock

This makes the runtime faster and nonblocking in most cases, provided
that we have at least one CPU per one allowed worker thread.
parent afb4894f
Loading
Loading
Loading
Loading
+197 −91
Original line number Diff line number Diff line
@@ -201,10 +201,11 @@ static _Thread_local int worker_sleeping = 1;

static inline void WORKER_YIELD(void)
{
  WQ_ASSERT_LOCKED();
  WQ_ASSERT_UNLOCKED();
  ASSERT(!worker_sleeping);
  WQ_STATELOG(WQS_YIELD);

  WQ_LOCK();
  /* We may want to start another worker */
  if (wq->running < wq->max_workers)
  {
@@ -217,6 +218,7 @@ static inline void WORKER_YIELD(void)
      worker_start();
  }

  WQ_UNLOCK();
  SEM_POST(&wq->yield);
  worker_sleeping = 1;
}
@@ -239,19 +241,59 @@ struct domain {
  list rdtasks;		/* Reader tasks blocked */
  list wrtasks;		/* Writer tasks blocked */
  uint rdsem_n;		/* How many secondary readers waiting */
  uint wrsem_n;		/* How many secondary writers waiting + locked */
  uint wrsem_n;		/* How many secondary writers waiting */
  uint rdtasks_n;	/* How many reader tasks waiting */
  uint wrtasks_n;	/* How many writer tasks waiting + locked */
  uint rdlocked;	/* How many readers are locked */
  uint prepended;	/* How many tasks have been prepended */
  uint wrlocked:1;	/* If a writer is locked */
  uint wrtasks_n;	/* How many writer tasks waiting */
  _Atomic u64 lock;	/* Lock state:
			   bit 63: 1 = spinlock, don't do anything on this state
			   bit 62: 1 = writer locked
			   bit 61: 1 = there are readers waiting (rdsem_n + rdtasks_n > 0)
			   bit 60: 1 = there are writers waiting (wrsem_n + wrtasks_n > 0)
			   bit 30-59: number of prepended tasks
			   bit 0-29: number of concurrently locked readers */
};

#define DOMAIN_LOCK_PREPENDED(lock)	(((lock) >> 30) & ((1U << 30) - 1))
#define DOMAIN_LOCK_RDLOCKED(lock)	((lock) & ((1U << 30) - 1))
#define DOMAIN_LOCK_WRITERS_BIT		(1ULL << 60)
#define DOMAIN_LOCK_READERS_BIT		(1ULL << 61)
#define DOMAIN_LOCK_WRLOCKED_BIT	(1ULL << 62)
#define DOMAIN_LOCK_SPINLOCK_BIT	(1ULL << 63)

#define DOMAIN_LOCK_PREPENDED_ONE	(1U << 30)
#define DOMAIN_LOCK_RDLOCKED_ONE	1

#define DOMAIN_LOCK_LOAD_() lock = atomic_load_explicit(&d->lock, memory_order_acquire)
#define DOMAIN_LOCK_LOAD() \
  u64 lock; DOMAIN_LOCK_LOAD_(); \
retry: do { \
  if (lock & DOMAIN_LOCK_SPINLOCK_BIT) { \
    CPU_RELAX(); \
    DOMAIN_LOCK_LOAD_(); \
    goto retry; \
  } \
} while (0)

#define DOMAIN_LOCK_STORE(what) do { \
  if (!atomic_compare_exchange_strong_explicit( \
	&d->lock, &lock, what, memory_order_acq_rel, memory_order_acquire)) \
    goto retry; \
} while (0)

#define DOMAIN_LOCK_ENTER_SLOWPATH() DOMAIN_LOCK_STORE(lock | DOMAIN_LOCK_SPINLOCK_BIT)
#define DOMAIN_LOCK_EXIT_SLOWPATH(what) do { \
  u64 slock = lock | DOMAIN_LOCK_SPINLOCK_BIT; \
  if (!atomic_compare_exchange_strong_explicit( \
	&d->lock, &slock, what, memory_order_acq_rel, memory_order_acquire)) \
    bug("Lock state value shall never change while in slowpath"); \
} while (0)

#define TASK_PREPEND(d, t) do { \
  WQ_LOCK(); \
  t->flags |= TF_PREPENDED; \
  add_head(&wq->pending, &((t)->n)); \
  WQ_UNLOCK(); \
  SEM_POST(&wq->waiting); \
  d->prepended++; \
} while (0)

#define TASK_APPEND(t) do { \
@@ -276,11 +318,13 @@ void
domain_dump(resource *r)
{
  struct domain *d = SKIP_BACK(struct domain, r, r);
  WQ_LOCK();
  DOMAIN_LOCK_LOAD();
  DOMAIN_LOCK_ENTER_SLOWPATH();
  wdebug("Locking domain: WP:%u WS:%u RP:%u RS:%u RL:%u PREP:%u WL:%u\n",
      d->wrtasks_n, d->wrsem_n, d->rdtasks_n, d->rdsem_n,
      d->rdlocked, d->prepended, d->wrlocked);
  WQ_UNLOCK();
      DOMAIN_LOCK_RDLOCKED(lock), DOMAIN_LOCK_PREPENDED(lock),
      !!(lock & DOMAIN_LOCK_WRLOCKED_BIT));
  DOMAIN_LOCK_EXIT_SLOWPATH(lock);
}

static struct resclass domain_resclass = {
@@ -371,7 +415,6 @@ domain_pop_lock(struct domain *d UNUSED)
#define DOMAIN_STATLOG(what_, dom, task_) \
  WQ_STATELOG(what_, .domain = { .rdsem_n = dom->rdsem_n, .wrsem_n = dom->wrsem_n, .rdtasks_n = dom->rdtasks_n, .wrtasks_n = dom->wrtasks_n, .rdlocked = dom->rdlocked, .prepended = dom->prepended, .domain = dom, .task = task_ })


static int
domain_read_lock_primary(struct domain *d, struct task *t)
{
@@ -379,42 +422,51 @@ domain_read_lock_primary(struct domain *d, struct task *t)
  WDBG("Primary read lock: domain %p, task %p\n", d, t);
  DOMAIN_STATLOG(WQS_DOMAIN_RDLOCK_REQUEST, d, t);

  WQ_LOCK();
  DOMAIN_LOCK_LOAD();

  /* Fast path for prepended tasks */
  if (t->flags & TF_PREPENDED)
  {
    if (!d->prepended--)
      wbug("Got a pending task without pending bit set");
    if (DOMAIN_LOCK_PREPENDED(lock) == 0)
      wbug("Got a pending task with pending count zero");

    if (d->wrlocked)
    if (lock & DOMAIN_LOCK_WRLOCKED_BIT)
      wbug("Got a prepended reader task with writer locked");

    if (!d->rdlocked)
    if (DOMAIN_LOCK_RDLOCKED(lock) == 0)
      wbug("Reader shall be already locked");

    DOMAIN_LOCK_STORE(lock - DOMAIN_LOCK_PREPENDED_ONE);

    WDBG("-> Forcibly acquiring prepended lock.\n");
/*    d->rdlocked++;  is called by the prepender */

    domain_push_lock(d, 0);
    DOMAIN_STATLOG(WQS_DOMAIN_RDLOCK_SUCCESS, d, t);
    WQ_UNLOCK();
    return 1;
  }
  else if (d->wrlocked || d->wrsem_n || d->wrtasks_n)

  /* Slow path if locked for writing */
  if (lock & (DOMAIN_LOCK_WRLOCKED_BIT | DOMAIN_LOCK_WRITERS_BIT))
  {
    /* Blocked */
    /* Task is blocked */
    DOMAIN_LOCK_ENTER_SLOWPATH();
    add_tail(&(d->rdtasks), &(t->n));
    d->rdtasks_n++;
    DOMAIN_LOCK_EXIT_SLOWPATH(lock | DOMAIN_LOCK_READERS_BIT);

    WDBG("-> Blocked (n=%u)\n", d->rdtasks_n);
    DOMAIN_STATLOG(WQS_DOMAIN_RDLOCK_BLOCKED, d, t);
    WQ_UNLOCK();
    return 0;
  }
  else
  {
    d->rdlocked++;
    DOMAIN_LOCK_STORE(lock + DOMAIN_LOCK_RDLOCKED_ONE);

    WDBG("-> Instantly locked (n=%u)\n", DOMAIN_LOCK_RDLOCKED(lock));

    domain_push_lock(d, 0);
    WDBG("-> Instantly locked (n=%u)\n", d->rdlocked);
    DOMAIN_STATLOG(WQS_DOMAIN_RDLOCK_SUCCESS, d, t);
    WQ_UNLOCK();
    return 1;
  }
}
@@ -426,15 +478,19 @@ domain_read_lock(struct domain *d)
  WDBG("Secondary read lock: domain %p\n", d);
  DOMAIN_STATLOG(WQS_DOMAIN_RDLOCK_REQUEST, d, NULL);

  WQ_LOCK();
  if (d->wrlocked || d->wrsem_n || d->wrtasks_n)
  DOMAIN_LOCK_LOAD();

  if (lock & (DOMAIN_LOCK_WRLOCKED_BIT | DOMAIN_LOCK_WRITERS_BIT))
  {
    /* Blocked */
    DOMAIN_LOCK_ENTER_SLOWPATH();
    d->rdsem_n++;
    DOMAIN_LOCK_EXIT_SLOWPATH(lock | DOMAIN_LOCK_READERS_BIT);

    WDBG("-> Blocked (n=%u)\n", d->rdsem_n);

    DOMAIN_STATLOG(WQS_DOMAIN_RDLOCK_BLOCKED, d, NULL);
    WORKER_YIELD();
    WQ_UNLOCK();

    /* Wait until somebody unblocks us.
     * That thread also locks the lock for us
@@ -444,13 +500,13 @@ domain_read_lock(struct domain *d)
  }
  else
  {
    d->rdlocked++;
    WDBG("-> Instantly locked (n=%u)\n", d->rdlocked);
    WQ_UNLOCK();
    DOMAIN_LOCK_STORE(lock + DOMAIN_LOCK_RDLOCKED_ONE);

    WDBG("-> Instantly locked (n=%u)\n", DOMAIN_LOCK_RDLOCKED(lock));
  }

  DOMAIN_STATLOG(WQS_DOMAIN_RDLOCK_SUCCESS, d, NULL);
  domain_push_lock(d, 0);
  DOMAIN_STATLOG(WQS_DOMAIN_RDLOCK_SUCCESS, d, NULL);
}

static int
@@ -460,45 +516,52 @@ domain_write_lock_primary(struct domain *d, struct task *t)
  WDBG("Primary write lock: domain %p, task %p\n", d, t);
  DOMAIN_STATLOG(WQS_DOMAIN_WRLOCK_REQUEST, d, t);

  WQ_LOCK();
  DOMAIN_LOCK_LOAD();

  /* Fast path for prepended tasks */
  if (t->flags & TF_PREPENDED)
  {
    if (!d->prepended--)
      wbug("Got a pending task without pending bit set");
    if (DOMAIN_LOCK_PREPENDED(lock) != 1)
      wbug("Got a pending task without pending count set to 1");

    if (d->rdlocked)
    if (DOMAIN_LOCK_RDLOCKED(lock) > 0)
      wbug("Got a prepended writer task with reader locked");

    if (!d->wrlocked)
    if (!(lock & DOMAIN_LOCK_WRLOCKED_BIT))
      wbug("Writer shall be already locked here");

    DOMAIN_LOCK_STORE(lock - DOMAIN_LOCK_PREPENDED_ONE);

    WDBG("-> Forcibly acquiring prepended lock.\n");
/*    d->wrlocked = 1;  is called by the prepender */

    domain_push_lock(d, 1);
    DOMAIN_STATLOG(WQS_DOMAIN_WRLOCK_SUCCESS, d, t);
    WQ_UNLOCK();
    return 1;
  }
  else if (d->wrlocked || d->rdlocked || d->wrtasks_n || d->wrsem_n)

  /* Slow path if locked */
  if ((lock & (DOMAIN_LOCK_WRLOCKED_BIT | DOMAIN_LOCK_WRITERS_BIT))
      || (DOMAIN_LOCK_RDLOCKED(lock) > 0))
  {
    /* Blocked */
    /* Task is blocked */
    DOMAIN_LOCK_ENTER_SLOWPATH();
    add_tail(&(d->wrtasks), &(t->n));
    d->wrtasks_n++;
    DOMAIN_LOCK_EXIT_SLOWPATH(lock | DOMAIN_LOCK_WRITERS_BIT);

    WDBG("-> Blocked (n=%u)\n", d->wrtasks_n);
    DOMAIN_STATLOG(WQS_DOMAIN_WRLOCK_BLOCKED, d, t);
    WQ_UNLOCK();
    return 0;
  }
  else
  {
    if (d->rdtasks_n || d->rdsem_n || d->prepended)
      wbug("This shall never happen");
    DOMAIN_LOCK_STORE(lock | DOMAIN_LOCK_WRLOCKED_BIT);

    d->wrlocked = 1;
    domain_push_lock(d, 1);
    WDBG("-> Instantly locked\n");

    domain_push_lock(d, 1);
    DOMAIN_STATLOG(WQS_DOMAIN_WRLOCK_SUCCESS, d, t);
    WQ_UNLOCK();
    return 1;
  }
}
@@ -510,15 +573,21 @@ domain_write_lock(struct domain *d)
  WDBG("Secondary write lock: domain %p\n", d);
  DOMAIN_STATLOG(WQS_DOMAIN_WRLOCK_REQUEST, d, NULL);

  WQ_LOCK();
  if (d->wrlocked || d->rdlocked || d->wrsem_n || d->wrtasks_n)
  DOMAIN_LOCK_LOAD();

  /* Slow path if locked */
  if ((lock & (DOMAIN_LOCK_WRLOCKED_BIT | DOMAIN_LOCK_WRITERS_BIT))
      || (DOMAIN_LOCK_RDLOCKED(lock) > 0))
  {
    /* Blocked */
    DOMAIN_LOCK_ENTER_SLOWPATH();
    d->wrsem_n++;
    DOMAIN_LOCK_EXIT_SLOWPATH(lock | DOMAIN_LOCK_WRITERS_BIT);

    WDBG("-> Blocked (n=%u)\n", d->wrsem_n);

    DOMAIN_STATLOG(WQS_DOMAIN_WRLOCK_BLOCKED, d, NULL);
    WORKER_YIELD();
    WQ_UNLOCK();

    /* Wait until somebody unblocks us.
     * That thread also locks the lock for us
@@ -528,31 +597,28 @@ domain_write_lock(struct domain *d)
  }
  else
  {
    if (d->rdtasks_n || d->rdsem_n || d->prepended) 
      wbug("This shall never happen");
    DOMAIN_LOCK_STORE(lock | DOMAIN_LOCK_WRLOCKED_BIT);

    d->wrlocked = 1;
    WDBG("-> Instantly locked\n");
    WQ_UNLOCK();
  }

  DOMAIN_STATLOG(WQS_DOMAIN_WRLOCK_SUCCESS, d, NULL);
  domain_push_lock(d, 1);
  DOMAIN_STATLOG(WQS_DOMAIN_WRLOCK_SUCCESS, d, NULL);
}

static void
domain_unlock_common(struct domain *d)
static u64
domain_unlock_writers(struct domain *d, u64 lock)
{
  WQ_ASSERT_LOCKED();
  ASSERT(DOMAIN_LOCK_RDLOCKED(lock) == 0);
  ASSERT(!(lock & DOMAIN_LOCK_WRLOCKED_BIT));
  ASSERT(DOMAIN_LOCK_PREPENDED(lock) == 0);

  if (d->wrtasks_n)
  {
    WDBG("-> There is a writer task waiting for us (n=%u)\n", d->wrtasks_n);
    if (d->prepended || d->wrlocked)
    if ((lock & DOMAIN_LOCK_WRLOCKED_BIT) || DOMAIN_LOCK_PREPENDED(lock))
      wbug("This shall never happen");

    /* Lock the domain now */
    d->wrlocked = 1;

    /* Get the task */
    struct task *t = HEAD(d->wrtasks);
    rem_node(&t->n);
@@ -562,18 +628,26 @@ domain_unlock_common(struct domain *d)

    /* Lower the wrtasks_n counter */
    d->wrtasks_n--;

    return DOMAIN_LOCK_PREPENDED_ONE + DOMAIN_LOCK_WRLOCKED_BIT +
      ((d->wrtasks_n || d->wrsem_n) ? DOMAIN_LOCK_WRITERS_BIT : 0) +
      ((d->rdtasks_n || d->rdsem_n) ? DOMAIN_LOCK_READERS_BIT : 0);
  }
  else if (d->wrsem_n)

  if (d->wrsem_n)
  {
    WDBG("-> There is a secondary writer waiting for us (n=%u)\n", d->wrsem_n);
    
    /* Lock the domain now */
    d->wrlocked = 1;
    
    /* Wake up that thread */
    d->wrsem_n--;
    SEM_POST(&d->wrsem);

    return DOMAIN_LOCK_WRLOCKED_BIT +
      ((d->wrtasks_n || d->wrsem_n) ? DOMAIN_LOCK_WRITERS_BIT : 0) +
      ((d->rdtasks_n || d->rdsem_n) ? DOMAIN_LOCK_READERS_BIT : 0);
  }

  wbug("This shall never happen");
}

void
@@ -581,24 +655,33 @@ domain_read_unlock(struct domain *d)
{
  domain_assert_read_locked(d);
  WDBG("Read unlock: domain %p\n", d);

  WQ_LOCK();
  DOMAIN_STATLOG(WQS_DOMAIN_RDUNLOCK_REQUEST, d, NULL);
  if (!d->rdlocked)
    wbug("Read lock count underflow");

  d->rdlocked--;
  domain_pop_lock(d);
  DOMAIN_LOCK_LOAD();

  if (d->rdlocked)
  if (DOMAIN_LOCK_RDLOCKED(lock) == 0)
    wbug("Read lock count underflow");

  if (DOMAIN_LOCK_RDLOCKED(lock) > 1)
  {
    DOMAIN_LOCK_STORE(lock - DOMAIN_LOCK_RDLOCKED_ONE);
    WDBG("-> Unlocked leaving other readers behind (n=%u)\n", DOMAIN_LOCK_RDLOCKED(lock));
  }
  else if (lock & DOMAIN_LOCK_WRITERS_BIT)
  {
    WDBG("-> Unlocked leaving other readers behind (n=%u)\n", d->rdlocked);
    DOMAIN_LOCK_ENTER_SLOWPATH();
    DOMAIN_LOCK_EXIT_SLOWPATH(domain_unlock_writers(d, lock - DOMAIN_LOCK_RDLOCKED_ONE));
  }
  else
    domain_unlock_common(d);
  {
    /* Writing the completely unlocked state */
    ASSERT(lock == DOMAIN_LOCK_RDLOCKED_ONE);
    DOMAIN_LOCK_STORE(0);
    WDBG("-> Unlocked leaving the lock unlocked\n");
  }

  domain_pop_lock(d);
  DOMAIN_STATLOG(WQS_DOMAIN_RDUNLOCK_DONE, d, NULL);
  WQ_UNLOCK();
}

void
@@ -606,14 +689,30 @@ domain_write_unlock(struct domain *d)
{
  domain_assert_write_locked(d);
  WDBG("Write unlock: domain %p\n", d);

  WQ_LOCK();
  DOMAIN_STATLOG(WQS_DOMAIN_WRUNLOCK_REQUEST, d, NULL);
  if (!d->wrlocked)

  DOMAIN_LOCK_LOAD();

  if (!(lock & DOMAIN_LOCK_WRLOCKED_BIT))
    wbug("Write lock already unlocked");

  d->wrlocked = 0;
  if (DOMAIN_LOCK_PREPENDED(lock))
    wbug("Nothing shall be prepended on writer locked");

  if (!(lock & (DOMAIN_LOCK_WRITERS_BIT | DOMAIN_LOCK_READERS_BIT)))
  {
    /* Nobody is waiting on the lock */
    if (lock != DOMAIN_LOCK_WRLOCKED_BIT)
      wbug("Write unlock fast path triggered by mistake with state %lu", lock);

    /* Writing the completely unlocked state */
    DOMAIN_LOCK_STORE(0);
    domain_pop_lock(d);
    DOMAIN_STATLOG(WQS_DOMAIN_WRUNLOCK_DONE, d, NULL);
    return;
  }

  DOMAIN_LOCK_ENTER_SLOWPATH();

  uint w = d->wrtasks_n + d->wrsem_n;
  uint r = d->rdtasks_n + d->rdsem_n;
@@ -628,31 +727,36 @@ domain_write_unlock(struct domain *d)
    WDBG("-> Flushing readers: WP=%u WS=%u RP=%u RS=%u\n",
	d->wrtasks_n, d->wrsem_n, d->rdtasks_n, d->rdsem_n);
    /* We shall flush the readers */
    d->rdlocked = d->rdtasks_n + d->rdsem_n;
    u64 rdlocked = d->rdtasks_n + d->rdsem_n;

    /* Put the tasks into queue */
    uint check = 0;
    u64 prepend = 0;
    struct task *t, *tt;
    WALK_LIST_BACKWARDS_DELSAFE(t, tt, d->rdtasks)
    {
      rem_node(&t->n);
      TASK_PREPEND(d, t);
      check++;
      prepend++;
    }

    if (check != d->rdtasks_n || !EMPTY_LIST(d->rdtasks))
    if (prepend != d->rdtasks_n || !EMPTY_LIST(d->rdtasks))
      wbug("This shall never happen");

    d->rdtasks_n = 0;

    for ( ; d->rdsem_n; d->rdsem_n--)
      SEM_POST(&d->rdsem);

    DOMAIN_LOCK_EXIT_SLOWPATH(
	DOMAIN_LOCK_RDLOCKED_ONE * rdlocked +
	DOMAIN_LOCK_PREPENDED_ONE * prepend +
	((d->wrtasks_n || d->wrsem_n) ? DOMAIN_LOCK_WRITERS_BIT : 0));
  }
  else
    domain_unlock_common(d);
    DOMAIN_LOCK_EXIT_SLOWPATH(domain_unlock_writers(d, lock & ~DOMAIN_LOCK_WRLOCKED_BIT));

  domain_pop_lock(d);
  DOMAIN_STATLOG(WQS_DOMAIN_WRUNLOCK_DONE, d, NULL);
  WQ_UNLOCK();
}

extern _Thread_local struct timeloop *timeloop_current;
@@ -719,7 +823,6 @@ worker_loop(void *_data UNUSED)
       * when the lock is available. */

      WDBG("Worker yielding\n");
      WQ_LOCKED
      WORKER_YIELD();

      /* If this task was prepended, the available semaphore was not waited for */
@@ -738,10 +841,10 @@ worker_loop(void *_data UNUSED)
      WDBG("Worker stopping\n");
      wq->stop--;
      wq->running--;
      WQ_UNLOCK();

      /* Let others work instead of us */
      WORKER_YIELD();
      WQ_UNLOCK();

      /* This makes one worker less available */
      SEM_WAIT(&wq->available);
@@ -813,10 +916,12 @@ worker_queue_destroy(void)
  while (wq->running)
  {
    TASK_STOP_WORKER;
    WORKER_YIELD();
    WQ_UNLOCK();

    WORKER_YIELD();
    SEM_WAIT(&wq->stopped);
    WORKER_CONTINUE();

    WQ_LOCK();
  }

@@ -934,9 +1039,10 @@ task_push(struct task *t)
    /* No available worker. We're going to sleep.
     * Anyway, the task still exists in the queue. */
    TASK_APPEND(t);
    WORKER_YIELD();
    WQ_UNLOCK();

    WORKER_YIELD();

    /* Wait until somebody picks the task up */
    SEM_WAIT(&wq->available);
    WORKER_CONTINUE();