Commit e75079fd authored by Maria Matejka's avatar Maria Matejka
Browse files

Worker: Performance cleaning of fast-path in task pushers unblocking

parent 3f67a11d
Loading
Loading
Loading
Loading
+24 −14
Original line number Diff line number Diff line
@@ -916,16 +916,23 @@ worker_loop(void *_data UNUSED)
  while (1) {
    WQ_LOCK_PREFETCH(wq->pending.head, NULL);

    uint blocked = 0;
    if (!prepended)
      blocked = atomic_load_explicit(&wq->blocked, memory_order_relaxed);

    if (!SEM_TRYWAIT(&wq->waiting))
    {
      WDBG("Worker will wait\n");
      if (!prepended)
	if (atomic_load_explicit(&wq->blocked, memory_order_relaxed))
	  atomic_fetch_add_explicit(&wq->postponed, 1, memory_order_relaxed);
	else
      WORKER_YIELD();

      /* If no work is waiting, we must release the available-semaphore */
      SEM_POST(&wq->available);

      /* And also flush all the postponed semaphores */
      uint postponed = atomic_exchange_explicit(&wq->postponed, 0, memory_order_relaxed);
      for (uint i=0; i<postponed; i++)
	SEM_POST(&wq->available);

      WORKER_YIELD();
      SEM_WAIT(&wq->waiting);
      WORKER_CONTINUE();
      WDBG("Worker woken up\n");
@@ -933,8 +940,11 @@ worker_loop(void *_data UNUSED)
    else
    {
      WDBG("Worker won't wait\n");
      if (!prepended)
	if (atomic_load_explicit(&wq->blocked, memory_order_relaxed))

      if (blocked)
	/* There are some task_push() calls blocked. We keep them blocked
	 * until the queue is processed to prevent unnecessary semaphore
	 * calls and context switches. */
	atomic_fetch_add_explicit(&wq->postponed, 1, memory_order_relaxed);
      else
	SEM_POST(&wq->available);
@@ -949,14 +959,14 @@ worker_loop(void *_data UNUSED)
      struct task *t = HEAD(wq->pending);
      rem_node(&t->n);

      /* Check list for emptiness */
      /* Check worker queue for emptiness */
      int empty = EMPTY_LIST(wq->pending);

      /* No more operations on worker queue */
      WQ_UNLOCK();

      /* Flush the postponed available semaphores */
      if (empty && atomic_load_explicit(&wq->postponed, memory_order_relaxed))
      /* Flush the postponed available semaphores if the queue is empty */
      if (empty)
      {
	uint postponed = atomic_exchange_explicit(&wq->postponed, 0, memory_order_relaxed);
	for (uint i=0; i<postponed; i++)