Commit 8e7d2e43 authored by Maria Matejka's avatar Maria Matejka
Browse files

Worker: Out-of-order execution fixed

parent 4a06a7f4
Loading
Loading
Loading
Loading
+19 −4
Original line number Diff line number Diff line
@@ -968,11 +968,18 @@ worker_loop(void *_data UNUSED)

  WDBG("Worker started\n");

  /* Prepended task indicator */
  _Bool prepended = 0;

  /* How many prioritized tasks have been executed (so how many times we should not post the available semaphore) */
  uint nopost = 0;
 
  /* Run the loop */
  while (1) {
    WQ_LOCK_PREFETCH(wq->pending.head);
    /* Update the prioritized tasks counter and reset the indicators. */
    nopost += prepended + tail_task;
    prepended = tail_task = 0;

    /* First of all, there must be some task. If there is no task,
     * the thread shall sleep.
@@ -1003,16 +1010,20 @@ worker_loop(void *_data UNUSED)
	/* Sleep until some task is available */
	SEM_WAIT(&wq->waiting);

	/* But first let it to others */
	/* But first let it to others as we don't hold the yield semaphore */
	SEM_POST(&wq->waiting);
      }

      /* Now wait for our turn */
      /* Now wait for our turn, go around */
      WORKER_CONTINUE();
      continue;
    }
    else if ((!prepended) && (!tail_task))
      /* Picked up a task. The availability semaphore will be posted later. */

    if (nopost)
      /* Don't post the availability semaphore if we were processing an out-of-order task */
      nopost--;
    else
      /* The availability semaphore will be posted later. */
      atomic_fetch_add_explicit(&wq->postponed, 1, memory_order_relaxed);

    WQ_LOCK();
@@ -1096,6 +1107,10 @@ worker_loop(void *_data UNUSED)
      /* Let others work instead of us */
      WORKER_DO_YIELD();

      /* Flush the out-of-order counter */
      for (uint i=0; i<nopost; i++)
	SEM_WAIT(&wq->available);

      /* Notify the stop requestor */
      SEM_POST(&wq->stopped);

+1 −0
Original line number Diff line number Diff line
@@ -532,6 +532,7 @@ void cmd_reconfig_undo_notify(void) {}
void sysdep_preconfig(struct config *c) {
  c->workers = 4;
  c->max_workers = 8;
  c->queue_size = 64;
}

int sysdep_commit(struct config *new, struct config *old UNUSED) {