Commit 0fd1c1d0 authored by Maria Matejka's avatar Maria Matejka
Browse files

Route attribute cache is now lockless on read / clone.

Lots of time was spent locking when accessing route attribute cache.
This overhead should be now reduced to a minimum.
parent 2a224a9e
Loading
Loading
Loading
Loading
+11 −14
Original line number Diff line number Diff line
@@ -606,8 +606,8 @@ struct rte_src {


typedef struct rta {
  struct rta *next, **pprev;		/* Hash chain */
  u32 uc;				/* Use count */
  struct rta * _Atomic next, * _Atomic *pprev;	/* Hash chain */
  _Atomic u32 uc;			/* Use count */
  u32 hash_key;				/* Hash over important fields */
  struct ea_list *eattrs;		/* Extended Attribute chain */
  struct hostentry *hostentry;		/* Hostentry for recursive next-hops */
@@ -758,12 +758,6 @@ struct rte_owner {
  event *stop;
};

DEFINE_DOMAIN(attrs);
extern DOMAIN(attrs) attrs_domain;

#define RTA_LOCK	LOCK_DOMAIN(attrs, attrs_domain)
#define RTA_UNLOCK	UNLOCK_DOMAIN(attrs, attrs_domain)

#define RTE_SRC_PU_SHIFT      44
#define RTE_SRC_IN_PROGRESS   (1ULL << RTE_SRC_PU_SHIFT)

@@ -879,20 +873,23 @@ static inline size_t rta_size(const rta *a) { return sizeof(rta) + sizeof(u32)*a
#define RTA_MAX_SIZE (sizeof(rta) + sizeof(u32)*MPLS_MAX_LABEL_STACK)
rta *rta_lookup(rta *);			/* Get rta equivalent to this one, uc++ */
static inline int rta_is_cached(rta *r) { return r->cached; }

static inline rta *rta_clone(rta *r) {
  RTA_LOCK;
  r->uc++;
  RTA_UNLOCK;
  u32 uc = atomic_fetch_add_explicit(&r->uc, 1, memory_order_acq_rel);
  ASSERT_DIE(uc > 0);
  return r;
}

void rta__free(rta *r);
static inline void rta_free(rta *r) {
  RTA_LOCK;
  if (r && !--r->uc)
  if (!r)
    return;

  u32 uc = atomic_fetch_sub_explicit(&r->uc, 1, memory_order_acq_rel);
  if (uc == 1)
    rta__free(r);
  RTA_UNLOCK;
}

rta *rta_do_cow(rta *o, linpool *lp);
static inline rta * rta_cow(rta *r, linpool *lp) { return rta_is_cached(r) ? rta_do_cow(r, lp) : r; }
static inline void rta_uncache(rta *r) { r->cached = 0; r->uc = 0; }
+161 −55
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@
#include "lib/hash.h"
#include "lib/idm.h"
#include "lib/resource.h"
#include "lib/rcu.h"
#include "lib/string.h"

#include <stddef.h>
@@ -85,8 +86,8 @@ const char * rta_dest_names[RTD_MAX] = {
  [RTD_PROHIBIT]	= "prohibited",
};

DOMAIN(attrs) attrs_domain;
DOMAIN(attrs) src_domain;
DEFINE_DOMAIN(attrs);
static DOMAIN(attrs) src_domain;

#define SRC_LOCK	LOCK_DOMAIN(attrs, src_domain)
#define SRC_UNLOCK	UNLOCK_DOMAIN(attrs, src_domain)
@@ -1166,21 +1167,28 @@ ea_append(ea_list *to, ea_list *what)
 *	rta's
 */

static uint rta_cache_count;
static uint rta_cache_size = 32;
static uint rta_cache_limit;
static uint rta_cache_mask;
static rta **rta_hash_table;
static DOMAIN(attrs) attrs_domain;

static void
rta_alloc_hash(void)
#define RTA_LOCK	LOCK_DOMAIN(attrs, attrs_domain)
#define RTA_UNLOCK	UNLOCK_DOMAIN(attrs, attrs_domain)

struct rta_cache {
  u32 count;
  u32 size;
  u32 limit;
  u32 mask;
  rta * _Atomic table[0];
} * _Atomic rta_cache;
// rta_aux, rta_cache = { .size = ATOMIC_VAR_INIT(32), };

static struct rta_cache *
rta_alloc_hash(u32 size)
{
  rta_hash_table = mb_allocz(rta_pool, sizeof(rta *) * rta_cache_size);
  if (rta_cache_size < 32768)
    rta_cache_limit = rta_cache_size * 2;
  else
    rta_cache_limit = ~0;
  rta_cache_mask = rta_cache_size - 1;
  struct rta_cache *c = mb_allocz(rta_pool, sizeof(struct rta_cache) + sizeof(rta * _Atomic) * size);
  c->size = size;
  c->limit = (size >> 20) ? (~0U) : (size * 2);
  c->mask = size - 1;
  return c;
}

static inline uint
@@ -1234,34 +1242,88 @@ rta_copy(rta *o)
}

static inline void
rta_insert(rta *r)
rta_insert(rta *r, struct rta_cache *c)
{
  uint h = r->hash_key & rta_cache_mask;
  r->next = rta_hash_table[h];
  if (r->next)
    r->next->pprev = &r->next;
  r->pprev = &rta_hash_table[h];
  rta_hash_table[h] = r;
  uint h = r->hash_key & c->mask;
  rta *next = atomic_load_explicit(&c->table[h], memory_order_relaxed);

  atomic_store_explicit(&r->next, next, memory_order_relaxed);
  r->pprev = &c->table[h];

  if (next)
    next->pprev = &r->next;

  /* This store MUST be the last and MUST have release order for thread-safety */
  atomic_store_explicit(&c->table[h], r, memory_order_release);
}

static void
rta_rehash(void)
rta_rehash(struct rta_cache *c)
{
  uint ohs = rta_cache_size;
  uint h;
  rta *r, *n;
  rta **oht = rta_hash_table;
  u32 os = c->size;

  rta_cache_size = 2*rta_cache_size;
  DBG("Rehashing rta cache from %d to %d entries.\n", ohs, rta_cache_size);
  rta_alloc_hash();
  for(h=0; h<ohs; h++)
    for(r=oht[h]; r; r=n)
  struct rta_cache *nc = rta_alloc_hash(os * 2);
  nc->count = c->count;

  /* First we simply copy every chain to both new locations */
  for (u32 h = 0; h < os; h++)
  {
	n = r->next;
	rta_insert(r);
    rta *r = atomic_load_explicit(&c->table[h], memory_order_relaxed);
    atomic_store_explicit(&nc->table[h], r, memory_order_relaxed);
    atomic_store_explicit(&nc->table[h + os], r, memory_order_relaxed);
  }
  mb_free(oht);

  /* Then we exchange the hashes; release semantics forces the previous code to be already done */
  atomic_store_explicit(&rta_cache, nc, memory_order_release);

  /* And now we pass through both chains and filter them */
  for (u32 h = 0; h < c->size; h++)
  {
    rta * _Atomic * ap = &nc->table[h];
    rta * _Atomic * bp = &nc->table[h + os];

    rta *r = atomic_load_explicit(ap, memory_order_relaxed);
    ASSERT_DIE(r == atomic_load_explicit(bp, memory_order_relaxed));

    while (r)
    {
      if (r->hash_key & os)
      {
	r->pprev = bp;
	atomic_store_explicit(bp, r, memory_order_release);
	bp = &r->next;
      }
      else
      {
	r->pprev = ap;
	atomic_store_explicit(ap, r, memory_order_release);
	ap = &r->next;
      }

      r = atomic_load_explicit(&r->next, memory_order_acquire);
    }

    atomic_store_explicit(ap, NULL, memory_order_release);
    atomic_store_explicit(bp, NULL, memory_order_release);
  }

  synchronize_rcu();
  mb_free(c);
}

static rta *
rta_find(rta *o, u32 h, struct rta_cache *c)
{
  rta *r = NULL;

  for (r = atomic_load_explicit(&c->table[h & c->mask], memory_order_acquire); r; r = atomic_load_explicit(&r->next, memory_order_acquire))
    if (r->hash_key == h && rta_same(r, o))
    {
      atomic_fetch_add_explicit(&r->uc, 1, memory_order_acq_rel);
      return r;
    }

  return NULL;
}

/**
@@ -1289,24 +1351,34 @@ rta_lookup(rta *o)

  h = rta_hash(o);

  /* Lockless lookup */
  rcu_read_lock();
  r = rta_find(o, h, atomic_load_explicit(&rta_cache, memory_order_acquire));
  rcu_read_unlock();

  if (r)
    return r;

  RTA_LOCK;

  for(r=rta_hash_table[h & rta_cache_mask]; r; r=r->next)
    if (r->hash_key == h && rta_same(r, o))
  /* Locked lookup to avoid duplicates if possible */
  struct rta_cache *c = atomic_load_explicit(&rta_cache, memory_order_acquire);
  r = rta_find(o, h, c);
  if (r)
  {
      r->uc++;
    RTA_UNLOCK;
    return r;
  }

  /* Store the rta */
  r = rta_copy(o);
  r->hash_key = h;
  r->cached = 1;
  rt_lock_hostentry(r->hostentry);
  rta_insert(r);
  rta_insert(r, c);

  if (++rta_cache_count > rta_cache_limit)
    rta_rehash();
  if (++c->count > c->limit)
    rta_rehash(c);

  RTA_UNLOCK;
  return r;
@@ -1315,17 +1387,47 @@ rta_lookup(rta *o)
void
rta__free(rta *a)
{
  ASSERT(rta_cache_count && a->cached);
  rta_cache_count--;
  *a->pprev = a->next;
  if (a->next)
    a->next->pprev = a->pprev;
  ASSERT(a->cached);

  RTA_LOCK;
  struct rta_cache *c = atomic_load_explicit(&rta_cache, memory_order_acquire);

  if (atomic_load_explicit(&a->uc, memory_order_acquire))
  {
    /* Acquired inbetween */
    RTA_UNLOCK;
    return;
  }

  /* Relink the forward pointer */
  rta *next = atomic_load_explicit(&a->next, memory_order_acquire);
  atomic_store_explicit(a->pprev, next, memory_order_release);

  /* Relink the backwards pointer */
  if (next)
    next->pprev = a->pprev;

  /* Wait until nobody knows about us */
  synchronize_rcu();

  if (atomic_load_explicit(&a->uc, memory_order_acquire))
  {
    /* Acquired inbetween, relink back */
    rta_insert(a, c);
    RTA_UNLOCK;
    return;
  }

  /* Cleared to free the memory */
  rt_unlock_hostentry(a->hostentry);
  if (a->nh.next)
    nexthop_free(a->nh.next);
  ea_free(a->eattrs);
  a->cached = 0;
  c->count--;
  sl_free(rta_slab(a), a);

  RTA_UNLOCK;
}

rta *
@@ -1394,9 +1496,13 @@ rta_dump_all(void)

  RTA_LOCK;

  debug("Route attribute cache (%d entries, rehash at %d):\n", rta_cache_count, rta_cache_limit);
  for(h=0; h<rta_cache_size; h++)
    for(a=rta_hash_table[h]; a; a=a->next)
  struct rta_cache *c = atomic_load_explicit(&rta_cache, memory_order_acquire);

  debug("Route attribute cache (%d entries, rehash at %d):\n", c->count, c->limit);
  for(h=0; h<c->size; h++)
    for(a = atomic_load_explicit(&c->table[h], memory_order_acquire);
	a;
	a = atomic_load_explicit(&a->next, memory_order_acquire))
      {
	debug("%p ", a);
	rta_dump(a);
@@ -1440,7 +1546,7 @@ rta_init(void)
  nexthop_slab_[2] = sl_new(rta_pool, sizeof(struct nexthop) + sizeof(u32)*2);
  nexthop_slab_[3] = sl_new(rta_pool, sizeof(struct nexthop) + sizeof(u32)*MPLS_MAX_LABEL_STACK);

  rta_alloc_hash();
  atomic_store_explicit(&rta_cache, rta_alloc_hash(32), memory_order_relaxed);
  rte_src_init();
}