Commit c8406481 authored by Laura Schlimmer's avatar Laura Schlimmer
Browse files

Merge branch 'header' into cockpit

parents bf705620 c7063454
Loading
Loading
Loading
Loading
+18 −0
Original line number Diff line number Diff line
# Build configuration for the fnord-feeds module.
project(FNORD_FEEDS)

# Headers are included as "fnord-feeds/..." from the repository root.
include_directories(../)
include_directories(../3rdparty)

# Object library: compiled once, linked into executables below via
# $<TARGET_OBJECTS:fnord-feeds>.
add_library(fnord-feeds OBJECT
    LocalFeed.cc
    RemoteFeed.cc
    RemoteFeedFactory.cc
    RemoteFeedReader.cc
    RemoteFeedWriter.cc
    FeedService.cc)

# fn-feedtail: command line tool; links all required fnord object
# libraries directly instead of shared/static libs.
add_executable(fn-feedtail
    $<TARGET_OBJECTS:fnord-base>
    $<TARGET_OBJECTS:fnord-cli>
    $<TARGET_OBJECTS:fnord-comm>
    $<TARGET_OBJECTS:fnord-http>
    $<TARGET_OBJECTS:fnord-json>
    $<TARGET_OBJECTS:fnord-net>
    $<TARGET_OBJECTS:fnord-stats>
    $<TARGET_OBJECTS:fnord-sstable>
    $<TARGET_OBJECTS:fnord-feeds>
    $<TARGET_OBJECTS:fnord-util>
    $<TARGET_OBJECTS:fnord-rpc>
    fn-feedtail.cc)

# Thread library only; all other code is linked in as objects above.
target_link_libraries(fn-feedtail ${CMAKE_THREAD_LIBS_INIT})

#add_executable(fnord-logstream-service-example
#    $<TARGET_OBJECTS:fnord-base>
#    $<TARGET_OBJECTS:fnord-comm>
+1 −0
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ struct FeedEntry {
    meta->prop(&FeedEntry::offset, 1, "offset", false);
    meta->prop(&FeedEntry::next_offset, 2, "next_offset", false);
    meta->prop(&FeedEntry::data, 3, "data", false);
    meta->prop(&FeedEntry::time, 4, "time", false);
  }
};

+6 −2
Original line number Diff line number Diff line
@@ -117,6 +117,10 @@ std::vector<FeedEntry> LogStream::fetch(uint64_t offset, int batch_size) {
    cursor = table->writer->getCursor();
  }

  if (offset == std::numeric_limits<uint64_t>::max()) {
    offset = head_offset_;
  }

  if (offset > 0) {
#ifndef FNORD_NOTRACE
    fnord::logTrace(
@@ -144,10 +148,9 @@ std::vector<FeedEntry> LogStream::fetch(uint64_t offset, int batch_size) {
    fnord::logTrace(
        "fnord.feeds.localfeed",
        "request id=$0: reading entry at table_offset=$1 "
            "table_real_offset=$2 logical_offset=$3",
            "logical_offset=$2",
        request_id,
        cursor->position(),
        cursor->position() + reader->bodyOffset(),
        table->offset + cursor->position());
#endif

@@ -163,6 +166,7 @@ std::vector<FeedEntry> LogStream::fetch(uint64_t offset, int batch_size) {
    entry.offset = table->offset + cursor->position();
    entry.next_offset = table->offset + cursor->nextPosition();
    entry.data = cursor->getDataString();
    entry.time = 0;
    entries.emplace_back(std::move(entry));

    if (!cursor->next()) {
+264 −0
Original line number Diff line number Diff line
/**
 * This file is part of the "FnordMetric" project
 *   Copyright (c) 2014 Paul Asmuth, Google Inc.
 *
 * FnordMetric is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License v3.0. You should have received a
 * copy of the GNU General Public License along with this program. If not, see
 * <http://www.gnu.org/licenses/>.
 */
#include "fnord-feeds/RemoteFeedReader.h"
#include "fnord/base/logging.h"
#include "fnord/json/JSONRPCCodec.h"
#include "fnord-feeds/FeedService.h"

namespace fnord {
namespace feeds {

// TODO: retry timeout after error?

/**
 * Construct a reader that fetches entries via the given (non-owned) RPC
 * client. No time backfill function is set by default and the maximum
 * stream-time spread between sources defaults to one second.
 */
RemoteFeedReader::RemoteFeedReader(
    RPCClient* rpc_client) :
    rpc_client_(rpc_client),
    time_backfill_fn_(nullptr),
    max_spread_(1 * kMicrosPerSecond) {}

/**
 * Register a new source feed to read from.
 *
 * @param rpc_url         endpoint of the remote FeedService
 * @param feed_name       unique name of the feed; duplicates are rejected
 * @param initial_offset  offset to start fetching from
 * @param batch_size      max entries per fetch RPC
 * @param max_buffer_size stop prefetching once this many entries are buffered
 * @throws kIndexError if a feed with the same name was already added
 */
void RemoteFeedReader::addSourceFeed(
    URI rpc_url,
    String feed_name,
    uint64_t initial_offset,
    size_t batch_size /* = kDefaultBatchSize */,
    size_t max_buffer_size /* = kDefaultMaxBufferSize */) {
  /* hold mutex_ while touching sources_: every other accessor
     (fetchNextEntry, fillBuffers, streamOffsets, ...) iterates sources_
     under this lock, so mutating it unlocked would be a data race */
  ScopedLock<std::mutex> lk(mutex_);

  for (const auto& source : sources_) {
    if (source->feed_name == feed_name) {
      RAISEF(kIndexError, "feed '$0' already exists", feed_name);
    }
  }

  auto source = new SourceFeed();
  source->rpc_url = rpc_url;
  source->feed_name = feed_name;
  source->batch_size = batch_size;
  source->max_buffer_size = max_buffer_size;
  source->is_fetching = false;
  source->next_offset = initial_offset;
  source->consumed_offset = initial_offset;
  source->stream_time = 0;
  sources_.emplace_back(source);
}

/**
 * Pop and return the next buffered entry, merging all source feeds in
 * (approximate) stream-time order. Returns None if no entry is currently
 * buffered; does not block (see waitForNextEntry).
 *
 * Selection policy: prefer the source with the lowest stream time. If that
 * source has nothing buffered, fall back to any source with data whose
 * stream time is within max_spread_ of the minimum.
 */
Option<FeedEntry> RemoteFeedReader::fetchNextEntry() {
  ScopedLock<std::mutex> lk(mutex_);
  int idx = -1;
  uint64_t min_stream_time = std::numeric_limits<uint64_t>::max();

  /* pass 1: find the source with the minimum stream time; idx is set only
     if that source also has buffered data, otherwise cleared */
  for (int i = 0; i < sources_.size(); ++i) {
    const auto& source = sources_[i];

    if (source->stream_time.unixMicros() < min_stream_time) {
      min_stream_time = source->stream_time.unixMicros();

      if (source->read_buffer.size() > 0) {
        idx = i;
      } else {
        idx = -1;
      }
    }
  }

  /* pass 2: the min-stream-time source is empty; accept the oldest source
     with data that is still within max_spread_ of the minimum */
  if (idx == -1) {
    min_stream_time += max_spread_.microseconds();

    for (int i = 0; i < sources_.size(); ++i) {
      const auto& source = sources_[i];

      if (source->stream_time.unixMicros() < min_stream_time &&
          source->read_buffer.size() > 0) {
        min_stream_time = source->stream_time.unixMicros();
        idx = i;
      }
    }
  }

  if (idx < 0) {
    return None<FeedEntry>();
  } else {
    /* pop the chosen entry and advance the source's stream time and
       consumed offset */
    const auto& source = sources_[idx];
    auto entry = source->read_buffer.front();
    source->read_buffer.pop_front();

    if (entry.time > source->stream_time) {
      source->stream_time = entry.time;
    }

    source->consumed_offset = entry.next_offset;
    return Some(entry);
  }
}

/**
 * Issue an async fetch RPC for one source feed. The caller must have set
 * source->is_fetching = true before calling; the RPC callbacks clear it.
 *
 * On success the fetched entries (optionally time-backfilled) are appended
 * to the source's read buffer and next_offset is advanced. On both success
 * and error, waiters on data_available_wakeup_ are woken so that
 * waitForNextEntry never blocks while no fetch is in flight.
 */
void RemoteFeedReader::fillBuffer(SourceFeed* source) {
#ifndef NDEBUG
  fnord::logTrace(
      "fnord.feeds.remotefeedreader",
      "Fetching from feed\n    name=$0\n    url=$1\n    offset=$2",
      source->feed_name,
      source->rpc_url.toString(),
      source->next_offset);
#endif

  auto rpc = fnord::mkRPC<json::JSONRPCCodec>(
      &FeedService::fetch,
      source->feed_name,
      source->next_offset,
      (int) source->batch_size);

  rpc_client_->call(source->rpc_url, rpc.get());

  rpc->onSuccess([this, source] (const decltype(rpc)::ValueType& r) mutable {
    ScopedLock<std::mutex> lk(mutex_);

    for (const auto& e : r.result()) {
      auto entry = e;

      /* optionally derive the entry time from its payload */
      if (time_backfill_fn_) {
        entry.time = time_backfill_fn_(entry);
      }

      source->read_buffer.emplace_back(std::move(entry));
    }

    source->is_fetching = false;

    if (source->read_buffer.size() > 0) {
      source->next_offset = source->read_buffer.back().next_offset;
    }

    lk.unlock();

    /* wake waiters unconditionally -- even for an empty result. A waiter
       that observed is_fetching == true would otherwise sleep forever once
       this (last) fetch completes without new data */
    data_available_wakeup_.wakeup();
  });

  rpc->onError([this, source] (const Status& status) {
    ScopedLock<std::mutex> lk(mutex_);
    source->is_fetching = false;
    lk.unlock();

    logError(
        "fnord.feeds.remotefeedreader",
        "Error while fetching from feed:\n" \
        "    feed=$1\n    url=$0\n    error=$2",
        source->rpc_url.toString(),
        source->feed_name,
        status);

    data_available_wakeup_.wakeup();
  });
}

/**
 * Block until at least one entry is (likely) available on some source.
 * Kicks off prefetching first, returns immediately if data is already
 * buffered or if no fetch is in flight (in which case waiting would be
 * pointless -- nothing will arrive).
 */
void RemoteFeedReader::waitForNextEntry() {
  fillBuffers();

  ScopedLock<std::mutex> lk(mutex_);
  bool is_data_available = false;
  bool is_any_fetching = false;

  for (const auto& source : sources_) {
    if (source->read_buffer.size() > 0) {
      is_data_available = true;
    }

    if (source->is_fetching) {
      is_any_fetching = true;
    }
  }

  /* fastpath if there is data available on any feed */
  if (is_data_available) {
    return;
  }

  /* no data and no outstanding fetch: waiting could block forever */
  if (!is_any_fetching) {
    return;
  }

  /* capture the wakeup generation while still holding the lock so a wakeup
     that fires between unlock and wait is not lost */
  auto wakeup_gen = data_available_wakeup_.generation();
  lk.unlock();

  /* wait until there is any data available */
  data_available_wakeup_.waitForWakeup(wakeup_gen);
}

/**
 * Start an async fetch for every source that is not already fetching and
 * whose read buffer is not yet full.
 */
void RemoteFeedReader::fillBuffers() {
  ScopedLock<std::mutex> lk(mutex_);

  /* copy the (refcounted) source list so the sources stay alive while we
     release the lock around the RPC call below */
  auto sources = sources_;

  for (const auto& source : sources) {
    /* skip this source but still consider the remaining ones; the original
       early return here would starve every source after the first busy or
       full one */
    if (source->is_fetching ||
        source->read_buffer.size() >= source->max_buffer_size) {
      continue;
    }

    source->is_fetching = true;

    /* don't hold the mutex across the RPC dispatch */
    lk.unlock();
    fillBuffer(source.get());
    lk.lock();
  }
}

/**
 * Snapshot the current <feed_name, consumed_offset> pair for every
 * registered source feed.
 */
Vector<Pair<String, uint64_t>> RemoteFeedReader::streamOffsets() const {
  ScopedLock<std::mutex> lk(mutex_);

  Vector<Pair<String, uint64_t>> result;
  for (size_t i = 0; i < sources_.size(); ++i) {
    const auto& src = sources_[i];
    result.emplace_back(src->feed_name, src->consumed_offset);
  }

  return result;
}

/**
 * Return the <low, high> stream-time watermarks across all source feeds,
 * i.e. the minimum and maximum per-source stream time. Returns <0, 0> if
 * no sources are registered.
 */
Pair<DateTime, DateTime> RemoteFeedReader::watermarks() const {
  ScopedLock<std::mutex> lk(mutex_);

  if (sources_.size() == 0) {
    return std::make_pair(DateTime(0), DateTime(0));
  }

  /* seed both bounds from the first source, then widen */
  uint64_t lo = sources_[0]->stream_time.unixMicros();
  uint64_t hi = lo;

  for (const auto& source : sources_) {
    auto micros = source->stream_time.unixMicros();

    if (micros < lo) {
      lo = micros;
    }

    if (micros > hi) {
      hi = micros;
    }
  }

  return std::make_pair(DateTime(lo), DateTime(hi));
}

/** Convenience accessor for the low (minimum) stream-time watermark. */
DateTime RemoteFeedReader::lowWatermark() const {
  auto marks = watermarks();
  return marks.first;
}

/** Convenience accessor for the high (maximum) stream-time watermark. */
DateTime RemoteFeedReader::highWatermark() const {
  auto marks = watermarks();
  return marks.second;
}

/**
 * Install a function that derives an entry's time from its payload; it is
 * applied to every fetched entry before buffering.
 * NOTE(review): not synchronized against concurrent fetch callbacks that
 * read time_backfill_fn_ -- intended to be set before reading starts.
 */
void RemoteFeedReader::setTimeBackfill(
    Function<DateTime (const FeedEntry& entry)> fn) {
  time_backfill_fn_ = std::move(fn);
}

/**
 * Set the maximum allowed stream-time spread used by fetchNextEntry when
 * the oldest source has no buffered data (default: 1 second).
 */
void RemoteFeedReader::setMaxSpread(Duration duration) {
  max_spread_ = duration;
}

/**
 * Export reader counters to a stats repository. Currently a no-op stub:
 * the corresponding counters are commented out in the header and nothing
 * is registered yet.
 */
void RemoteFeedReader::exportStats(
    const String& path_prefix /* = "/fnord/feeds/reader/" */,
    stats::StatsRepository* stats_repo /* = nullptr */) {
}

}
}
+94 −0
Original line number Diff line number Diff line
/**
 * This file is part of the "FnordMetric" project
 *   Copyright (c) 2014 Paul Asmuth, Google Inc.
 *
 * FnordMetric is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License v3.0. You should have received a
 * copy of the GNU General Public License along with this program. If not, see
 * <http://www.gnu.org/licenses/>.
 */
#ifndef _FNORD_FEEDS_REMOTEFEEDREADER_H
#define _FNORD_FEEDS_REMOTEFEEDREADER_H
#include "fnord/base/stdtypes.h"
#include "fnord/base/option.h"
#include "fnord-rpc/RPC.h"
#include "fnord-rpc/RPCClient.h"
#include "fnord-feeds/FeedEntry.h"

namespace fnord {
namespace feeds {

/**
 * Merges entries from multiple remote feeds (fetched asynchronously via
 * RPC) into a single stream ordered approximately by stream time.
 * Public methods lock an internal mutex; the RPC client pointer is not
 * owned and must outlive the reader.
 */
class RemoteFeedReader {
public:
  static const size_t kDefaultBatchSize = 1024;
  static const size_t kDefaultMaxBufferSize = 8192;

  RemoteFeedReader(RPCClient* rpc_client);

  /** Pop the next buffered entry or None; non-blocking. */
  Option<FeedEntry> fetchNextEntry();

  /** Prefetch and block until data is likely available (or nothing is in flight). */
  void waitForNextEntry();

  /**
   * Register a source feed; raises if feed_name was already added.
   */
  void addSourceFeed(
      URI rpc_url,
      String feed_name,
      uint64_t initial_offset,
      size_t batch_size = kDefaultBatchSize,
      size_t max_buffer_size = kDefaultMaxBufferSize);

  /** Export counters to a stats repository (currently a no-op stub). */
  void exportStats(
      const String& path_prefix = "/fnord/feeds/reader/",
      stats::StatsRepository* stats_repo = nullptr);

  /**
   * Return the <low, high> time watermarks
   */
  Pair<DateTime, DateTime> watermarks() const;

  /**
   * Return the current <feed_name, offset> stream offsets
   */
  Vector<Pair<String, uint64_t>> streamOffsets() const;

  DateTime lowWatermark() const;
  DateTime highWatermark() const;

  /** Install a function that derives an entry's time from its payload. */
  void setTimeBackfill(Function<DateTime (const FeedEntry& entry)> fn);

  /** Max stream-time spread tolerated when merging sources (default 1s). */
  void setMaxSpread(Duration duration);

protected:

  /** Per-feed state: RPC endpoint, fetch cursor and read-ahead buffer. */
  class SourceFeed : public RefCounted {
  public:
    URI rpc_url;
    String feed_name;
    unsigned max_buffer_size;   // prefetch stops once buffer reaches this
    unsigned batch_size;        // entries requested per fetch RPC
    Deque<FeedEntry> read_buffer;
    bool is_fetching;           // true while a fetch RPC is in flight
    uint64_t next_offset;       // next offset to fetch from
    uint64_t consumed_offset;   // offset consumed via fetchNextEntry
    DateTime stream_time;       // max entry time seen on this feed
  };

  void fillBuffers();
  void fillBuffer(SourceFeed* source);

  RPCClient* rpc_client_;       // not owned
  RefPtrVector<SourceFeed> sources_;

  Wakeup data_available_wakeup_;
  mutable std::mutex mutex_;    // guards sources_ and all per-source state

  Function<DateTime (const FeedEntry& entry)> time_backfill_fn_;
  Duration max_spread_;

  //fnord::stats::Counter<uint64_t> stat_entries_written_total_;
  //fnord::stats::Counter<uint64_t> stat_entries_written_success_;
  //fnord::stats::Counter<uint64_t> stat_entries_written_error_;
  //fnord::stats::Counter<uint64_t> stat_entries_written_retry_;
};

}
}
#endif
Loading