Commit 936fa536 authored by Martin Mares's avatar Martin Mares Committed by Maria Matejka
Browse files

A simple experiment with coroutine-based CLI

parent 46044939
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -36,6 +36,8 @@ struct ssh_sock {
};
#endif

struct coroutine;

typedef struct birdsock {
  resource r;
  pool *pool;				/* Pool where incoming connections should be allocated (for SK_xxx_PASSIVE) */
@@ -79,6 +81,8 @@ typedef struct birdsock {
  char *password;			/* Password for MD5 authentication */
  const char *err;			/* Error message */
  struct ssh_sock *ssh;			/* Used in SK_SSH */

  struct coroutine *rx_coroutine;
} sock;

sock *sock_new(pool *);			/* Allocate new socket */
+6 −25
Original line number Diff line number Diff line
@@ -67,6 +67,7 @@
#include "nest/cli.h"
#include "conf/conf.h"
#include "lib/string.h"
#include "sysdep/unix/unix.h"	// FIXME

pool *cli_pool;

@@ -252,8 +253,8 @@ cli_cmd_read_hook(byte *buf, uint max, UNUSED int fd)
  return max;
}

static void
cli_command(struct cli *c)
void
cli_command(cli *c)
{
  struct config f;
  int res;
@@ -281,28 +282,15 @@ static void
cli_event(void *data)
{
  cli *c = data;
  int err;

  while (c->ring_read != c->ring_write &&
      c->async_msg_size < CLI_MAX_ASYNC_QUEUE)
    cli_copy_message(c);

  if (c->tx_pos)
    ;
  else if (c->cont)
    c->cont(c);
  else
    {
      err = cli_get_command(c);
      if (!err)
	return;
      if (err < 0)
	cli_printf(c, 9000, "Command too long");
      else
	cli_command(c);
    }

  cli_write_trigger(c);

  if (c->sleeping_on_yield)
    coro_resume(c->coro);
}

cli *
@@ -325,13 +313,6 @@ cli_new(void *priv)
  return c;
}

void
cli_kick(cli *c)
{
  if (!c->cont && !c->tx_pos)
    ev_schedule(c->event);
}

static list cli_log_hooks;
static int cli_log_inited;

+6 −0
Original line number Diff line number Diff line
@@ -25,6 +25,8 @@ struct cli_out {
  byte buf[0];
};

struct coroutine;

typedef struct cli {
  node n;				/* Node in list of all log hooks */
  pool *pool;
@@ -45,6 +47,9 @@ typedef struct cli {
  uint log_mask;			/* Mask of allowed message levels */
  uint log_threshold;			/* When free < log_threshold, store only important messages */
  uint async_msg_size;			/* Total size of async messages queued in tx_buf */
  struct coroutine *coro;
  int sleeping_on_tx;
  int sleeping_on_yield;
} cli;

extern pool *cli_pool;
@@ -66,6 +71,7 @@ void cli_free(cli *);
void cli_kick(cli *);
void cli_written(cli *);
void cli_echo(uint class, byte *msg);
void cli_command(cli *c);

static inline int cli_access_restricted(void)
{
+110 −0
Original line number Diff line number Diff line
@@ -2377,3 +2377,113 @@ test_old_bird(char *path)
    die("I found another BIRD running.");
  close(fd);
}

/* EXPERIMENTAL: Support for coroutines */

#include <ucontext.h>

struct coroutine {
  resource r;
  ucontext_t ctx;
  void *stack;
  void (*entry_point)(void *arg);
  void *arg;
};

static ucontext_t *main_context;
static coroutine *coro_current;	// NULL for main context

static void
coro_free(resource *r)
{
  coroutine *c = (coroutine *) r;
  xfree(c->stack);
}

static void
coro_dump(resource *r UNUSED)
{
  debug("\n");
}

static struct resclass coro_class = {
  .name = "Coroutine",
  .size = sizeof(struct coroutine),
  .free = coro_free,
  .dump = coro_dump,
  // FIXME: Implement memsize
};

static void
coro_do_start(void)
{
  ASSERT(coro_current);
  coro_current->entry_point(coro_current->arg);
  bug("Coroutine returned unexpectedly");
}

struct coroutine *
coro_new(pool *p, void (*entry_point)(void *), void *arg)
{
  if (!main_context)
    {
      main_context = xmalloc(sizeof(*main_context));
      if (getcontext(main_context) < 0)
	bug("getcontext() failed");
    }

  coroutine *c = ralloc(p, &coro_class);
  c->entry_point = entry_point;
  c->arg = arg;
  if (getcontext(&c->ctx) < 0)
    bug("getcontext() failed");
  c->stack = xmalloc(65536);
  c->ctx.uc_stack.ss_sp = c->stack;
  c->ctx.uc_stack.ss_size = 65536;

  makecontext(&c->ctx, coro_do_start, 0);

  return c;
}

// Return to main context
void
coro_suspend(void)
{
  ASSERT(coro_current);
  ASSERT(main_context);
  coroutine *c = coro_current;
  coro_current = NULL;
  swapcontext(&c->ctx, main_context);
  ASSERT(coro_current == c);
}

// Resume context
void
coro_resume(coroutine *c)
{
  ASSERT(!coro_current);
  coro_current = c;
  swapcontext(main_context, &c->ctx);
  ASSERT(!coro_current);
}

static int
coro_sk_rx_hook(sock *sk, uint size UNUSED)
{
  ASSERT(sk->rx_coroutine);
  ASSERT(!coro_current);
  coro_resume(sk->rx_coroutine);
  return 0;
}

int
coro_sk_read(sock *s)
{
  ASSERT(coro_current);
  s->rx_coroutine = coro_current;
  s->rx_hook = coro_sk_rx_hook;
  coro_suspend();
  s->rx_hook = NULL;
  return s->rpos - s->rbuf;
}
+82 −34
Original line number Diff line number Diff line
@@ -393,6 +393,9 @@ cli_write(cli *c)
  /* Everything is written */
  s->tbuf = NULL;
  cli_written(c);

  if (c->sleeping_on_tx)
    coro_resume(c->coro);
}

void
@@ -410,53 +413,97 @@ cli_tx(sock *s)
  cli_write(s->data);
}

int
cli_get_command(cli *c)
{
  sock *s = c->priv;
  byte *t = c->rx_aux ? : s->rbuf;
  byte *tend = s->rpos;
  byte *d = c->rx_pos;
  byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2;

  while (t < tend)
static void
cli_err(sock *s, int err)
{
      if (*t == '\r')
	t++;
      else if (*t == '\n')
  if (config->cli_debug)
    {
	  t++;
	  c->rx_pos = c->rx_buf;
	  c->rx_aux = t;
	  *d = 0;
	  return (d < dend) ? 1 : -1;
      if (err)
	log(L_INFO "CLI connection dropped: %s", strerror(err));
      else
	log(L_INFO "CLI connection closed");
    }
      else if (d < dend)
	*d++ = *t++;
  cli_free(s->data);
}

static int
cli_getchar(cli *c)
{
  sock *s = c->priv;

  if (c->rx_aux == s->rpos)
    {
      log(L_INFO "CLI wants to read");
      c->rx_aux = s->rpos = s->rbuf;
  c->rx_pos = d;
  return 0;
      int n = coro_sk_read(s);
      log(L_INFO "CLI read %d bytes", n);
      ASSERT(n);
    }
  return *c->rx_aux++;
}

static int
cli_rx(sock *s, uint size UNUSED)
static void
cli_yield(cli *c)
{
  cli_kick(s->data);
  return 0;
  log(L_INFO "CLI: Yield");
  c->sleeping_on_yield = 1;
  ev_schedule(c->event);
  coro_suspend();
  c->sleeping_on_yield = 0;
  log(L_INFO "CLI: Resumed after yield");
}

static void
cli_err(sock *s, int err)
cli_coroutine(void *_c)
{
  if (config->cli_debug)
  cli *c = _c;
  sock *s = c->priv;

  log(L_INFO "CLI coroutine reached");
  c->rx_aux = s->rbuf;
  for (;;)
    {
      if (err)
	log(L_INFO "CLI connection dropped: %s", strerror(err));
      else
	log(L_INFO "CLI connection closed");
      while (c->tx_pos)
	{
	  log(L_INFO "CLI sleeps on write");
	  c->sleeping_on_tx = 1;
	  coro_suspend();
	  c->sleeping_on_tx = 0;
	  log(L_INFO "CLI wakeup on write");
	}

      if (c->cont)
	{
	  c->cont(c);
	  cli_write_trigger(c);
	  cli_yield(c);
	  continue;
	}

      byte *d = c->rx_buf;
      byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2;
      for (;;)
	{
	  int ch = cli_getchar(c);
	  if (ch == '\r')
	    ;
	  else if (ch == '\n')
	    break;
	  else if (d < dend)
	    *d++ = ch;
	}

      if (d >= dend)
	{
	  cli_printf(c, 9000, "Command too long");
	  cli_write_trigger(c);
	  continue;
	}

      *d = 0;
      cli_command(c);
      cli_write_trigger(c);
    }
  cli_free(s->data);
}

static int
@@ -466,7 +513,6 @@ cli_connect(sock *s, uint size UNUSED)

  if (config->cli_debug)
    log(L_INFO "CLI connect");
  s->rx_hook = cli_rx;
  s->tx_hook = cli_tx;
  s->err_hook = cli_err;
  s->data = c = cli_new(s);
@@ -475,6 +521,8 @@ cli_connect(sock *s, uint size UNUSED)
  c->rx_pos = c->rx_buf;
  c->rx_aux = NULL;
  rmove(s, c->pool);
  c->coro = coro_new(c->pool, cli_coroutine, c);
  coro_resume(c->coro);
  return 1;
}

Loading