Commit 1d0f75e2 authored by Maria Matejka's avatar Maria Matejka
Browse files

TMP

parent 471f9b1a
Loading
Loading
Loading
Loading
+23 −14
Original line number Diff line number Diff line
@@ -103,39 +103,48 @@ extern _Thread_local u64 worker_id;
 * There is 
 *
 * */
#define CIRCULAR_QUEUE_PTR struct { \
  _Atomic u64 acquire, release; \
  _Atomic uint waiting; \
  struct semaphore *sem; \
}

#define CIRCULAR_QUEUE_N(type_, size_, ptrs_) struct { \
struct cq_ptr {
  _Atomic u64 acquire, release;
  _Atomic uint waiting;
  struct semaphore *sem;
  void (*feed)(struct cq_ptr *);
};

#define CQ_N(type_, size_, ptrs_) struct { \
  type_ buffer[size_]; \
  CIRCULAR_QUEUE_PTR ptr[ptrs_]; \
  struct cq_ptr ptr[ptrs_]; \
}

#define CIRCULAR_QUEUE(type_, size_) CIRCULAR_QUEUE_N(type_, size_, 2)
#define CQ(type_, size_) CQ_N(type_, size_, 2)

#define CQ_PTR_COUNT(buf_) sizeof((buf_)->ptr) / sizeof((buf_)->ptr[0])

#define CIRCULAR_QUEUE_PTR_INIT(ptr_, pool_) do { \
#define CQ_PTR_INIT(ptr_, pool_) do { \
  (ptr_)->acquire = (ptr_)->release = ATOMIC_VAR_INIT(0); \
  (ptr_)->waiting = ATOMIC_VAR_INIT(0); \
  (ptr_)->sem = semaphore_new(pool_, 0); \
} while (0)

#define CIRCULAR_QUEUE_INIT(buf_, pool_) do { \
#define CQ_PTR_SET_HANDLER(ptr_, handler_, domain_, exclusive_) task_init((ptr_)->sem, TF_IDEMPOTENT | ((exclusive_) ? TF_EXCLUSIVE : 0), (domain_), (handler_))

#define CQ_INIT(buf_, pool_) do { \
  memset((buf_)->buffer, 0, sizeof((buf_)->buffer)); \
  for (uint i_=0; i_<sizeof((buf_)->ptr) / sizeof((buf_)->ptr[0]); i_++) \
    CIRCULAR_QUEUE_PTR_INIT(&((buf_)->ptr[i_]), pool_); \
  for (uint i_=0; i_<CQ_PTR_COUNT((buf_)); i_++) \
    CQ_PTR_INIT(&((buf_)->ptr[i_]), pool_); \
} while (0)

#define CIRCULAR_QUEUE_CLEANUP(buf_) do { \
#define CQ_CLEANUP(buf_) do { \
  u64 val = atomic_load(&((buf_)->ptr[0].acquire)); \
  for (uint i_=0; i_<sizeof((buf_)->ptr) / sizeof((buf_)->ptr[0]); i_++) { \
  for (uint i_=0; i_<CQ_PTR_COUNT((buf_)); i_++) { \
    ASSERT(val == atomic_load(&((buf_)->ptr[i_].acquire))); \
    ASSERT(val == atomic_load(&((buf_)->ptr[i_].release))); \
    ASSERT(0 == atomic_load(&((buf_)->ptr[i_].waiting))); \
    ASSERT(atomic_load(&((buf_)->ptr[i_].task.flags)) & TF_ENQUEUED == 0); \
    rfree((buf_)->ptr[i_].sem); \
  } \
} while (0)

#define CQ_ACQUIRE(buf_, id_

#endif
+4 −4
Original line number Diff line number Diff line
@@ -43,12 +43,12 @@ enum task_flags {
  TF_PUBLIC_MASK = 0xff,	/* Flags are masked by this value on task push */
  /* These flags are private for worker queue */
  TF_PREPENDED = 0x100,		/* Task is waiting for the first free worker */
  TF_ENQUEUED = 0x200,		/* Task is in queue */
} PACKED;

struct task {
  node n;				/* Init this to zero. */
  enum task_flags flags;		/* Task flags */
  atomic_flag enqueued;			/* Is in queue */
  _Atomic enum task_flags flags;	/* Task flags */
  struct domain *domain;		/* Task's primary domain */
  void (*execute)(struct task *);	/* This will be called to execute the task */
};
@@ -56,11 +56,11 @@ struct task {
/* Always initialize the task by task_init() */
static inline void task_init(struct task *t, enum task_flags tf, struct domain *domain, void (*execute)(struct task *))
{
  ASSERT(t);
  ASSERT(execute);
  *t = (struct task) {
    .n = { },
    .flags = tf & TF_PUBLIC_MASK,
    .enqueued =	ATOMIC_FLAG_INIT,
    .flags = ATOMIC_VAR_INIT(tf & TF_PUBLIC_MASK),
    .domain = domain,
    .execute = execute,
  };
+2 −2
Original line number Diff line number Diff line
@@ -168,7 +168,7 @@ proto_add_channel(struct proto *p, struct channel_config *cf)
  c->last_tx_filter_change = current_time();
  c->reloadable = 1;

  CIRCULAR_QUEUE_INIT(&c->pending_imports, proto_pool);
  CQ_INIT(&c->pending_imports, proto_pool);

  CALL(c->channel->init, c, cf);

@@ -186,7 +186,7 @@ proto_remove_channel(struct proto *p, struct channel *c)

  PD(p, "Channel %s removed", c->name);

  CIRCULAR_QUEUE_CLEANUP(&c->pending_imports);
  CQ_CLEANUP(&c->pending_imports);

  rem_node(&c->n);
  mb_free(c);