Commit a624d832 authored by Maria Matejka's avatar Maria Matejka
Browse files

Worker: First running try of parallel worker subsystem

parent b8e01bc4
Loading
Loading
Loading
Loading
+116 −0
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ class BIRDPrinter:

    @classmethod
    def lookup(cls, val):
#        print(cls.typeCode, cls.typeTag, val.type.code, val.type.tag)
        if val.type.code != cls.typeCode:
            return None
        if val.type.tag != cls.typeTag:
@@ -140,6 +141,81 @@ class BIRDFExecStackPrinter(BIRDPrinter):
                    "n": n
                        } for n in range(cnt-1, -1, -1) ])

class BIRDWQStateLogPrinter(BIRDPrinter):
    "Print BIRD's worker queue struct worker_queue_state"
    typeCode = gdb.TYPE_CODE_STRUCT
    typeTag = "worker_queue_state"

    def queueval(self):
        return "%(worker) 4d %(what)s: pending %(pending)s, running %(running)s, stop %(stop)s" % {
                "worker": self.val['worker_id'],
                "what": str(self.val['what']),
                "pending": str(self.val['queue']['pending']),
                "running": str(self.val['queue']['running']),
                "stop": str(self.val['queue']['stop']),
                }

    def semval(self):
        semname = None
        semaddr = self.val["sem"]
        wq = gdb.lookup_symbol("wq")[0].value().dereference()
        for s in [ "waiting", "stopped", "yield", "available" ]:
            if semaddr == wq[s].address:
                if semname is not None:
                    raise Exception("Two matching semaphores!")

                semname = s

        if semname is None:
            raise Exception("No matching semaphore!")

        return "%(worker) 4d %(what)s %(semname)s" % {
                "worker": self.val['worker_id'],
                "what": str(self.val["what"]),
                "semname": semname
                }

    def domval(self):
        return "%(worker) 4d %(what)s: %(task)s on %(domain)s: RP=%(rdtasks_n)u WP=%(wrtasks_n)u RS=%(rdsem_n)u WS=%(wrsem_n)u RL=%(rdlocked)u PREP=%(prepended)u" % {
                "worker": self.val['worker_id'],
                "what": str(self.val["what"]),
                "task": str(self.val["domain"]["task"]),
                "domain": str(self.val["domain"]["domain"]),
                "rdtasks_n": self.val["domain"]["rdtasks_n"],
                "wrtasks_n": self.val["domain"]["wrtasks_n"],
                "rdsem_n": self.val["domain"]["rdsem_n"],
                "wrsem_n": self.val["domain"]["wrsem_n"],
                "rdlocked": self.val["domain"]["rdlocked"],
                "prepended": self.val["domain"]["prepended"],
                }

    def nothing(self):
        return "nothing"

    def to_string(self):
        return {
                "WQS_NOTHING": self.nothing,
                "WQS_LOCK": self.queueval,
                "WQS_UNLOCK": self.queueval,
                "WQS_YIELD": self.queueval,
                "WQS_CONTINUE": self.queueval,
                "WQS_SEM_POST": self.semval,
                "WQS_SEM_WAIT_REQUEST": self.semval,
                "WQS_SEM_WAIT_SUCCESS": self.semval,
                "WQS_SEM_TRYWAIT_SUCCESS": self.semval,
                "WQS_SEM_TRYWAIT_BLOCKED": self.semval,
                "WQS_DOMAIN_WRLOCK_REQUEST": self.domval,
                "WQS_DOMAIN_RDLOCK_REQUEST": self.domval,
                "WQS_DOMAIN_WRLOCK_SUCCESS": self.domval,
                "WQS_DOMAIN_RDLOCK_SUCCESS": self.domval,
                "WQS_DOMAIN_WRLOCK_BLOCKED": self.domval,
                "WQS_DOMAIN_RDLOCK_BLOCKED": self.domval,
                "WQS_DOMAIN_RDUNLOCK_REQUEST": self.domval,
                "WQS_DOMAIN_WRUNLOCK_REQUEST": self.domval,
                "WQS_DOMAIN_RDUNLOCK_DONE": self.domval,
                "WQS_DOMAIN_WRUNLOCK_DONE": self.domval,
                }[str(self.val['what'])]()

def register_printers(objfile):
    objfile.pretty_printers.append(BIRDFInstPrinter.lookup)
    objfile.pretty_printers.append(BIRDFValPrinter.lookup)
@@ -147,7 +223,47 @@ def register_printers(objfile):
    objfile.pretty_printers.append(BIRDFLineItemPrinter.lookup)
    objfile.pretty_printers.append(BIRDFLinePrinter.lookup)
    objfile.pretty_printers.append(BIRDFExecStackPrinter.lookup)
    objfile.pretty_printers.append(BIRDWQStateLogPrinter.lookup)

register_printers(gdb.current_objfile())

class BIRDCommand(gdb.Command):
    def __init__(self):
        super().__init__("bird", gdb.COMMAND_NONE, gdb.COMPLETE_COMMAND, True)

    def invoke(self, argument, from_tty):
        pass

class BIRDWQCommand(gdb.Command):
    def __init__(self):
        super().__init__("bird wq", gdb.COMMAND_NONE, gdb.COMPLETE_COMMAND, True)

    def invoke(self, argument, from_tty):
        pass

class BIRDWQStateLogCommand(gdb.Command):
    "Show worker queue last state log"
    def __init__(self):
        super().__init__("bird wq statelog", gdb.COMMAND_DATA, gdb.COMPLETE_EXPRESSION)

    def invoke(self, argument, from_tty):
        args = gdb.string_to_argv(argument)
        if len(args) > 1:
            raise Exception("Too many args")

        total = gdb.parse_and_eval(args[0]) if len(args) >= 1 else 10
        wq = gdb.lookup_symbol("wq")[0].value().dereference()
        statelog_size = gdb.lookup_symbol("STATELOG_SIZE")[0].value()
        statelog_pos = wq["statelog_pos"]

        print("   idx  WID command")

        for i in range(total):
            idx = (statelog_pos + statelog_size - i - 1) % statelog_size
            print("%(idx) 5d" % { "idx": idx }, wq["statelog"][idx])

BIRDCommand()
BIRDWQCommand()
BIRDWQStateLogCommand()

print("BIRD pretty printers loaded OK.")
+2 −1
Original line number Diff line number Diff line
@@ -45,7 +45,8 @@ struct config {
  u32 latency_limit;			/* Events with longer duration are logged (us) */
  u32 watchdog_warning;			/* I/O loop watchdog limit for warning (us) */
  u32 watchdog_timeout;			/* Watchdog timeout (in seconds, 0 = disabled) */
  uint workers;				/* How many workers should run */
  uint workers;				/* How many workers should run by default */
  uint max_workers;			/* How many workers should run at maximum */
  char *err_msg;			/* Parser error message */
  int err_lino;				/* Line containing error */
  int err_chno;				/* Character where the parser stopped */
+2 −3
Original line number Diff line number Diff line
@@ -86,6 +86,8 @@ main(int argc, char *argv[])
  bt_init(argc, argv);
  bt_bird_init();

  worker_queue_init();
  
  bt_assert_hook = bt_assert_filter;

  struct config *c = NULL;
@@ -93,9 +95,6 @@ main(int argc, char *argv[])
  bt_test_suite_base(parse_config_file, "conf", (const void *) &pcfa, 0, 0, "parse config file");
  bt_test_suite_base(parse_config_file, "reconf", (const void *) &pcfa, 0, 0, "reconfigure with the same file");

  while (ev_run_list(&global_event_list))
    ;

  bt_bird_cleanup();

  if (c)
+1 −0
Original line number Diff line number Diff line
@@ -139,6 +139,7 @@ typedef struct buffer {
void log_commit(int class, buffer *buf);
void log_msg(const char *msg, ...);
void log_rl(struct tbf *rl, const char *msg, ...);
void debug_flush(void);
void die(const char *msg, ...) NORET;
void bug(const char *msg, ...) NORET;

+23 −3
Original line number Diff line number Diff line
@@ -6,12 +6,25 @@
 *	Can be freely distributed and used under the terms of the GNU GPL.
 */

#ifndef _BIRD_WORKER_H_
#define _BIRD_WORKER_H_

#include "lib/birdlib.h"

struct config;

struct domain;
struct limiter;
struct domain *domain_new(pool *p);
void domain_read_lock(struct domain *);
void domain_read_unlock(struct domain *);
void domain_write_lock(struct domain *);
void domain_write_unlock(struct domain *);

extern _Thread_local struct domain *worker_domain;
extern _Thread_local enum task_flags worker_task_flags;

void domain_assert_write_locked(struct domain *);
void domain_assert_read_locked(struct domain *);
void domain_assert_unlocked(struct domain *);

enum task_flags {
  /* These flags can be set by the user */
@@ -31,10 +44,15 @@ struct task {
/* Initialize the worker queue. Run once and never more. */
void worker_queue_init(void);

/* Flush and cleanup the worker queue. Run only in tests. */
void worker_queue_destroy(void);

void worker_queue_init(void);

/* Update configuration for worker queue
 * @c: new config
 */
void worker_queue_update(struct config *c);
void worker_queue_update(const struct config *c);

/* Push some work to the queue.
 * @t: task to push
@@ -54,3 +72,5 @@ struct io_ping_handle *io_ping_new(void (*hook)(struct io_ping_handle *));

/* Issue the ping. Run from a worker thread. */
void io_ping(struct io_ping_handle *);

#endif
Loading