Commit edfb4c61 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

queryendpoint

parent 1cb41827
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -39,7 +39,8 @@ set(FNORDMETRIC_SOURCES
    $<TARGET_OBJECTS:fnord-json>
    $<TARGET_OBJECTS:fnord-metricdb>
    $<TARGET_OBJECTS:fnord-sstable>
    ../src/environment.cc)
    ../src/environment.cc
    ../src/queryendpoint.cc)
    #stage/src/fnordmetric/util/assets.cc
    #stage/src/fnordmetric/sql/backends/csv/csvbackend.cc
    #stage/src/fnordmetric/sql/backends/csv/csvinputstream.cc
+8 −8
Original line number Diff line number Diff line
@@ -10,10 +10,10 @@
#ifndef _FNORDMETRIC_QUERYSERVICE_H
#define _FNORDMETRIC_QUERYSERVICE_H
#include <fnord-chart/rendertarget.h>
#include <fnord/io/inputstream.h>
#include <fnord-base/io/inputstream.h>
#include <fnord-json/jsonoutputstream.h>
#include <chartsql/query.h>
#include <sql/runtime/defaultruntime.h>
#include "chartsql/query.h"
#include "sql/runtime/defaultruntime.h"

namespace fnordmetric {
namespace query {
@@ -44,9 +44,9 @@ public:
   * @param output_stream The output stream to write the results
   */
  void executeQuery(
      std::shared_ptr<fnord::io::InputStream> input_stream,
      std::shared_ptr<fnord::InputStream> input_stream,
      kFormat output_format,
      std::shared_ptr<fnord::io::OutputStream> output_stream);
      std::shared_ptr<fnord::OutputStream> output_stream);

  /**
   * Execute a query. This may raise an exception.
@@ -56,9 +56,9 @@ public:
   * @param output_stream The output stream to write the results
   */
  void executeQuery(
      std::shared_ptr<fnord::io::InputStream> input_stream,
      std::shared_ptr<fnord::InputStream> input_stream,
      kFormat output_format,
      std::shared_ptr<fnord::io::OutputStream> output_stream,
      std::shared_ptr<fnord::OutputStream> output_stream,
      std::unique_ptr<TableRepository> table_repo,
      int width = -1,
      int height = -1);
@@ -82,7 +82,7 @@ protected:
      int width,
      int height) const;

  void renderTables(Query* query, fnord::io::OutputStream* out) const;
  void renderTables(Query* query, fnord::OutputStream* out) const;

  DefaultRuntime runtime_;
};
+7 −4
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@
#include <fnord-metricdb/metricservice.h>
#include <fnord-metricdb/httpapiservlet.h>
#include <environment.h>
#include <queryendpoint.h>

using fnord::metric_service::MetricService;
using namespace fnordmetric;
@@ -162,9 +163,7 @@ int main(int argc, const char** argv) {
  fnord::thread::EventLoop evloop;
  fnord::thread::ThreadPool server_pool;
  fnord::thread::ThreadPool worker_pool;

  fnord::json::JSONRPC rpc;
  fnord::json::JSONRPCHTTPAdapter rpc_http(&rpc);

  try {
    /* setup MetricService */
@@ -185,10 +184,14 @@ int main(int argc, const char** argv) {

    fnord::metric_service::HTTPAPIServlet metrics_api(&metric_service);
    http_router.addRouteByPrefixMatch("/metrics", &metrics_api);

    fnord::json::JSONRPCHTTPAdapter rpc_http(&rpc);
    http_router.addRouteByPrefixMatch("/rpc", &rpc_http);
    //auto http_api = new HTTPAPI(metric_service.metricRepository());

    QueryEndpoint query_api(metric_service.metricRepository());
    http_router.addRouteByPrefixMatch("/query", &query_api);

    //http_server->addHandler(AdminUI::getHandler());
    //http_server->addHandler(std::unique_ptr<http::HTTPHandler>(http_api));

    /* set up statsd server */
    fnord::statsd::StatsdServer statsd_server(&evloop, &evloop);
+123 −0
Original line number Diff line number Diff line
@@ -9,12 +9,12 @@
 */
#include <fnord-base/stringutil.h>
#include <fnord-metricdb/metricrepository.h>
#include <environment.h>
#include <httpapi.h>
#include <chartsql/queryservice.h>
#include <sql/backends/metricservice/metrictablerepository.h>
#include <sql/backends/csv/csvbackend.h>
#include <sql/backends/mysql/mysqlbackend.h>
#include "environment.h"
#include "queryendpoint.h"
#include "chartsql/queryservice.h"
#include "sql/backends/metricservice/metrictablerepository.h"
#include "sql/backends/csv/csvbackend.h"
#include "sql/backends/mysql/mysqlbackend.h"

namespace fnordmetric {

@@ -23,248 +23,30 @@ static const char kMetricsUrlPrefix[] = "/metrics/";
static const char kQueryUrl[] = "/query";
static const char kLabelParamPrefix[] = "label[";

HTTPAPI::HTTPAPI(
QueryEndpoint::QueryEndpoint(
    fnord::metric_service::IMetricRepository* metric_repo) :
    metric_repo_(metric_repo) {}

bool HTTPAPI::handleHTTPRequest(
void QueryEndpoint::handleHTTPRequest(
    http::HTTPRequest* request,
    http::HTTPResponse* response) {
  fnord::URI uri(request->getUrl());
  fnord::URI uri(request->uri());
  auto path = uri.path();
  fnord::StringUtil::stripTrailingSlashes(&path);

  response->addHeader("Access-Control-Allow-Origin", "*");

  // PATH: ^/metrics/?$
  if (path == kMetricsUrl) {
    switch (request->method()) {
      case http::HTTPRequest::M_GET:
        renderMetricList(request, response, &uri);
        return true;
      case http::HTTPRequest::M_POST:
        insertSample(request, response, &uri);
        return true;
      default:
        return false;
    }
  }

  // PATH: ^/metrics/.*
  if (path.compare(0, sizeof(kMetricsUrlPrefix) - 1, kMetricsUrlPrefix) == 0) {
    // PATH: ^/metrics/(.*)$
    switch (request->method()) {
      case http::HTTPRequest::M_GET:
        renderMetricSampleScan(request, response, &uri);
        return true;
      default:
        return false;
    }
  }

  // PATH: ^/query/?*
  if (path == kQueryUrl) {
    switch (request->method()) {
      case http::HTTPRequest::M_GET:
      case http::HTTPRequest::M_POST:
        executeQuery(request, response, &uri);
        return true;
      default:
        return false;
    }
    return true;
  }

  return false;
}

void HTTPAPI::renderMetricList(
    http::HTTPRequest* request,
    http::HTTPResponse* response,
    fnord::URI* uri) {
  response->setStatus(http::kStatusOK);
  response->addHeader("Content-Type", "application/json; charset=utf-8");
  json::JSONOutputStream jsons(response->getBodyOutputStream());

  jsons.beginObject();
  jsons.addObjectEntry("metrics");
  jsons.beginArray();

  fnord::URI::ParamList params = uri->queryParams();
  std::string filter_query;
  auto filter_enabled = fnord::URI::getParam(params, "filter", &filter_query);

  int limit = -1;
  std::string limit_string;
  if (fnord::URI::getParam(params, "limit", &limit_string)) {
    try {
      limit = std::stoi(limit_string);
    } catch (const std::exception& e) {
      /* fallthrough */
    }
  }

  int i = 0;
  for (const auto& metric : metric_repo_->listMetrics()) {
    if (filter_enabled &&
        metric->key().find(filter_query) == std::string::npos) {
      continue;
    }

    if (i++ > 0) {
      jsons.addComma();
    }

    renderMetricJSON(metric, &jsons);

    if (limit > 0 && i == limit) {
      break;
    }
  }

  jsons.endArray();
  jsons.endObject();
}

void HTTPAPI::insertSample(
    http::HTTPRequest* request,
    http::HTTPResponse* response,
    fnord::URI* uri) {
  const auto& postbody = request->getBody();
  fnord::URI::ParamList params;

  if (postbody.size() > 0) {
    fnord::URI::parseQueryString(postbody, &params);
  } else {
    params = uri->queryParams();
  }

  std::string metric_key;
  if (!fnord::URI::getParam(params, "metric", &metric_key)) {
    response->addBody("error: invalid metric key: " + metric_key);
    response->setStatus(http::kStatusBadRequest);
    return;
  }

  std::string value_str;
  if (!fnord::URI::getParam(params, "value", &value_str)) {
    response->addBody("error: missing ?value=... parameter");
    response->setStatus(http::kStatusBadRequest);
    return;
  }

  std::vector<std::pair<std::string, std::string>> labels;
  for (const auto& param : params) {
    const auto& key = param.first;
    const auto& value = param.second;

    if (key.compare(0, sizeof(kLabelParamPrefix) - 1, kLabelParamPrefix) == 0 &&
        key.back() == ']') {
      auto label_key = key.substr(
          sizeof(kLabelParamPrefix) - 1,
          key.size() - sizeof(kLabelParamPrefix));

      labels.emplace_back(label_key, value);
    }
  }

  double sample_value;
  try {
    sample_value = std::stod(value_str);
  } catch (std::exception& e) {
    response->addBody("error: invalid value: " + value_str);
    response->setStatus(http::kStatusBadRequest);
    return;
  }

  auto metric = metric_repo_->findOrCreateMetric(metric_key);
  metric->insertSample(sample_value, labels);
  response->setStatus(http::kStatusCreated);
}

void HTTPAPI::renderMetricSampleScan(
    http::HTTPRequest* request,
    http::HTTPResponse* response,
    fnord::URI* uri) {
  auto metric_key = uri->path().substr(sizeof(kMetricsUrlPrefix) - 1);
  if (metric_key.size() < 3) {
    response->addBody("error: invalid metric key: " + metric_key);
    response->setStatus(http::kStatusBadRequest);
    return;
  }

  auto metric = metric_repo_->findMetric(metric_key);
  if (metric == nullptr) {
    response->addBody("metric not found: " + metric_key);
    response->setStatus(http::kStatusNotFound);
    return;
  }

  response->setStatus(http::kStatusOK);
  response->addHeader("Content-Type", "application/json; charset=utf-8");
  json::JSONOutputStream jsons(response->getBodyOutputStream());

  jsons.beginObject();

  jsons.addObjectEntry("metric");
  renderMetricJSON(metric, &jsons);
  jsons.addComma();
  auto params = uri.queryParams();

  jsons.addObjectEntry("samples");
  jsons.beginArray();

  int i = 0;
  metric->scanSamples(
      fnord::DateTime::epoch(),
      fnord::DateTime::now(),
      [&jsons, &i] (fnord::metric_service::Sample* sample) -> bool {
        if (i++ > 0) { jsons.addComma(); }
        jsons.beginObject();

        jsons.addObjectEntry("time");
        jsons.addValue<uint64_t>(static_cast<uint64_t>(sample->time()));
        jsons.addComma();

        jsons.addObjectEntry("value");
        jsons.addValue<double>(sample->value());
        jsons.addComma();

        jsons.addObjectEntry("labels");
        jsons.beginObject();
        auto labels = sample->labels();
        for (int n = 0; n < labels.size(); n++) {
          if (n > 0) {
            jsons.addComma();
          }

          jsons.addObjectEntry(labels[n].first);
          jsons.addString(labels[n].second);
        }
        jsons.endObject();

        jsons.endObject();
        return true;
      });

  jsons.endArray();
  jsons.endObject();
}

void HTTPAPI::executeQuery(
    http::HTTPRequest* request,
    http::HTTPResponse* response,
    fnord::URI* uri) {
  auto params = uri->queryParams();

  std::shared_ptr<io::InputStream> input_stream;
  std::shared_ptr<fnord::InputStream> input_stream;
  std::string get_query;
  if (fnord::URI::getParam(params, "q", &get_query)) {
    input_stream.reset(new fnord::io::StringInputStream(get_query));
    input_stream.reset(new fnord::StringInputStream(get_query));
  } else {
    input_stream = request->getBodyInputStream();
  }

  std::shared_ptr<io::OutputStream> output_stream =
  std::shared_ptr<fnord::OutputStream> output_stream =
      response->getBodyOutputStream();

  query::QueryService query_service;
@@ -335,37 +117,7 @@ void HTTPAPI::executeQuery(
    json.addString(e.getMessage());
    json.endObject();
  }
}

void HTTPAPI::renderMetricJSON(
    fnord::metric_service::IMetric* metric,
    fnord::json::JSONOutputStream* json) const {
  json->beginObject();

  json->addObjectEntry("key");
  json->addString(metric->key());
  json->addComma();

  json->addObjectEntry("total_bytes");
  json->addValue<size_t>(metric->totalBytes());
  json->addComma();

  json->addObjectEntry("last_insert");
  json->addValue<uint64_t>(static_cast<uint64_t>(metric->lastInsertTime()));
  json->addComma();

  json->addObjectEntry("labels");
  json->beginArray();
  auto labels = metric->labels();
  for (auto cur = labels.begin(); cur != labels.end(); ++cur) {
    if (cur != labels.begin()) {
      json->addComma();
    }
    json->addString(*cur);
  }
  json->endArray();

  json->endObject();
}

}
+4 −28
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@
#define _FNORDMETRIC_METRICDB_HTTPINTERFACE_H
#include <memory>
#include <fnord-base/uri.h>
#include <fnord-http/httphandler.h>
#include <fnord-http/httpservice.h>
#include <fnord-http/httprequest.h>
#include <fnord-http/httpresponse.h>
#include <fnord-json/jsonoutputstream.h>
@@ -21,41 +21,17 @@ namespace fnordmetric {
class IMetric;
class IMetricRepository;

class HTTPAPI : public http::HTTPHandler {
class QueryEndpoint : public http::HTTPService {
public:

  HTTPAPI(fnord::metric_service::IMetricRepository* metric_repo);
  QueryEndpoint(fnord::metric_service::IMetricRepository* metric_repo);

  bool handleHTTPRequest(
  void handleHTTPRequest(
      http::HTTPRequest* request,
      http::HTTPResponse* response) override;

protected:

  void renderMetricList(
      http::HTTPRequest* request,
      http::HTTPResponse* response,
      fnord::URI* uri);

  void renderMetricSampleScan(
      http::HTTPRequest* request,
      http::HTTPResponse* response,
      fnord::URI* uri);

  void insertSample(
      http::HTTPRequest* request,
      http::HTTPResponse* response,
      fnord::URI* uri);

  void executeQuery(
      http::HTTPRequest* request,
      http::HTTPResponse* response,
      fnord::URI* uri);

  void renderMetricJSON(
      fnord::metric_service::IMetric* metric,
      json::JSONOutputStream* json) const;

  fnord::metric_service::IMetricRepository* metric_repo_;
};

Loading