Commit 3e8c7dd9 authored by Maria Matejka's avatar Maria Matejka
Browse files

TMP

parent 1f8c8859
Loading
Loading
Loading
Loading
+54 −17
Original line number Original line Diff line number Diff line
@@ -26,9 +26,12 @@


/* Number of index-pointer slots (pipeline stages) in the queue structure */
#define CQ_PTR_COUNT(buf_) (sizeof((buf_)->ptr) / sizeof((buf_)->ptr[0]))
/* Accessor for pointer slot ptrn_, and for its cyclically previous slot */
#define CQ_PTRN(buf_, ptrn_) (&(buf_)->ptr[ptrn_])
#define CQ_PTRN_PREV(buf_, ptrn_) CQ_PTRN((buf_), ((ptrn_) + CQ_PTR_COUNT((buf_)) - 1) % CQ_PTR_COUNT((buf_)))

/* Capacity (element count) of the circular item buffer */
#define CQ_BUF_SIZE(buf_) (sizeof((buf_)->buffer) / sizeof((buf_)->buffer[0]))

/* Item slot corresponding to a monotonically growing id (wraps modulo capacity) */
#define CQ_ITEM(buf_, id_) (&(buf_)->buffer[(id_) % CQ_BUF_SIZE(buf_)])

#define CQ_INIT(buf_, pool_) do { \
#define CQ_INIT(buf_, pool_) do { \
  memset((buf_)->buffer, 0, sizeof((buf_)->buffer)); \
  memset((buf_)->buffer, 0, sizeof((buf_)->buffer)); \
  for (uint i_=0; i_<CQ_PTR_COUNT((buf_)); i_++) \
  for (uint i_=0; i_<CQ_PTR_COUNT((buf_)); i_++) \
@@ -43,32 +46,66 @@
  } \
  } \
} while (0)
} while (0)


/* Top bit of a release index: set by an acquirer that found the stage
 * congested, so the releaser knows someone is waiting on it. */
#define CQ_PENDING_RELEASE_BIT	(((u64) 1) << 63)

/* Try to acquire one item id from pointer slot ptrn_ of the circular queue.
 * Evaluates (GNU statement expression) to the acquired id, or ~0 when the
 * stage is congested, i.e. the acquire index has caught up with the previous
 * slot's release index; in that case the pending-release bit is set on that
 * release index so the releaser can notify us.
 * NOTE(review): CAS orderings changed to (success=acq_rel, failure=acquire);
 * the original (acquire, acq_rel) is invalid — C11 7.17.7.4 forbids a failure
 * order of release/acq_rel or one stronger than the success order. */
#define CQ_ACQUIRE_TRY(buf_, ptrn_) ({ \
  /* Get local acquire index and adjacent release index */ \
  u64 acquire_ = atomic_load_explicit(&(CQ_PTRN((buf_), (ptrn_))->acquire), memory_order_acquire); \
  u64 adj_rel_ = 0; \
  u64 out_ = ~((u64) 0); \
  while (1) { \
    /* Local snapshot of the adjacent release index outdated? Reload. */ \
    if (acquire_ > (adj_rel_ & ~CQ_PENDING_RELEASE_BIT)) \
      adj_rel_ = atomic_load_explicit(&(CQ_PTRN_PREV((buf_), (ptrn_))->release), memory_order_acquire); \
    /* Congested? Publish the pending-release bit and bail out with ~0. */ \
    if ((acquire_ == (adj_rel_ & ~CQ_PENDING_RELEASE_BIT)) && \
	atomic_compare_exchange_strong_explicit( \
	  &(CQ_PTRN_PREV((buf_), (ptrn_))->release), &adj_rel_, adj_rel_ | CQ_PENDING_RELEASE_BIT, \
	  memory_order_acq_rel, memory_order_acquire)) \
	  break; \
    /* Try to acquire by bumping the acquire index */ \
    if (atomic_compare_exchange_strong_explicit( \
	  &(CQ_PTRN((buf_), (ptrn_))->acquire), &acquire_, acquire_ + 1, \
	  memory_order_acq_rel, memory_order_acquire)) { \
	  out_ = acquire_; \
	  break; \
	  } \
  } \
  out_; \
})

/* Release item id_ in pointer slot ptrn_: mark the id done in the bitmask,
 * then advance the release index over each maximal run of consecutive done
 * bits starting at it.  Written as an open statement; usage is
 *   CQ_RELEASE(buf, ptrn, id) { ...notify waiter... } CQ_RELEASE_DONE();
 * The user block runs only when the pending-release bit was found set on the
 * release index, i.e. a congested acquirer is waiting for this release.
 * NOTE(review): memory orders fixed — atomic_load_explicit may not take
 * memory_order_acq_rel, and a CAS failure order may not be acq_rel
 * (C11 7.17.7.4); the release logic itself is left exactly as authored. */
#define CQ_RELEASE(buf_, ptrn_, id_) do { \
  /* Indicate that we're done */ \
  atomic_fetch_or_explicit(&(CQ_PTRN((buf_), (ptrn_))->mask[((id_) % CQ_BUF_SIZE((buf_))) / 64]), (((u64) 1) << ((id_) % 64)), memory_order_acq_rel); \
  /* Sweep while the release index has not yet passed our id */ \
  for (u64 release_; (release_ = atomic_load_explicit(&(CQ_PTRN((buf_), (ptrn_))->release), memory_order_acquire)) <= (id_); ) { \
    /* Get the mask word covering the release index */ \
    u64 mask_ = atomic_load_explicit(&(CQ_PTRN((buf_), (ptrn_))->mask[(release_ % CQ_BUF_SIZE((buf_))) / 64]), memory_order_acquire); \
    /* How many consecutive done bits from the release index up */ \
    u64 consec_ = mask_ >> (release_ % 64); \
    consec_ = (~consec_) ? u64_log2(consec_ ^ (consec_ + 1)) : 64; \
    /* No bits to release */ \
    if (!consec_) break; \
    /* Clear these indices from the mask, or retry on contention */ \
    u64 unmask_ = (consec_ == 64) ? ~((u64) 0) : (((((u64) 1) << consec_) - 1) << (release_ % 64)); \
    if (!atomic_compare_exchange_strong_explicit( \
	  &(CQ_PTRN((buf_), (ptrn_))->mask[(release_ % CQ_BUF_SIZE((buf_))) / 64]), \
	  &mask_, mask_ & ~unmask_, \
	  memory_order_acq_rel, memory_order_acquire)) \
      continue; \
    /* Advance the release index; a concurrent acquirer may have set the \
     * pending bit meanwhile, in which case retry expecting that value. */ \
    u64 rel_tmp_ = release_; \
    if (!atomic_compare_exchange_strong_explicit( \
	  &(CQ_PTRN((buf_), (ptrn_))->release), &rel_tmp_, release_ + consec_, \
	  memory_order_acq_rel, memory_order_acquire) && \
	((rel_tmp_ != (release_ | CQ_PENDING_RELEASE_BIT)) || \
	  !atomic_compare_exchange_strong_explicit( \
	    &(CQ_PTRN((buf_), (ptrn_))->release), &rel_tmp_, release_ + consec_, \
	    memory_order_acq_rel, memory_order_acquire))) \
	bug("Invalid release value"); \
    /* No waiter pending -> keep sweeping */ \
    if (!(rel_tmp_ & CQ_PENDING_RELEASE_BIT)) continue; \
    /* Pending bit was set: run the user-supplied notification block */ \
    do

/* Closes the user block opened by CQ_RELEASE() */
#define CQ_RELEASE_DONE() while (0); } } while (0)


#endif
#endif
+9 −9
Original line number Original line Diff line number Diff line
@@ -1409,15 +1409,14 @@ rte_finish_update_schedule(struct rte_update_data *rud, u64 id)
  struct channel *c = rud->channel;
  struct channel *c = rud->channel;
  debug("rte_finish_update_schedule(%p)\n", rud);
  debug("rte_finish_update_schedule(%p)\n", rud);


  uint released = 0;
  CQ_RELEASE(&(c->import_queue), CIQ_FILTER, id)
  CQ_RELEASE(&(c->import_queue), CIQ_FILTER, id, released);
  {
  if (!released)
    return;
 
    /* Prepare the table-insert task if needed */
    /* Prepare the table-insert task if needed */
    task_init(&(c->import_finish_task), TF_EXCLUSIVE | TF_IDEMPOTENT, rud->channel->table->domain, rte_finish_update_hook);
    task_init(&(c->import_finish_task), TF_EXCLUSIVE | TF_IDEMPOTENT, rud->channel->table->domain, rte_finish_update_hook);
    task_push(&(c->import_finish_task));
    task_push(&(c->import_finish_task));
  }
  }
  CQ_RELEASE_DONE();
}


static void
static void
rte_do_update(struct task *task)
rte_do_update(struct task *task)
@@ -1425,11 +1424,12 @@ rte_do_update(struct task *task)
  struct rte_update_task *rut = SKIP_BACK(struct rte_update_task, task, task);
  struct rte_update_task *rut = SKIP_BACK(struct rte_update_task, task, task);
  struct channel *c = rut->channel;
  struct channel *c = rut->channel;


  u64 id = CQ_ACQUIRE_TRY(&(c->import_queue));
  u64 id = CQ_ACQUIRE_TRY(&(c->import_queue), CIQ_FILTER);


  if (!~id)
  if (!~id)
    return;
    return;


  struct rte_update_data *rud = CQ_ITEM(&(c->import_queue), id);


  rud_state_change(rud, RUS_PENDING_UPDATE, RUS_UPDATING);
  rud_state_change(rud, RUS_PENDING_UPDATE, RUS_UPDATING);