Commit 96fad753 authored by Martin Mares's avatar Martin Mares Committed by Maria Matejka
Browse files

Rewritten CLI based on coroutines

I also moved the boundary between generic parts of the CLI and sysdep
code: the generic parts now assume that CLI runs over a socket, but
the actual creation of the socket is still kept in sysdep.

The documentation does not reflect these changes yet.
parent ae2be13a
Loading
Loading
Loading
Loading
+249 −75
Original line number Diff line number Diff line
/*
 *	BIRD Internet Routing Daemon -- Command-Line Interface
 *
 *	(c) 1999--2000 Martin Mares <mj@ucw.cz>
 *	(c) 1999--2017 Martin Mares <mj@ucw.cz>
 *
 *	Can be freely distributed and used under the terms of the GNU GPL.
 */
@@ -63,6 +63,8 @@
 *
 */

#undef LOCAL_DEBUG

#include "nest/bird.h"
#include "nest/cli.h"
#include "conf/conf.h"
@@ -71,6 +73,13 @@

pool *cli_pool;

/* Hack for scheduled undo notification */
extern cli *cmd_reconfig_stored_cli;

/*
 *	Output buffering
 */

static byte *
cli_alloc_out(cli *c, int size)
{
@@ -223,95 +232,60 @@ cli_free_out(cli *c)
  c->async_msg_size = 0;
}

void
cli_written(cli *c)
static void
cli_write(cli *c)
{
  cli_free_out(c);
  ev_schedule(c->event);
}
  sock *s = c->socket;

  while (c->tx_pos)
    {
      struct cli_out *o = c->tx_pos;

static byte *cli_rh_pos;
static uint cli_rh_len;
static int cli_rh_trick_flag;
struct cli *this_cli;
      int len = o->wpos - o->outpos;
      s->tbuf = o->outpos;
      o->outpos = o->wpos;

static int
cli_cmd_read_hook(byte *buf, uint max, UNUSED int fd)
{
  if (!cli_rh_trick_flag)
    {
      cli_rh_trick_flag = 1;
      buf[0] = '!';
      return 1;
      if (sk_send(s, len) <= 0)
	return;

      c->tx_pos = o->next;
    }
  if (max > cli_rh_len)
    max = cli_rh_len;
  memcpy(buf, cli_rh_pos, max);
  cli_rh_pos += max;
  cli_rh_len -= max;
  return max;

  /* Everything is written */
  s->tbuf = NULL;
  cli_free_out(c);
  ev_schedule(c->event);
}

void
cli_command(cli *c)
cli_write_trigger(cli *c)
{
  struct config f;
  int res;

  if (config->cli_debug > 1)
    log(L_TRACE "CLI: %s", c->rx_buf);
  bzero(&f, sizeof(f));
  f.mem = c->parser_pool;
  f.pool = rp_new(c->pool, "Config");
  init_list(&f.symbols);
  cf_read_hook = cli_cmd_read_hook;
  cli_rh_pos = c->rx_buf;
  cli_rh_len = strlen(c->rx_buf);
  cli_rh_trick_flag = 0;
  this_cli = c;
  lp_flush(c->parser_pool);
  res = cli_parse(&f);
  if (!res)
    cli_printf(c, 9001, f.err_msg);

  config_free(&f);
  if (c->tx_pos && c->socket->tbuf == NULL)
    cli_write(c);
}

static void
cli_event(void *data)
cli_tx_hook(sock *s)
{
  cli *c = data;

  while (c->ring_read != c->ring_write &&
      c->async_msg_size < CLI_MAX_ASYNC_QUEUE)
    cli_copy_message(c);

  cli_write_trigger(c);

  if (c->sleeping_on_yield)
    coro_resume(c->coro);
  cli_write(s->data);
}

cli *
cli_new(void *priv)
static void
cli_err_hook(sock *s, int err)
{
  pool *p = rp_new(cli_pool, "CLI");
  cli *c = mb_alloc(p, sizeof(cli));

  bzero(c, sizeof(cli));
  c->pool = p;
  c->priv = priv;
  c->event = ev_new(p);
  c->event->hook = cli_event;
  c->event->data = c;
  c->cont = cli_hello;
  c->parser_pool = lp_new_default(c->pool);
  c->show_pool = lp_new_default(c->pool);
  c->rx_buf = mb_alloc(c->pool, CLI_RX_BUF_SIZE);
  ev_schedule(c->event);
  return c;
  if (config->cli_debug)
    {
      if (err)
	log(L_INFO "CLI connection dropped: %s", strerror(err));
      else
	log(L_INFO "CLI connection closed");
    }
  cli_free(s->data);
}

/*
 *	Echoing of asynchronous messages
 */

static list cli_log_hooks;
static int cli_log_inited;
@@ -383,12 +357,212 @@ cli_echo(uint class, byte *msg)
    }
}

/* Hack for scheduled undo notification */
extern cli *cmd_reconfig_stored_cli;
/*
 *	Reading of input
 */

static int
cli_getchar(cli *c)
{
  sock *s = c->socket;

  if (c->rx_aux == s->rpos)
    {
      DBG("CLI: Waiting on read\n");
      c->rx_aux = s->rpos = s->rbuf;
      c->state = CLI_STATE_WAIT_RX;
      int n = coro_sk_read(s);
      c->state = CLI_STATE_RUN;
      DBG("CLI: Read returned %d bytes\n", n);
      ASSERT(n);
    }
  return *c->rx_aux++;
}

static int
cli_read_line(cli *c)
{
  byte *d = c->rx_buf;
  byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2;
  for (;;)
    {
      int ch = cli_getchar(c);
      if (ch == '\r')
	;
      else if (ch == '\n')
	break;
      else if (d < dend)
	*d++ = ch;
    }

  if (d >= dend)
    return 0;

  *d = 0;
  return 1;
}

/*
 *	Execution of commands
 */

static byte *cli_rh_pos;
static uint cli_rh_len;
static int cli_rh_trick_flag;
struct cli *this_cli;

static int
cli_cmd_read_hook(byte *buf, uint max, UNUSED int fd)
{
  if (!cli_rh_trick_flag)
    {
      cli_rh_trick_flag = 1;
      buf[0] = '!';
      return 1;
    }
  if (max > cli_rh_len)
    max = cli_rh_len;
  memcpy(buf, cli_rh_pos, max);
  cli_rh_pos += max;
  cli_rh_len -= max;
  return max;
}

static void
cli_command(cli *c)
{
  struct config f;
  int res;

  if (config->cli_debug > 1)
    log(L_TRACE "CLI: %s", c->rx_buf);
  bzero(&f, sizeof(f));
  f.mem = c->parser_pool;
  f.pool = rp_new(c->pool, "Config");
  init_list(&f.symbols);
  cf_read_hook = cli_cmd_read_hook;
  cli_rh_pos = c->rx_buf;
  cli_rh_len = strlen(c->rx_buf);
  cli_rh_trick_flag = 0;
  this_cli = c;
  lp_flush(c->parser_pool);
  res = cli_parse(&f);
  if (!res)
    cli_printf(c, 9001, f.err_msg);

  config_free(&f);
}

/*
 *	Session control
 */

static void
cli_event(void *data)
{
  cli *c = data;
  DBG("CLI: Event in state %u\n", (int) c->state);

  while (c->ring_read != c->ring_write &&
      c->async_msg_size < CLI_MAX_ASYNC_QUEUE)
    cli_copy_message(c);

  cli_write_trigger(c);

  if (c->state == CLI_STATE_YIELD ||
      c->state == CLI_STATE_WAIT_TX && !c->tx_pos)
    coro_resume(c->coro);
}

void
cli_yield(cli *c)
{
  c->state = CLI_STATE_YIELD;
  DBG("CLI: Yielding\n");
  ev_schedule(c->event);
  coro_suspend();
  c->state = CLI_STATE_RUN;
  DBG("CLI: Yield resumed\n");
}

static void
cli_coroutine(void *_c)
{
  cli *c = _c;
  sock *s = c->socket;

  DBG("CLI: Coroutine started\n");
  c->rx_aux = s->rbuf;

  for (;;)
    {
      while (c->tx_pos)
	{
	  DBG("CLI: Sleeping on write\n");
	  c->state = CLI_STATE_WAIT_TX;
	  coro_suspend();
	  c->state = CLI_STATE_RUN;
	  DBG("CLI: Woke up on write\n");
	}

      if (c->cont)
	{
	  c->cont(c);
	  cli_write_trigger(c);
	  cli_yield(c);
	  continue;
	}

      if (!cli_read_line(c))
	cli_printf(c, 9000, "Command too long");
      else
	cli_command(c);
      cli_write_trigger(c);
    }
}

cli *
cli_new(sock *s)
{
  pool *p = rp_new(cli_pool, "CLI session");
  cli *c = mb_alloc(p, sizeof(cli));
  DBG("CLI: Created new session\n");

  bzero(c, sizeof(cli));
  c->pool = p;
  c->socket = s;
  c->event = ev_new(p);
  c->event->hook = cli_event;
  c->event->data = c;
  c->cont = cli_hello;
  c->parser_pool = lp_new_default(c->pool);
  c->show_pool = lp_new_default(c->pool);
  c->rx_buf = mb_alloc(c->pool, CLI_RX_BUF_SIZE);

  s->pool = c->pool;		/* We need to have all the socket buffers allocated in the cli pool */
  rmove(s, c->pool);
  s->tx_hook = cli_tx_hook;
  s->err_hook = cli_err_hook;
  s->data = c;

  return c;
}

void
cli_run(cli *c)
{
  DBG("CLI: Running\n");
  c->state = CLI_STATE_RUN;
  c->rx_pos = c->rx_buf;
  c->rx_aux = NULL;
  c->coro = coro_new(c->pool, cli_coroutine, c);
  coro_resume(c->coro);
}

void
cli_free(cli *c)
{
  DBG("CLI: Destroying session\n");
  cli_set_log_echo(c, 0, 0);
  if (c->cleanup)
    c->cleanup(c);
+27 −17
Original line number Diff line number Diff line
/*
 *	BIRD Internet Routing Daemon -- Command-Line Interface
 *
 *	(c) 1999--2000 Martin Mares <mj@ucw.cz>
 *	(c) 1999--2017 Martin Mares <mj@ucw.cz>
 *
 *	Can be freely distributed and used under the terms of the GNU GPL.
 */
@@ -10,7 +10,9 @@
#define _BIRD_CLI_H_

#include "lib/resource.h"
#include "lib/coroutine.h"
#include "lib/event.h"
#include "lib/socket.h"

#define CLI_RX_BUF_SIZE 4096
#define CLI_TX_BUF_SIZE 4096
@@ -25,31 +27,44 @@ struct cli_out {
  byte buf[0];
};

struct coroutine;
enum cli_state {
  CLI_STATE_INIT,
  CLI_STATE_RUN,
  CLI_STATE_WAIT_RX,
  CLI_STATE_WAIT_TX,
  CLI_STATE_YIELD,
};

typedef struct cli {
  node n;				/* Node in list of all log hooks */
  pool *pool;
  void *priv;				/* Private to sysdep layer */
  byte *rx_buf, *rx_pos, *rx_aux;	/* sysdep */
  coroutine *coro;
  enum cli_state state;
  int restricted;			/* CLI is restricted to read-only commands */

  /* I/O */
  sock *socket;
  byte *rx_buf, *rx_pos, *rx_aux;
  struct cli_out *tx_buf, *tx_pos, *tx_write;
  event *event;

  /* Continuation mechanism */
  void (*cont)(struct cli *c);
  void (*cleanup)(struct cli *c);
  void *rover;				/* Private to continuation routine */
  int last_reply;
  int restricted;			/* CLI is restricted to read-only commands */

  /* Pools */
  struct linpool *parser_pool;		/* Pool used during parsing */
  struct linpool *show_pool;		/* Pool used during route show */

  /* Asynchronous messages */
  byte *ring_buf;			/* Ring buffer for asynchronous messages */
  byte *ring_end, *ring_read, *ring_write;	/* Pointers to the ring buffer */
  uint ring_overflow;			/* Counter of ring overflows */
  uint log_mask;			/* Mask of allowed message levels */
  uint log_threshold;			/* When free < log_threshold, store only important messages */
  uint async_msg_size;			/* Total size of async messages queued in tx_buf */
  struct coroutine *coro;
  int sleeping_on_tx;
  int sleeping_on_yield;
} cli;

extern pool *cli_pool;
@@ -61,17 +76,17 @@ extern struct cli *this_cli; /* Used during parsing */

void cli_printf(cli *, int, char *, ...);
#define cli_msg(x...) cli_printf(this_cli, x)
void cli_write_trigger(cli *c);
void cli_set_log_echo(cli *, uint mask, uint size);
void cli_yield(cli *c);

/* Functions provided to sysdep layer */

cli *cli_new(void *);
void cli_init(void);
cli *cli_new(sock *s);
void cli_run(cli *);
void cli_free(cli *);
void cli_kick(cli *);
void cli_written(cli *);
void cli_echo(uint class, byte *msg);
void cli_command(cli *c);

static inline int cli_access_restricted(void)
{
@@ -81,9 +96,4 @@ static inline int cli_access_restricted(void)
    return 0;
}

/* Functions provided by sysdep layer */

void cli_write_trigger(cli *);
int cli_get_command(cli *);

#endif
+2 −145
Original line number Diff line number Diff line
@@ -371,142 +371,6 @@ cmd_reconfig_status(void)
static sock *cli_sk;
static char *path_control_socket = PATH_CONTROL_SOCKET;


static void
cli_write(cli *c)
{
  sock *s = c->priv;

  while (c->tx_pos)
    {
      struct cli_out *o = c->tx_pos;

      int len = o->wpos - o->outpos;
      s->tbuf = o->outpos;
      o->outpos = o->wpos;

      if (sk_send(s, len) <= 0)
	return;

      c->tx_pos = o->next;
    }

  /* Everything is written */
  s->tbuf = NULL;
  cli_written(c);

  if (c->sleeping_on_tx)
    coro_resume(c->coro);
}

void
cli_write_trigger(cli *c)
{
  sock *s = c->priv;

  if (s->tbuf == NULL)
    cli_write(c);
}

static void
cli_tx(sock *s)
{
  cli_write(s->data);
}

static void
cli_err(sock *s, int err)
{
  if (config->cli_debug)
    {
      if (err)
	log(L_INFO "CLI connection dropped: %s", strerror(err));
      else
	log(L_INFO "CLI connection closed");
    }
  cli_free(s->data);
}

static int
cli_getchar(cli *c)
{
  sock *s = c->priv;

  if (c->rx_aux == s->rpos)
    {
      log(L_INFO "CLI wants to read");
      c->rx_aux = s->rpos = s->rbuf;
      int n = coro_sk_read(s);
      log(L_INFO "CLI read %d bytes", n);
      ASSERT(n);
    }
  return *c->rx_aux++;
}

static void
cli_yield(cli *c)
{
  log(L_INFO "CLI: Yield");
  c->sleeping_on_yield = 1;
  ev_schedule(c->event);
  coro_suspend();
  c->sleeping_on_yield = 0;
  log(L_INFO "CLI: Resumed after yield");
}

static void
cli_coroutine(void *_c)
{
  cli *c = _c;
  sock *s = c->priv;

  log(L_INFO "CLI coroutine reached");
  c->rx_aux = s->rbuf;
  for (;;)
    {
      while (c->tx_pos)
	{
	  log(L_INFO "CLI sleeps on write");
	  c->sleeping_on_tx = 1;
	  coro_suspend();
	  c->sleeping_on_tx = 0;
	  log(L_INFO "CLI wakeup on write");
	}

      if (c->cont)
	{
	  c->cont(c);
	  cli_write_trigger(c);
	  cli_yield(c);
	  continue;
	}

      byte *d = c->rx_buf;
      byte *dend = c->rx_buf + CLI_RX_BUF_SIZE - 2;
      for (;;)
	{
	  int ch = cli_getchar(c);
	  if (ch == '\r')
	    ;
	  else if (ch == '\n')
	    break;
	  else if (d < dend)
	    *d++ = ch;
	}

      if (d >= dend)
	{
	  cli_printf(c, 9000, "Command too long");
	  cli_write_trigger(c);
	  continue;
	}

      *d = 0;
      cli_command(c);
      cli_write_trigger(c);
    }
}

static int
cli_connect(sock *s, uint size UNUSED)
{
@@ -514,16 +378,9 @@ cli_connect(sock *s, uint size UNUSED)

  if (config->cli_debug)
    log(L_INFO "CLI connect");
  s->tx_hook = cli_tx;
  s->err_hook = cli_err;
  s->data = c = cli_new(s);
  s->pool = c->pool;		/* We need to have all the socket buffers allocated in the cli pool */
  c = cli_new(s);
  s->fast_rx = 1;
  c->rx_pos = c->rx_buf;
  c->rx_aux = NULL;
  rmove(s, c->pool);
  c->coro = coro_new(c->pool, cli_coroutine, c);
  coro_resume(c->coro);
  cli_run(c);
  return 1;
}

+0 −1
Original line number Diff line number Diff line
@@ -530,5 +530,4 @@ void sysdep_shutdown_done(void) {}

#include "nest/cli.h"
int cli_get_command(cli *c UNUSED) { return 0; }
void cli_write_trigger(cli *c UNUSED) {}
cli *cmd_reconfig_stored_cli;