Commit 94808893 authored by Maria Matejka's avatar Maria Matejka
Browse files

Coroutines cancellation is now synchronous

Asynchronous cancellation led to callback hell and lasagna code.
It is simpler and clearer to require the coroutines to be cancellable on
the appropriate domain entry than to postpone object cleanup until after all
the events finish.
parent 3e95b06b
Loading
Loading
Loading
Loading
+10 −8
Original line number Diff line number Diff line
@@ -171,6 +171,7 @@ void debug(const char *msg, ...); /* Printf to debug output */
#ifdef DEBUGGING
# define ASSERT(x) ASSERT_DIE(x)
# define ASSUME(x) ASSERT_DIE(x)
# define BUG_WARN(...) bug(__VA_ARGS__)
# ifdef ENABLE_EXPENSIVE_CHECKS
#   undef EXPENSIVE_CHECK
#   define EXPENSIVE_CHECK(x) ASSERT_DIE(x)
@@ -178,6 +179,7 @@ void debug(const char *msg, ...); /* Printf to debug output */
#else
# define ASSERT(x) do { if (!(x)) log(L_BUG "Assertion '%s' failed at %s:%d", #x, __FILE__, __LINE__); } while(0)
# define ASSUME(x) /* intentionally left blank */
# define BUG_WARN(...) log(L_BUG __VA_ARGS__)
#endif


+12 −1
Original line number Diff line number Diff line
@@ -28,7 +28,18 @@ static void
ev_free(resource *r)
{
  event *e = (event *) r;
  ev_cancel(e);
  switch (ev_cancel(e, 1))
  {
    case EV_CANCEL_STOPPED:
      BUG_WARN("Stopping event %p (%s inited at %s:%u) from ev_free() called by another coroutine", e, e->name, e->file, e->line);
      break;
    case EV_CANCEL_SELF:
      BUG_WARN("Stopping event %p (%s inited at %s:%u) from ev_free() called by self (!!)", e, e->name, e->file, e->line);
      break;
    case EV_CANCEL_NONE:
      /* Everything is ok! */
      break;
  }
}

static void ev_dump_res(resource *r)
+29 −18
Original line number Diff line number Diff line
@@ -34,13 +34,10 @@ event *ev_new(pool *);

/* Initialize an event; run only if event is inactive. */
#define ev_setup(e, _hook, _data) ({ \
    EVENT_LOCKED_INIT_LOCK((e)); \
    EVENT_LOCKED { \
      AUTO_TYPE eu = UNLOCKED_STRUCT(event_state, e); \
      ASSERT_DIE(eu->coro == NULL); \
      eu->hook = _hook; \
      eu->data = _data; \
    } \
    EVENT_LOCKED_INIT((e), \
	.hook = _hook, \
	.data = _data, \
	); \
    (e)->name = #_hook; \
    (e)->file = __FILE__; \
    (e)->line = __LINE__; \
@@ -63,7 +60,7 @@ event *ev_new(pool *);
    ev_setup_unlocked(e, hook, data); \
    e; })

/* Schedule the event */
/* Schedule the event. */
#ifdef DEBUGGING
void ev_schedule_(event *, const char *, const char *, uint);
#define ev_schedule(e) ev_schedule_(e, #e, __FILE__, __LINE__)
@@ -71,16 +68,30 @@ void ev_schedule_(event *, const char *, const char *, uint);
void ev_schedule(event *);
#endif

/* Cancel an event. Returns 1 if there was an active event running. */
_Bool ev_cancel(event *);

/* Suspend and wait for current locks.
 * This is an explicit cancellation point. */
void ev_suspend(void);

/* Cancellation point check */
_Bool ev_get_cancelled(void);
NORET void ev_exit(void);
/* Cancel an event. Set @allow_self=1 to allow self cancellation.
 * Blocks until the event has stopped.
 *
 * Not every event can be safely cancelled. To be on the safe side,
 * you should:
 *
 * (1) have the event owner locked AND
 * (2) explicitly allow cancellation in the event implementation AND
 * (3) never allocate or free other events from any cancellable event.
 *
 * The cancellation is implemented as domain lock failure.
 * When implementing the cancellable event, you MUST use
 * LOCKED_DO(type, domain, cleanup) when acquiring a domain lock from an unlocked context.
 * These domains are called cancellation-critical.
 * The cancellation requestor MUST ensure that the target event holds none of
 * the cancellation-critical domains locked.
 *
 * For more info on the event model, see the documentation.
 * */
enum ev_cancel_result {
  EV_CANCEL_NONE = 0,	  /* The event was not scheduled/running */
  EV_CANCEL_STOPPED = 1,  /* The event has stopped */
  EV_CANCEL_SELF = 2,	  /* Cancelling self, be careful */
} ev_cancel(event *, _Bool allow_self);

/* Dump event info on debug console */
void ev_dump(event *r);
+10 −19
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ extern _Thread_local struct lock_order locking_stack;
extern _Thread_local struct domain_generic **last_locked;

/* Internal for locking */
void do_lock(struct domain_generic *dg, struct domain_generic **lsp);
USE_RESULT _Bool do_lock(struct domain_generic *dg, struct domain_generic **lsp);
void do_unlock(struct domain_generic *dg, struct domain_generic **lsp);

#define DOMAIN(type) struct domain__##type
@@ -57,15 +57,17 @@ void domain_free_after_unlock(struct domain_generic *dg);
#define SUPER_LOCK(type)  ({ ASSERT_DIE(IS_LOCKED(type)); (DOMAIN(type)) { .type = locking_stack.type }; })

/* Uncoupled lock/unlock, don't use directly */
#define LOCK_DOMAIN(type, d)	LOCKED(type) = (do_lock(((d).type), &(locking_stack.type)), (d))
#define LOCK_DOMAIN(type, d)	LOCKED(type) = (do_lock(((d).type), &(locking_stack.type)) ? (d) : (DOMAIN(type)) {})
#define UNLOCK_DOMAIN(type, d)  do_unlock(((d).type), &(locking_stack.type))

/* Do something in a locked context */
#define LOCKED_DO(type, d) for ( \
/* Do something in a locked context; run cleanup if unsuccessful */
#define LOCKED_DO(type, d, cleanup) for ( \
    LOCK_DOMAIN(type, d), _bird_aux = (d); \
    _bird_aux.type ? ((_bird_aux.type = NULL), 1) : 0; \
    CURRENT_LOCK.type ? (_bird_aux.type ? ((_bird_aux.type = NULL), 1) : 0) : ((cleanup), 0); \
    UNLOCK_DOMAIN(type, d))

#define LOCKED_DO_NOFAIL(type, d) LOCKED_DO(type, d, bug("Invalid cancellation at %s:%d", __FILE__, __LINE__))

/* Part of struct that should be accessed only with a locked lock */
#define LOCKED_STRUCT(lock_type, ...) struct { \
  __VA_ARGS__ \
@@ -119,9 +121,8 @@ void domain_free_after_unlock(struct domain_generic *dg);
DEFINE_DOMAIN(event_state);
extern DOMAIN(event_state) event_state_domain;

#define EVENT_LOCKED LOCKED_DO(event_state, event_state_domain)
#define EVENT_LOCKED_GET(str, var)  LOCKED_GET(event_state, str, event_state_domain, var)
#define EVENT_LOCKED_SET(str, var, val)  LOCKED_SET(event_state, str, event_state_domain, var, val)
#define EVENT_LOCKED(cancel) LOCKED_DO(event_state, event_state_domain, cancel)
#define EVENT_LOCKED_NOFAIL LOCKED_DO_NOFAIL(event_state, event_state_domain)
#define EVENT_LOCKED_INIT(str, ...) LOCKED_STRUCT_INIT(event_state, str, event_state_domain, __VA_ARGS__)
#define EVENT_LOCKED_INIT_LOCK(str) LOCKED_STRUCT_INIT_LOCK(event_state, str, event_state_domain)

@@ -132,17 +133,7 @@ extern DOMAIN(the_bird) the_bird_domain;
#define the_bird_lock()		do_lock(the_bird_domain.the_bird, &locking_stack.the_bird)
#define the_bird_unlock()	do_unlock(the_bird_domain.the_bird, &locking_stack.the_bird)

#define THE_BIRD_LOCKED for ( \
    UNUSED LOCK_DOMAIN(the_bird, the_bird_domain), *_bird_aux = &the_bird_domain; \
    _bird_aux ? ((_bird_aux = NULL), 1) : 0; \
    UNLOCK_DOMAIN(the_bird, the_bird_domain))

#define THE_BIRD_LOCKED_RETURN(expr) return ({ \
    UNUSED LOCK_DOMAIN(the_bird, the_bird_domain); \
    AUTO_TYPE _ret = expr; \
    UNLOCK_DOMAIN(the_bird, the_bird_domain); \
    _ret; \
    })
#define THE_BIRD_LOCKED(cleanup)  LOCKED_DO(the_bird, the_bird_domain, cleanup)

#define assert_bird_lock() ASSERT_DIE(SUPER_LOCK(the_bird).the_bird == the_bird_domain.the_bird)

+19 −22
Original line number Diff line number Diff line
@@ -49,6 +49,7 @@ typedef struct birdsock {
  resource r;
  pool *pool;				/* Pool where incoming connections should be allocated (for SK_xxx_PASSIVE) */
  struct proto *owner;			/* Protocol which this socket belongs to; NULL for BIRD-wide sockets */
  const struct sock_class *class;	/* Socket hook set */
  int type;				/* Socket type */
  int subtype;				/* Socket subtype */
  void *data;				/* User data */
@@ -64,20 +65,10 @@ typedef struct birdsock {

  /* To be locked by the real owner instead */
  LOCKED_STRUCT(event_state, 
      /* rx_hook: On stream sockets (TCP, UNIX) returns number of processed bytes. Otherwise ignored. */
      uint (*rx_hook)(struct birdsock *, byte *buf, uint size);
      void (*rx_err)(struct birdsock *, int); /* errno or zero if EOF */

      _Bool (*tx_hook)(struct birdsock *);    /* returns 1 to call again */
      void (*tx_err)(struct birdsock *, int); /* errno or zero if EOF */

      void (*cli_info)(LOCKED(event_state), struct birdsock *, char *buf, uint len);	/* Write CLI info to the buf */

      list tx_chain;
      list tx_chain, used_tx_bufs;
      struct coro_sock *rx_coro, *tx_coro;
      uint rbsize;			/* May be changed ONLY with RX stopped or from RX hook inside */
      uint rbsize;			/* Use sk_set_rbsize() to set when RX is running */
      _Bool tx_active;			/* Set when somebody is trying to TX directly */
      _Bool closing;			/* Set when the socket is closing */
      );

  /* Information about received datagrams (UDP, RAW), valid in rx_hook */
@@ -95,6 +86,17 @@ typedef struct birdsock {
  struct ssh_sock *ssh;			/* Used in SK_SSH */
} sock;

struct sock_class {
  /* rx_hook: On stream sockets (TCP, UNIX) returns number of processed bytes. Otherwise ignored. */
  uint (*rx_hook)(struct birdsock *, byte *buf, uint size);
  void (*rx_err)(struct birdsock *, int); /* errno or zero if EOF */

  _Bool (*tx_hook)(struct birdsock *);    /* returns 1 to call again */
  void (*tx_err)(struct birdsock *, int); /* errno or zero if EOF */

  void (*cli_info)(struct birdsock *, char *buf, uint len);	/* Write CLI info to the buf */
};

sock *sock_new(pool *);			/* Allocate new socket */
#define sk_new(X) sock_new(X)		/* Wrapper to avoid name collision with OpenSSL */

@@ -110,22 +112,17 @@ void sk_cancel_rx(sock *);
void sk_schedule_tx(sock *);
void sk_cancel_tx(sock *);

/* Close and free the socket (asynchronous) */
void sk_close(sock *);
/* Close and free the socket (synchronously).
 * Do not call from a cancellable routine.
 * Returns 1 if cancelling self. */
_Bool sk_close(sock *, _Bool allow_self);

/* Resize read buffer */
/* Resize read buffer. Do not call from a cancellable routine. */
void sk_set_rbsize(sock *, uint);

int sk_is_ipv4(sock *s);		/* True if socket is IPv4 */
int sk_is_ipv6(sock *s);		/* True if socket is IPv6 */

#if 0
static inline _Bool sk_tx_buffer_empty(sock *sk)
{
  return !EVENT_LOCKED_GET(sk, tx_active);
}
#endif

int sk_setup_multicast(sock *s);	/* Prepare UDP or IP socket for multicasting */
int sk_join_group(sock *s, ip_addr maddr);	/* Join multicast group on sk iface */
int sk_leave_group(sock *s, ip_addr maddr);	/* Leave multicast group on sk iface */
Loading