Commit 06fa42d6 authored by Maria Matejka's avatar Maria Matejka
Browse files

Worker: Simplification of code

parent d6d60f7a
Loading
Loading
Loading
Loading
+30 −37
Original line number Diff line number Diff line
@@ -909,39 +909,46 @@ worker_loop(void *_data UNUSED)
  while (1) {
    WQ_LOCK_PREFETCH(wq->pending.head);

    uint blocked = 0;
    if (!prepended)
      blocked = atomic_load_explicit(&wq->blocked, memory_order_relaxed);
    /* First of all, there must be some task. If there is no task,
     * the thread shall sleep.
     * Then, if there is any task but maximum concurrent workers is reached,
     * the thread shall also sleep.
     * Only if we are allowed to run and there is a task, we shall run.
     */

    if (!SEM_TRYWAIT(&wq->waiting))
    {
      /* There is no worker waiting. Slow path! */
      WDBG("Worker will wait\n");
      WORKER_YIELD();

      /* If no work is waiting, we must release the available-semaphore */
      SEM_POST(&wq->available);
      /* Yield. There may be others waiting for release */
      WORKER_DO_YIELD();

      if (SEM_TRYWAIT(&wq->yield))
      {
	/* Free worker with no work! Releasing the worker. */
	SEM_POST(&wq->yield);

      /* And also flush all the postponed semaphores */
      uint postponed = atomic_exchange_explicit(&wq->postponed, 0, memory_order_relaxed);
	/* Flush the postponed available semaphores
	 * as the queue is certainly empty. */
	uint postponed = atomic_exchange_explicit(&wq->postponed, 0, memory_order_relaxed) + !prepended;
	for (uint i=0; i<postponed; i++)
	  SEM_POST(&wq->available);

	/* Sleep until some task is available */
	SEM_WAIT(&wq->waiting);

	/* But first let it to others */
	SEM_POST(&wq->waiting);
      }

      /* Now wait for our turn */
      WORKER_CONTINUE();
      WDBG("Worker woken up\n");
      continue;
    }
    else
    {
      WDBG("Worker won't wait\n");

      if (blocked)
	/* There are some task_push() calls blocked. We keep them blocked
	 * until the queue is processed to prevent unnecessary semaphore
	 * calls and context switches. */
      /* Picked up a task. The availability semaphore will be posted later. */
      atomic_fetch_add_explicit(&wq->postponed, 1, memory_order_relaxed);
      else
	SEM_POST(&wq->available);
    }

    WQ_LOCK();

@@ -952,20 +959,9 @@ worker_loop(void *_data UNUSED)
      struct task *t = HEAD(wq->pending);
      rem_node(&t->n);

      /* Check worker queue for emptiness */
      int empty = EMPTY_LIST(wq->pending);

      /* No more operations on worker queue */
      WQ_UNLOCK();

      /* Flush the postponed available semaphores if the queue is empty */
      if (empty)
      {
	uint postponed = atomic_exchange_explicit(&wq->postponed, 0, memory_order_relaxed);
	for (uint i=0; i<postponed; i++)
	  SEM_POST(&wq->available);
      }

      /* Store the old flags and domain */
      struct domain *d = t->domain;
      enum task_flags tf = t->flags;
@@ -1013,9 +1009,6 @@ worker_loop(void *_data UNUSED)
      /* Let others work instead of us */
      WORKER_DO_YIELD();

      /* This makes one worker less available */
      SEM_WAIT(&wq->available);

      /* Notify the stop requestor */
      SEM_POST(&wq->stopped);