Commit 128fcd42 authored by Maria Matejka
Browse files

Nest: Semaphores used to maintain maximum number of import queue items

parent 8e7d2e43
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -230,7 +230,7 @@ class BIRDWQStateLogPrinter(BIRDPrinter):
                }

    def taskval(self):
        return "%(worker) 4d %(what)s: %(execute)s with%(out)s flags %(tf_exclusive)s%(tf_tail)s%(tf_idempotent)s%(tf_prepended)son %(domain)s" % {
        return "%(worker) 4d %(what)s: %(execute)s with%(out)s flags %(tf_exclusive)s%(tf_tail)s%(tf_idempotent)s%(tf_prepended)son %(domain)s at %(date)s" % {
                "worker": self.val['worker_id'],
                "what": str(self.val["what"]),
                "execute": str(self.val["task"]["execute"]),
@@ -240,6 +240,7 @@ class BIRDWQStateLogPrinter(BIRDPrinter):
                "tf_tail": "TF_TAIL " if self.val["task"]["flags"] & 0x2 == 0x2 else "",
                "tf_idempotent": "TF_IDEMPOTENT " if self.val["task"]["flags"] & 0x4 == 0x4 else "",
                "tf_prepended": "TF_PREPENDED " if self.val["task"]["flags"] & 0x100 == 0x100 else "",
                "date": "%(sec)d.%(nsec)09d" % { "sec": self.val["task"]["when"]["tv_sec"], "nsec": self.val["task"]["when"]["tv_nsec"], },
                }

    def nothing(self):
+4 −0
Original line number Diff line number Diff line
@@ -14,6 +14,10 @@

struct config;

struct semaphore *semaphore_new(pool *p, uint n);
void semaphore_wait(struct semaphore *s);
void semaphore_post(struct semaphore *s);

struct domain *domain_new(pool *p);
void domain_read_lock(struct domain *);
void domain_read_unlock(struct domain *);
+61 −44
Original line number Diff line number Diff line
@@ -60,10 +60,15 @@ static slab *rte_slab;
static linpool * volatile rte_update_pool[RUPS_MAX] = {};
static volatile uint rups = 0;
static volatile uint rup_total_linpools = 0;
static struct semaphore *rup_sem = NULL;

static inline linpool *rup_get(void) {
  SPIN_LOCK(rup_spinlock);
  struct linpool *pool;

  /* Wait until some linpool is available */
  semaphore_wait(rup_sem);

  SPIN_LOCK(rup_spinlock);
  if (!rups)
  {
    pool = lp_new_default(rup_pool);
@@ -77,7 +82,8 @@ static inline linpool *rup_get(void) {
    pool = rte_update_pool[--rups];
  }
  SPIN_UNLOCK(rup_spinlock);
  log(L_INFO "Linpool state %u (get)", rup_total_linpools);

  debug("Linpool state %u (get)\n", rup_total_linpools);
  return pool;
}

@@ -93,7 +99,9 @@ static inline void rup_free(linpool *pool) {
    rte_update_pool[rups++] = pool;
  }
  SPIN_UNLOCK(rup_spinlock);
  log(L_INFO "Linpool state %u (free)", rup_total_linpools);

  semaphore_post(rup_sem);
  debug("Linpool state %u (free)\n", rup_total_linpools);
}

static inline void rud_state_change(struct rte_update_data *rud, enum rte_update_state from, enum rte_update_state to)
@@ -1447,7 +1455,7 @@ rte_finish_update_schedule(struct rte_update_data *rud, _Bool tail)
{
  struct task *task = &(rud->task);

  log(L_INFO "FUS: %d", tail);
  debug("rte_finish_update_schedule(%p, %d)\n", rud, tail);

  /* Check whether it is to be pushed now.
   * There are two cases.
@@ -1491,10 +1499,9 @@ rte_do_update(struct task *task)

  const struct filter *filter = c->in_filter;

  if (filter == FILTER_ACCEPT)
    rud->result = RUR_ACCEPTED;
  else
    {
  ASSERT(filter != FILTER_ACCEPT);
  ASSERT((filter != FILTER_REJECT) || (c->in_keep_filtered));

  int fr;
  if (filter == FILTER_REJECT)
    fr = F_REJECT;
@@ -1507,20 +1514,14 @@ rte_do_update(struct task *task)
  if (fr > F_ACCEPT)
  {
    rud->result = RUR_FILTERED;
	if (!c->in_keep_filtered)
	  goto done;
	
    new->flags |= REF_FILTERED;
  }
  else
    rud->result = RUR_ACCEPTED;

      if (filter != FILTER_REJECT)
  rte_store_tmp_attrs(new, rud->pool);
    }
#undef new

done:
  return rte_finish_update_schedule(rud, 1);
}

@@ -1567,7 +1568,7 @@ done:
static void
rte_dispatch_update(struct rte_update_data *rud)
{
  log(L_INFO "RDU");
  debug("RDU\n");

  /* Get local linpool */
  ASSERT(rud->pool == NULL);
@@ -1587,9 +1588,24 @@ rte_dispatch_update(struct rte_update_data *rud)
  /* Insert rud into the channel's synchronization queue */
  ADD_TAIL_LOCKED(&rud->channel->pending_imports, rud);

  /* Is route update */
  const struct filter *filter = rud->channel->in_filter;
  if ((filter == FILTER_ACCEPT) || (filter == FILTER_REJECT) || !rud->rte)
  {
    /* Is trivial */
    if (rud->rte)
      if (filter == FILTER_ACCEPT)
	rud->result = RUR_ACCEPTED;
      else
	rud->result = RUR_FILTERED;
    else
      rud->result = RUR_WITHDRAW;

    rud_state_change(rud, RUS_PENDING_UPDATE, RUS_UPDATING);
    rte_finish_update_schedule(rud, 0);
  }
  else
  {
    /* Run non-trivial filter */
    /* Create local rte copy */
    rud->rte = LP_DUPLICATE(rud->pool, rud->rte);
    
@@ -1617,13 +1633,6 @@ rte_dispatch_update(struct rte_update_data *rud)
    task_init(&rud->task, 0, NULL, rte_do_update);
    task_push(&rud->task);
  }
  else	/* Is route withdraw */
  {
    /* Skip filter for withdraw */
    rud->result = RUR_WITHDRAW;
    rud_state_change(rud, RUS_PENDING_UPDATE, RUS_UPDATING);
    rte_finish_update_schedule(rud, 0);
  }
}

static inline int
@@ -1690,9 +1699,11 @@ rte_update2(struct channel *c, const net_addr *n, rte *new, struct rte_src *src)
  rte_dispatch_update(&rud);
}

static void
static int
rte_finish_update(struct rte_update_data *rud)
{
  int out = 0;

  domain_assert_write_locked(rud->channel->table->domain);

  rud_state_check(rud, RUS_RECALCULATING);
@@ -1716,7 +1727,6 @@ rte_finish_update(struct rte_update_data *rud)
      if (!rud->channel->in_keep_filtered)
	rud->rte = NULL;
      break;
      /* Fallthrough */
    case RUR_ACCEPTED:
      stats->imp_updates_accepted++;
      break;
@@ -1741,6 +1751,7 @@ rte_finish_update(struct rte_update_data *rud)
      if (rud->result == RUR_WITHDRAW)
	stats->imp_withdraws_ignored++;

      out = 1;
      goto done;
    }
  }
@@ -1759,14 +1770,14 @@ rte_finish_update(struct rte_update_data *rud)
done:
  /* And return the pool */
  rup_free(rud->pool);
  return;
  return out;
}

/* This hook is run in route table domain locked for writing */
static void
rte_finish_update_hook(struct task *task)
{
  log(L_INFO "FUH: %d");
  debug("FUH: %d\n");

  struct rte_update_data *rud = SKIP_BACK(struct rte_update_data, task, task);
  struct channel *c = rud->channel;
@@ -1774,11 +1785,12 @@ rte_finish_update_hook(struct task *task)

  domain_assert_write_locked(rt->domain);

  /* If this task is run, it must be the first pending import */
  /* If this task is run, it must be the first pending import.
   * Removing this import from the queue. */
  ASSERT(REM_HEAD_LOCKED(&(c->pending_imports)) == rud);

  /* Do the real table update */
  rte_finish_update(rud);
  int fast = rte_finish_update(rud);

  /* Look for the next node */
  rud = NULL;
@@ -1801,6 +1813,10 @@ rte_finish_update_hook(struct task *task)
  if (!rud)
    return;

  /* Merge fast route updates */
  if (fast)
    return rte_finish_update_hook(&(rud->task));

  task_init(&(rud->task), TF_EXCLUSIVE | TF_TAIL, rud->channel->table->domain, rte_finish_update_hook);
  task_push(&(rud->task));
}
@@ -2085,6 +2101,7 @@ rt_init(void)

  SPIN_INIT(rup_spinlock);
  rup_pool = rp_new(&root_pool, "Route updates");
  rup_sem = semaphore_new(rup_pool, RUPS_MAX);

  rte_slab = sl_new(rt_table_pool, sizeof(rte));
  init_list(&routing_tables);
+77 −16
Original line number Diff line number Diff line
#undef LOCAL_DEBUG
//#define LOCAL_DEBUG
#undef DEBUG_STATELOG
//#define DEBUG_STATELOG
//#undef DEBUG_STATELOG
#define DEBUG_STATELOG

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
@@ -37,7 +37,7 @@ _Thread_local u64 worker_id;
#define WASSERT(what) do { if (!(what)) wbug("This shall never happen: " #what " at %s:%d", __FILE__, __LINE__); } while (0)

#ifdef DEBUG_STATELOG
#define STATELOG_SIZE_ (1 << 14)
#define STATELOG_SIZE_ (1 << 18)
static const uint STATELOG_SIZE = STATELOG_SIZE_;
#endif

@@ -110,6 +110,7 @@ static struct worker_queue {
	enum task_flags flags;
	struct domain *domain;
	void (*execute)(struct task *);
	struct timespec when;
      } task;
    };
  } statelog[STATELOG_SIZE_];
@@ -130,8 +131,11 @@ struct worker_queue *wq = &wq_;
#define WQ_STATELOG_QUEUE(what_, locked) \
  WQ_STATELOG(what_, .queue = { .running = ADL(wq->running), .workers = ADL(wq->workers), .max_workers = ADL(wq->max_workers), .stop = ADL(wq->stop), .pending = ADL(wq->pending_count), .blocked = ADL(wq->blocked), .postponed = ADL(wq->postponed) })

#define WQ_STATELOG_TASK_EXPLICIT(what_, f_, d_, e_) \
  WQ_STATELOG(what_, .task = { .flags = f_, .domain = d_, .execute = e_ })
#define WQ_STATELOG_TASK_EXPLICIT(what_, f_, d_, e_) do { \
  struct timespec when; \
  clock_gettime(CLOCK_MONOTONIC, &when); \
  WQ_STATELOG(what_, .task = { .flags = f_, .domain = d_, .execute = e_, .when = when }); \
} while (0)

#define WQ_STATELOG_TASK(what_, task_) \
  WQ_STATELOG_TASK_EXPLICIT(what_, task_->flags, task_->domain, task_->execute)
@@ -235,15 +239,15 @@ static inline void SEM_INIT(sem_t *s, uint val)
    wbug("sem_init() failed: %m");
}

#define SEM_WAIT(_s) do { \
  sem_t *s = _s; \
  WQ_STATELOG(WQS_SEM_WAIT_REQUEST, .sem = s); \
  while (sem_wait(s) < 0) { \
#define SEM_WAIT(s) do { \
  sem_t *_s = s; \
  WQ_STATELOG(WQS_SEM_WAIT_REQUEST, .sem = _s); \
  while (sem_wait(_s) < 0) { \
    if (errno == EINTR) \
      continue; \
    wdie("sem_wait: %m"); \
  } \
  WQ_STATELOG(WQS_SEM_WAIT_SUCCESS, .sem = s); \
  WQ_STATELOG(WQS_SEM_WAIT_SUCCESS, .sem = _s); \
} while (0)

static inline int SEM_TRYWAIT(sem_t *s)
@@ -265,11 +269,11 @@ static inline int SEM_TRYWAIT(sem_t *s)
}

//  if (s == &(wq->available)) { int n; sem_getvalue(s, &n); if (n > 64) bug("!"); }
#define SEM_POST(_s) do { \
  sem_t *s = _s; \
  if (sem_post(s) < 0) \
#define SEM_POST(s) do { \
  sem_t *_s = s; \
  if (sem_post(_s) < 0) \
    wbug("sem_post: %m"); \
  WQ_STATELOG(WQS_SEM_POST, .sem = s); \
  WQ_STATELOG(WQS_SEM_POST, .sem = _s); \
} while (0)

static inline void SEM_DESTROY(sem_t *s)
@@ -352,6 +356,63 @@ static inline void WORKER_CONTINUE(void)

static _Thread_local struct timeloop worker_timeloop;

/* A counting semaphore wrapped as a BIRD resource so it is freed and
 * dumped together with the pool it was allocated from. */
struct semaphore {
  resource r;
  sem_t sem;		/* A single semaphore */
};

/* Resource-class destructor: release the underlying POSIX semaphore.
 * The memory itself is reclaimed by the resource/pool machinery. */
static void
semaphore_free(resource *r)
{
  struct semaphore *sem = SKIP_BACK(struct semaphore, r, r);
  sem_destroy(&(sem->sem));
}

/* Resource-class dump hook: print the semaphore's current counter value.
 *
 * Bug fix: sem_getvalue() returns 0 on success and -1 on error, so the
 * original condition `if (!sem_getvalue(...))` was inverted — it raised
 * bug() on every successful call and silently accepted failures. */
static void
semaphore_dump(resource *r)
{
  struct semaphore *s = SKIP_BACK(struct semaphore, r, r);
  int val;
  if (sem_getvalue(&s->sem, &val) < 0)
    bug("sem_getvalue() error: %m");

  debug("Semaphore: %d\n", val);
}

/* Resource-class descriptor registering semaphores with BIRD's
 * pool-based resource accounting; no lookup or memsize support. */
static struct resclass semaphore_resclass = {
  .name = "Semaphore",
  .size = sizeof(struct semaphore),
  .free = semaphore_free,
  .dump = semaphore_dump,
  .lookup = NULL,
  .memsize = NULL,
};

/* Allocate a counting semaphore from pool @p, initialized to @n.
 * Aborts via bug() if the underlying sem_init() fails. */
struct semaphore *semaphore_new(pool *p, uint n)
{
  struct semaphore *sem = ralloc(p, &semaphore_resclass);

  if (sem_init(&(sem->sem), 0, n) < 0)
    bug("Semaphore init error: %m");

  return sem;
}

/* Decrement the semaphore, blocking when its value is zero.
 * Uses a non-blocking fast path first; on the slow path the worker
 * yields before blocking and re-registers after waking up. */
void semaphore_wait(struct semaphore *s)
{
  /* Fast path: decrement succeeded without blocking. */
  if (SEM_TRYWAIT(&(s->sem)))
    return;

  /* Slow path: give the worker slot back while we sleep. */
  WORKER_YIELD();
  SEM_WAIT(&(s->sem));
  WORKER_CONTINUE();
}

/* Increment the semaphore, waking one blocked waiter if any. */
void semaphore_post(struct semaphore *s)
{
  SEM_POST(&(s->sem));
}

struct domain {
  resource r;
  sem_t rdsem;		/* Wait semaphore for readers */
@@ -419,7 +480,7 @@ retry: do { \
} while (0)
#endif

void
static void
domain_free(resource *r)
{
  struct domain *d = SKIP_BACK(struct domain, r, r);
@@ -427,7 +488,7 @@ domain_free(resource *r)
  sem_destroy(&d->wrsem);
}

void
static void
domain_dump(resource *r)
{
  struct domain *d = SKIP_BACK(struct domain, r, r);