Commit 4b443663 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

implement simple GROUP OVER TIMEWINDOW

parent c0ccfe4f
Loading
Loading
Loading
Loading
+132 −3
Original line number Diff line number Diff line
@@ -8,6 +8,8 @@
 * <http://www.gnu.org/licenses/>.
 */
#include <fnordmetric/sql/runtime/groupovertimewindow.h>
#include <fnordmetric/sql/runtime/compile.h>
#include <fnordmetric/sql/runtime/execute.h>

namespace fnordmetric {
namespace query {
@@ -29,16 +31,143 @@ GroupOverTimewindow::GroupOverTimewindow(
    group_expr_(group_expr),
    scratchpad_size_(scratchpad_size),
    child_(child) {
  printf("new group over clause with %i, %i\n", window, step);
  scratchpad_.reset(malloc(scratchpad_size_));
  if (scratchpad_.get() == nullptr) {
    RAISE(kMallocError, "malloc() failed");
  }

  child->setTarget(this);
}

void GroupOverTimewindow::execute() {
  RAISE(kNotYetImplementedError, "NYI");
  child_->execute();

  for (auto& group : groups_) {
    emitGroup(&group.second);
  }
}

bool GroupOverTimewindow::nextRow(SValue* row, int row_len) {
  RAISE(kNotYetImplementedError, "NYI");
  SValue out[128]; // FIXPAUL
  int out_len;

  /* execute group expression */
  if (group_expr_ != nullptr) {
    executeExpression(group_expr_, nullptr, row_len, row, &out_len, out);
  }

  /* stringify expression results into group key */
  auto key_str = SValue::makeUniqueKey(out, out_len);

  /* get group */
  Group* group = nullptr;

  auto group_iter = groups_.find(key_str);
  if (group_iter == groups_.end()) {
    group = &groups_[key_str];
  } else {
    group = &group_iter->second;
  }

  /* execute time expression */
  executeExpression(time_expr_, nullptr, row_len, row, &out_len, out);
  if (out_len != 1) {
    RAISE(
        kRuntimeError,
        "time_expr in GROUP OVER TIMEWINDOW clause must return exactly one"
        " value, got %i",
        (int) out_len);
  }

  auto time = static_cast<uint64_t>(out[0].getTimestamp());

  /* add row to group */
  std::vector<SValue> row_vec;
  for (int i = 0; i < row_len; i++) {
    row_vec.push_back(row[i]);
  }

  group->rows.emplace_back(time, row_vec);

  return true;
}

void GroupOverTimewindow::emitGroup(Group* group) {
  auto& rows = group->rows;

  if (rows.size() == 0) {
    return;
  }

  /* sort rows */
  std::sort(
      rows.begin(),
      rows.end(),
      [] (
          const std::pair<uint64_t, std::vector<SValue>>& a,
          const std::pair<uint64_t, std::vector<SValue>>& b) {
            return a.first < b.first;
          });


  size_t window_start_idx = 0;
  size_t window_end_idx;
  uint64_t window_start_time = rows[0].first;

  do {
    /* search end of current window */
    auto window_end_time = window_start_time + window_ * 1000000;
    for (
        window_end_idx = window_start_idx;
        window_end_idx < rows.size() &&
            rows[window_end_idx].first < window_end_time;
        ++window_end_idx);

    printf("window %i, %i (%lu-%lu=%lu)\n",
        window_start_idx,
        window_end_idx,
        window_start_time,
        window_end_time,
        window_end_time - window_start_time);

    emitWindow(
        window_end_time,
        rows.begin() + window_start_idx,
        rows.begin() + window_end_idx);

    /* advance window */
    window_start_time += step_ * 1000000;
    while (window_start_idx <= window_end_idx &&
        rows[window_start_idx].first < window_start_time) {
      window_start_idx++;
    }

  } while (window_end_idx < rows.size());
}

void GroupOverTimewindow::emitWindow(
    uint64_t window_time,
    std::vector<std::pair<uint64_t, std::vector<SValue>>>::iterator
        window_begin,
    std::vector<std::pair<uint64_t, std::vector<SValue>>>::iterator
        window_end) {

  SValue out[128]; // FIXPAUL
  int out_len;

  memset(scratchpad_.get(), 0, scratchpad_size_);

  for (; window_begin != window_end; window_begin++) {
    executeExpression(
        select_expr_,
        scratchpad_.get(),
        window_begin->second.size(),
        window_begin->second.data(),
        &out_len,
        out);
  }

  emitRow(out, out_len);
}

size_t GroupOverTimewindow::getNumCols() const {
+11 −2
Original line number Diff line number Diff line
@@ -45,10 +45,18 @@ public:
protected:

  struct Group {
    std::vector<SValue> row;
    void* scratchpad;
    std::vector<std::pair<uint64_t, std::vector<SValue>>> rows;
  };

  void emitGroup(Group* group);

  void emitWindow(
      uint64_t window_time,
      std::vector<std::pair<uint64_t, std::vector<SValue>>>::iterator
          window_begin,
      std::vector<std::pair<uint64_t, std::vector<SValue>>>::iterator
          window_end);

  std::vector<std::string> columns_;
  CompiledExpression* time_expr_;
  fnordmetric::IntegerType window_;
@@ -57,6 +65,7 @@ protected:
  CompiledExpression* group_expr_;
  size_t scratchpad_size_;
  QueryPlanNode* child_;
  std::unique_ptr<void> scratchpad_;
  std::unordered_map<std::string, Group> groups_;
};

+8 −1
Original line number Diff line number Diff line
@@ -105,6 +105,10 @@ class TestTimeTableRef : public TableRef {

    for (int i = 0; i < 500; ++i) {
      std::vector<SValue> row;
      if (i == 300) {
        start_time += 120000000;
      }

      row.emplace_back(fnord::util::DateTime(start_time + 1000000 * i));
      row.emplace_back(SValue((fnordmetric::IntegerType) i));
      if (!scan->nextRow(row.data(), row.size())) {
@@ -1235,8 +1239,11 @@ TEST_CASE(SQLTest, TestRuntime, [] () {

TEST_CASE(SQLTest, TestSimpleGroupOverTimeWindow, [] () {
  auto result = executeTestQuery(
      "  SELECT sum(value) FROM timeseries GROUP OVER TIMEWINDOW(time, 60);");
      "  SELECT time, sum(value) "
      "      FROM timeseries"
      "      GROUP OVER TIMEWINDOW(time, 60);");

  result->debugPrint();
  EXPECT_EQ(result->getNumRows(), 1);
  EXPECT_EQ(result->getNumColumns(), 1);
  EXPECT_EQ(result->getRow(0)[0], "123");
+4 −1
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@
[ SQL ]
    - match function names case-insensitively
    - sql time helpers (1hour, etc)
    - functions: sum, avg, percentile, mean, variance, stddev, delta, nth_derivate,
    - functions: avg, mean, max/min, percentile, variance, stddev, delta, nth_derivate,
    - support red/green/blue/hex colors
    - round fn

@@ -43,6 +43,9 @@
    - do I need an external database

[ feature q ]
    - union
    - cross join
    - GROUP OVER TIMEWINDOW on multiple tables
    - find time constraint from sql query
    - scan samples with range / sstable binary search
    - sql: in