Commit 838f7eb1 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

make max_spread configurable

parent a57f64b8
Loading
Loading
Loading
Loading
+7 −3
Original line number Diff line number Diff line
@@ -20,7 +20,8 @@ namespace feeds {
RemoteFeedReader::RemoteFeedReader(
    RPCClient* rpc_client) :
    rpc_client_(rpc_client),
    time_backfill_fn_(nullptr) {}
    time_backfill_fn_(nullptr),
    max_spread_(1 * kMicrosPerSecond) {}

void RemoteFeedReader::addSourceFeed(
    URI rpc_url,
@@ -50,7 +51,6 @@ Option<FeedEntry> RemoteFeedReader::fetchNextEntry() {
  ScopedLock<std::mutex> lk(mutex_);
  int idx = -1;
  uint64_t min_stream_time = std::numeric_limits<uint64_t>::max();
  Duration max_spread(3600 * kMicrosPerSecond);

  for (int i = 0; i < sources_.size(); ++i) {
    const auto& source = sources_[i];
@@ -69,7 +69,7 @@ Option<FeedEntry> RemoteFeedReader::fetchNextEntry() {
  }

  if (idx == -1) {
    min_stream_time += max_spread.microseconds();
    min_stream_time += max_spread_.microseconds();

    for (int i = 0; i < sources_.size(); ++i) {
      const auto& source = sources_[i];
@@ -233,6 +233,10 @@ void RemoteFeedReader::setTimeBackfill(
  time_backfill_fn_ = fn;
}

void RemoteFeedReader::setMaxSpread(Duration duration) {
  max_spread_ = duration;
}

void RemoteFeedReader::exportStats(
    const String& path_prefix /* = "/fnord/feeds/reader/" */,
    stats::StatsRepository* stats_repo /* = nullptr */) {
+3 −0
Original line number Diff line number Diff line
@@ -54,6 +54,8 @@ public:

  void setTimeBackfill(Function<DateTime (const FeedEntry& entry)> fn);

  void setMaxSpread(Duration duration);

protected:

  class SourceFeed : public RefCounted {
@@ -78,6 +80,7 @@ protected:
  mutable std::mutex mutex_;

  Function<DateTime (const FeedEntry& entry)> time_backfill_fn_;
  Duration max_spread_;

  //fnord::stats::Counter<uint64_t> stat_entries_written_total_;
  //fnord::stats::Counter<uint64_t> stat_entries_written_success_;
+11 −0
Original line number Diff line number Diff line
@@ -77,6 +77,15 @@ int main(int argc, const char** argv) {
      "commit_size",
      "<num>");

  flags.defineFlag(
      "max_spread_secs",
      fnord::cli::FlagParser::T_INTEGER,
      false,
      NULL,
      "10",
      "max_spread_secs",
      "<num>");

  flags.defineFlag(
      "loglevel",
      fnord::cli::FlagParser::T_STRING,
@@ -104,9 +113,11 @@ int main(int argc, const char** argv) {
  size_t batch_size = flags.getInt("batch_size");
  size_t buffer_size = flags.getInt("buffer_size");
  size_t commit_size = flags.getInt("commit_size");
  size_t max_spread_secs = flags.getInt("max_spread_secs");

  /* set up input feed reader */
  feeds::RemoteFeedReader feed_reader(&rpc_client);
  feed_reader.setMaxSpread(max_spread_secs * kMicrosPerSecond);

  /* get source urls */
  Vector<String> uris = flags.getArgv();