Commit 5d4325be authored by Maria Matejka's avatar Maria Matejka
Browse files

TMP

parent 1d0f75e2
Loading
Loading
Loading
Loading

lib/cq.h

0 → 100644
+45 −0
Original line number Diff line number Diff line
/*
 *	BIRD Library -- Atomic pointer-based circular buffer
 *
 *	(c) 2019 Maria Matejka <mq@ucw.cz>
 *
 *	Can be freely distributed and used under the terms of the GNU GPL.
 */

#ifndef _BIRD_CQ_H_
#define _BIRD_CQ_H_

#include "lib/atomic.h"
#include "lib/worker.h"

#define CQ_PTR(size_) struct { \
  _Atomic u64 acquire, release; \
  _Atomic u64 mask[(size_ + 63) / 64]; \
}

#define CQ_N(type_, size_, ptrs_) struct { \
  type_ buffer[size_]; \
  CQ_PTR(size_) ptr[ptrs_]; \
}

#define CQ(type_, size_) CQ_N(type_, size_, 2)

#define CQ_PTR_COUNT(buf_) sizeof((buf_)->ptr) / sizeof((buf_)->ptr[0])

#define CQ_INIT(buf_, pool_) do { \
  memset((buf_)->buffer, 0, sizeof((buf_)->buffer)); \
  for (uint i_=0; i_<CQ_PTR_COUNT((buf_)); i_++) \
    (buf_)->ptr[i_].acquire = (buf_)->ptr[i_].release = ATOMIC_VAR_INIT(0); \
} while (0)

#define CQ_CLEANUP(buf_) do { \
  u64 val = atomic_load(&((buf_)->ptr[0].acquire)); \
  for (uint i_=0; i_<CQ_PTR_COUNT((buf_)); i_++) { \
    ASSERT(val == atomic_load(&((buf_)->ptr[i_].acquire))); \
    ASSERT(val == atomic_load(&((buf_)->ptr[i_].release))); \
  } \
} while (0)

#define CQ_ACQUIRE_TRY(buf_,

#endif
+0 −48
Original line number Diff line number Diff line
@@ -99,52 +99,4 @@ extern _Thread_local u64 worker_id;
  node_->_lsp = NULL; \
} while (0)

/* Atomic pointer-based circular buffer.
 * There is 
 *
 * */

struct cq_ptr {
  _Atomic u64 acquire, release;
  _Atomic uint waiting;
  struct semaphore *sem;
  void (*feed)(struct cq_ptr *);
};

#define CQ_N(type_, size_, ptrs_) struct { \
  type_ buffer[size_]; \
  struct cq_ptr ptr[ptrs_]; \
}

#define CQ(type_, size_) CQ_N(type_, size_, 2)

#define CQ_PTR_COUNT(buf_) sizeof((buf_)->ptr) / sizeof((buf_)->ptr[0])

#define CQ_PTR_INIT(ptr_, pool_) do { \
  (ptr_)->acquire = (ptr_)->release = ATOMIC_VAR_INIT(0); \
  (ptr_)->waiting = ATOMIC_VAR_INIT(0); \
  (ptr_)->sem = semaphore_new(pool_, 0); \
} while (0)

#define CQ_PTR_SET_HANDLER(ptr_, handler_, domain_, exclusive_) task_init((ptr_)->sem, TF_IDEMPOTENT | ((exclusive_) ? TF_EXCLUSIVE : 0), (domain_), (handler_))

#define CQ_INIT(buf_, pool_) do { \
  memset((buf_)->buffer, 0, sizeof((buf_)->buffer)); \
  for (uint i_=0; i_<CQ_PTR_COUNT((buf_)); i_++) \
    CQ_PTR_INIT(&((buf_)->ptr[i_]), pool_); \
} while (0)

#define CQ_CLEANUP(buf_) do { \
  u64 val = atomic_load(&((buf_)->ptr[0].acquire)); \
  for (uint i_=0; i_<CQ_PTR_COUNT((buf_)); i_++) { \
    ASSERT(val == atomic_load(&((buf_)->ptr[i_].acquire))); \
    ASSERT(val == atomic_load(&((buf_)->ptr[i_].release))); \
    ASSERT(0 == atomic_load(&((buf_)->ptr[i_].waiting))); \
    ASSERT(atomic_load(&((buf_)->ptr[i_].task.flags)) & TF_ENQUEUED == 0); \
    rfree((buf_)->ptr[i_].sem); \
  } \
} while (0)

#define CQ_ACQUIRE(buf_, id_

#endif
+1 −2
Original line number Diff line number Diff line
@@ -10,9 +10,8 @@
#define _BIRD_WORKER_H_

#include "lib/birdlib.h"
#include "lib/atomic.h"
#include "lib/locked.h"
#include "lib/resource.h"
#include "lib/atomic.h"

struct config;

+2 −2
Original line number Diff line number Diff line
@@ -168,7 +168,7 @@ proto_add_channel(struct proto *p, struct channel_config *cf)
  c->last_tx_filter_change = current_time();
  c->reloadable = 1;

  CQ_INIT(&c->pending_imports, proto_pool);
  CQ_INIT(&c->import_queue, proto_pool);

  CALL(c->channel->init, c, cf);

@@ -186,7 +186,7 @@ proto_remove_channel(struct proto *p, struct channel *c)

  PD(p, "Channel %s removed", c->name);

  CQ_CLEANUP(&c->pending_imports);
  CQ_CLEANUP(&c->import_queue);

  rem_node(&c->n);
  mb_free(c);
+9 −1
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@
#include "lib/lists.h"
#include "lib/resource.h"
#include "lib/event.h"
#include "lib/cq.h"
#include "nest/route.h"
#include "conf/conf.h"

@@ -538,7 +539,9 @@ struct channel {
  btime last_tx_filter_change;

  /* Circular buffer for pending imports */
  CIRCULAR_QUEUE_N(struct rte_update_data, CHANNEL_QUEUE_SIZE, 3) pending_imports;
  CQ_N(struct rte_update_data, CHANNEL_QUEUE_SIZE, 3) import_queue;

  

  struct rtable *in_table;		/* Internal table for received routes */
  struct event *reload_event;		/* Event responsible for reloading from in_table */
@@ -549,6 +552,11 @@ struct channel {
  struct rtable *out_table;		/* Internal table for exported routes */
};

enum channel_import_queue_ptr {
  CIQ_ORDER = 0,
  CIQ_FILTER = 1,
  CIQ_TABLE = 2,
};

/*
 * Channel states
Loading