Commit d5fa1d53 authored by Paul Asmuth
Browse files

fn-feedtail

parent f547f0ce
Loading
Loading
Loading
Loading
+17 −0
Original line number Diff line number Diff line
project(FNORD_FEEDS)

include_directories(../)
include_directories(../3rdparty)

add_library(fnord-feeds OBJECT
    LocalFeed.cc
@@ -10,6 +11,22 @@ add_library(fnord-feeds OBJECT
    RemoteFeedWriter.cc
    FeedService.cc)

# fn-feedtail: command line tool built from fn-feedtail.cc. The fnord
# components are OBJECT libraries, so their object files are pulled directly
# into the executable via $<TARGET_OBJECTS:...> instead of being linked.
add_executable(fn-feedtail
    $<TARGET_OBJECTS:fnord-base>
    $<TARGET_OBJECTS:fnord-cli>
    $<TARGET_OBJECTS:fnord-comm>
    $<TARGET_OBJECTS:fnord-http>
    $<TARGET_OBJECTS:fnord-json>
    $<TARGET_OBJECTS:fnord-net>
    $<TARGET_OBJECTS:fnord-stats>
    $<TARGET_OBJECTS:fnord-sstable>
    $<TARGET_OBJECTS:fnord-feeds>
    $<TARGET_OBJECTS:fnord-util>
    $<TARGET_OBJECTS:fnord-rpc>
    fn-feedtail.cc)

# fn-feedtail.cc runs its event loop on a std::thread, hence the thread libs.
# NOTE(review): CMAKE_THREAD_LIBS_INIT is only populated after
# find_package(Threads) -- confirm that a parent CMakeLists.txt calls it.
target_link_libraries(fn-feedtail ${CMAKE_THREAD_LIBS_INIT})

#add_executable(fnord-logstream-service-example
#    $<TARGET_OBJECTS:fnord-base>
#    $<TARGET_OBJECTS:fnord-comm>
+193 −0
Original line number Diff line number Diff line
/**
 * This file is part of the "FnordMetric" project
 *   Copyright (c) 2014 Paul Asmuth, Google Inc.
 *
 * FnordMetric is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License v3.0. You should have received a
 * copy of the GNU General Public License along with this program. If not, see
 * <http://www.gnu.org/licenses/>.
 */
#include <stdlib.h>
#include <unistd.h>
#include "fnord/base/io/filerepository.h"
#include "fnord/base/io/fileutil.h"
#include "fnord/base/application.h"
#include "fnord/base/logging.h"
#include "fnord/base/random.h"
#include "fnord/base/thread/eventloop.h"
#include "fnord/base/thread/threadpool.h"
#include "fnord/base/wallclock.h"
#include "fnord-rpc/ServerGroup.h"
#include "fnord-rpc/RPC.h"
#include "fnord-rpc/RPCClient.h"
#include "fnord/cli/flagparser.h"
#include "fnord/json/json.h"
#include "fnord/json/jsonrpc.h"
#include "fnord/net/http/httprouter.h"
#include "fnord/net/http/httpserver.h"
#include "fnord-feeds/FeedService.h"
#include "fnord-feeds/RemoteFeedFactory.h"
#include "fnord-feeds/RemoteFeedReader.h"
#include "fnord/stats/statsdagent.h"
#include "fnord-mdb/MDB.h"
#include "cm-common/CustomerNamespace.h"
#include "cm-logjoin/LogJoin.h"

using namespace fnord;

int main(int argc, const char** argv) {
  fnord::Application::init();
  fnord::Application::logToStderr();

  fnord::cli::FlagParser flags;

  flags.defineFlag(
      "statefile",
      fnord::cli::FlagParser::T_STRING,
      false,
      NULL,
      NULL,
      "statefile",
      "<filename>");

  flags.defineFlag(
      "batch_size",
      fnord::cli::FlagParser::T_INTEGER,
      false,
      NULL,
      "2048",
      "batch_size",
      "<num>");

  flags.defineFlag(
      "buffer_size",
      fnord::cli::FlagParser::T_INTEGER,
      false,
      NULL,
      "8192",
      "buffer_size",
      "<num>");

  flags.defineFlag(
      "commit_size",
      fnord::cli::FlagParser::T_INTEGER,
      false,
      NULL,
      "1024",
      "commit_size",
      "<num>");

  flags.defineFlag(
      "loglevel",
      fnord::cli::FlagParser::T_STRING,
      false,
      NULL,
      "INFO",
      "loglevel",
      "<level>");

  flags.parseArgv(argc, argv);

  Logger::get()->setMinimumLogLevel(
      strToLogLevel(flags.getString("loglevel")));

  /* start event loop */
  fnord::thread::EventLoop ev;

  auto evloop_thread = std::thread([&ev] {
    ev.run();
  });

  /* set up rpc client */
  HTTPRPCClient rpc_client(&ev);

  size_t batch_size = flags.getInt("batch_size");
  size_t buffer_size = flags.getInt("buffer_size");
  size_t commit_size = flags.getInt("commit_size");

  /* set up input feed reader */
  feeds::RemoteFeedReader feed_reader(&rpc_client);

  /* get source urls */
  Vector<String> uris = flags.getArgv();
  if (flags.isSet("statefile")) {
    auto statefile = FileUtil::read(flags.getString("statefile")).toString();
    for (const auto& uri : StringUtil::split(statefile, "\n")) {
      if (uri.size() > 0) {
        uris.emplace_back(uri);
      }
    }
  }

  HashMap<String, String> feed_urls;
  for (const auto& uri_raw : uris) {
    URI uri(uri_raw);
    const auto& params = uri.queryParams();

    std::string feed;
    if (!URI::getParam(params, "feed", &feed)) {
      RAISE(kIllegalArgumentError, "feed url missing ?feed query param");
    }

    feed_urls.emplace(feed, uri_raw.substr(0, uri_raw.find("?")));

    std::string offset_str;
    uint64_t offset = 0;
    if (URI::getParam(params, "offset", &offset_str)) {
      offset = std::stoul(offset_str);
    }

    feed_reader.addSourceFeed(
        uri,
        feed,
        offset,
        batch_size,
        buffer_size);
  }

  DateTime last_iter;
  uint64_t rate_limit_micros = 1 * kMicrosPerSecond;

  for (int i = 0; i < commit_size; ++i) {
    last_iter = WallClock::now();
    feed_reader.waitForNextEntry();

    for (;;) {
      auto entry = feed_reader.fetchNextEntry();

      if (entry.isEmpty()) {
        break;
      }

      fnord::iputs("$0", entry.get().data);
    }

    if (flags.isSet("statefile")) {
      auto stream_offsets = feed_reader.streamOffsets();
      Buffer statefile;

      for (const auto& soff : stream_offsets) {
        statefile.append(
            StringUtil::format(
                "$0?feed=$1&offset=$2\n",
                feed_urls[soff.first],
                soff.first,
                soff.second));
      }

      FileUtil::write(flags.getString("statefile") + "~", statefile);
      FileUtil::mv(
          flags.getString("statefile") + "~",
          flags.getString("statefile"));
    }

    auto etime = WallClock::now().unixMicros() - last_iter.unixMicros();
    if (etime < rate_limit_micros) {
      usleep(rate_limit_micros - etime);
    }
  }

  evloop_thread.join();
  return 0;
}
+12 −0
Original line number Diff line number Diff line
@@ -115,6 +115,18 @@ size_t File::read(Buffer* buf) {
  return read(buf->data(), buf->size());
}

/**
 * Write `buf_len` bytes starting at `buf` to this file's descriptor.
 *
 * ::write() may legally transfer fewer bytes than requested, so the call is
 * repeated until the whole buffer has been written. Calls interrupted by a
 * signal (EINTR) are retried.
 *
 * Raises kIOError (with errno) if the data could not be written.
 */
void File::write(const void* buf, size_t buf_len) {
  const char* cur = static_cast<const char*>(buf);
  size_t remaining = buf_len;

  while (remaining > 0) {
    /* ssize_t, not int: the result must not be truncated for large writes */
    const ssize_t res = ::write(fd_, cur, remaining);

    if (res < 0) {
      if (errno == EINTR) {
        continue;
      }

      RAISE_ERRNO(kIOError, "write(%i) failed", fd_);
    }

    if (res == 0) {
      /* no progress and no errno set by write(); fail instead of spinning */
      errno = EIO;
      RAISE_ERRNO(kIOError, "write(%i) failed", fd_);
    }

    cur += res;
    remaining -= static_cast<size_t>(res);
  }
}

void File::write(const Buffer& buf) {
  write(buf.data(), buf.size());
}

File File::clone() const {
  int new_fd = dup(fd_);

+3 −0
Original line number Diff line number Diff line
@@ -80,6 +80,9 @@ public:
  size_t read(void* buf, size_t buf_len);
  /** Read into the storage of `buf`; forwards to read(buf->data(), buf->size()) */
  size_t read(Buffer* buf);

  /**
   * Write the contents of `buf` to this file; forwards to
   * write(buf.data(), buf.size())
   */
  void write(const Buffer& buf);
  /**
   * Write `buf_len` bytes starting at `buf` to this file's descriptor. Raises
   * a kIOError if the data could not be written completely
   */
  void write(const void* buf, size_t buf_len);

  int fd() const;
  size_t size() const;

+15 −2
Original line number Diff line number Diff line
@@ -144,6 +144,11 @@ void FileUtil::rm(const std::string& filename) {
  unlink(filename.c_str());
}

/**
 * Rename the file `src` to `dst` via ::rename().
 *
 * Raises kIOError (with errno) if the rename fails.
 */
void FileUtil::mv(const std::string& src, const std::string& dst) {
  const char* from_path = src.c_str();
  const char* to_path = dst.c_str();

  const int rc = ::rename(from_path, to_path);
  if (rc >= 0) {
    return;
  }

  RAISE_ERRNO(kIOError, "rename(%s, %s) failed", from_path, to_path);
}

void FileUtil::truncate(const std::string& filename, size_t new_size) {
  if (::truncate(filename.c_str(), new_size) < 0) {
@@ -151,11 +156,19 @@ void FileUtil::truncate(const std::string& filename, size_t new_size) {
  }
}

/**
 * Read the contents of the file `filename`.
 *
 * Opens the file with File::O_READ, allocates a Buffer of file.size() bytes
 * and reads into it.
 *
 * NOTE(review): this span carries both the old (std::string / buf.toString())
 * and the new (Buffer / buf) signature and return lines of the change; the
 * Buffer-returning variant is the one fn-feedtail's main() relies on when it
 * calls FileUtil::read(...).toString().
 * NOTE(review): the byte count returned by file.read(&buf) is not checked, so
 * a short read would go unnoticed here -- confirm File::read raises in that
 * case.
 */
std::string FileUtil::read(const std::string& filename) {
Buffer FileUtil::read(const std::string& filename) {
  auto file = File::openFile(filename, File::O_READ);
  Buffer buf(file.size());
  file.read(&buf);
  return buf.toString();
  return buf;
}

/**
 * Write `data` to the file `filename`.
 *
 * The file is opened with O_WRITE | O_CREATEOROPEN | O_TRUNCATE and `data` is
 * then written through File::write.
 */
void FileUtil::write(const std::string& filename, const Buffer& data) {
  const auto open_flags =
      File::O_WRITE | File::O_CREATEOROPEN | File::O_TRUNCATE;

  auto out = File::openFile(filename, open_flags);
  out.write(data);
}

void FileUtil::cp(const std::string& src, const std::string& destination) {
Loading