Commit 3e0d5234 authored by Laura Schlimmer's avatar Laura Schlimmer
Browse files
parents bf769e28 76d3cd2b
Loading
Loading
Loading
Loading
+30 −12
Original line number Diff line number Diff line
@@ -55,8 +55,6 @@ Option<FeedEntry> RemoteFeedReader::fetchNextEntry() {
  for (int i = 0; i < sources_.size(); ++i) {
    const auto& source = sources_[i];

    maybeFillBuffer(source.get());

    if (source->stream_time.unixMicros() < min_stream_time) {
      min_stream_time = source->stream_time.unixMicros();

@@ -98,14 +96,7 @@ Option<FeedEntry> RemoteFeedReader::fetchNextEntry() {
  }
}

void RemoteFeedReader::maybeFillBuffer(SourceFeed* source) {
  if (source->is_fetching ||
      source->read_buffer.size() >= source->max_buffer_size) {
    return;
  }

  source->is_fetching = true;

void RemoteFeedReader::fillBuffer(SourceFeed* source) {
#ifndef NDEBUG
  fnord::logTrace(
      "fnord.feeds.remotefeedreader",
@@ -163,15 +154,20 @@ void RemoteFeedReader::maybeFillBuffer(SourceFeed* source) {
}

void RemoteFeedReader::waitForNextEntry() {
  fillBuffers();

  ScopedLock<std::mutex> lk(mutex_);
  bool is_data_available = false;
  bool is_any_fetching = false;

  for (const auto& source : sources_) {
    maybeFillBuffer(source.get());

    if (source->read_buffer.size() > 0) {
      is_data_available = true;
    }

    if (source->is_fetching) {
      is_any_fetching = true;
    }
  }

  /* fastpath if there is data available on any feed */
@@ -179,6 +175,10 @@ void RemoteFeedReader::waitForNextEntry() {
    return;
  }

  if (!is_any_fetching) {
    return;
  }

  auto wakeup_gen = data_available_wakeup_.generation();
  lk.unlock();

@@ -186,6 +186,24 @@ void RemoteFeedReader::waitForNextEntry() {
  data_available_wakeup_.waitForWakeup(wakeup_gen);
}

void RemoteFeedReader::fillBuffers() {
  ScopedLock<std::mutex> lk(mutex_);
  auto sources = sources_;

  for (const auto& source : sources) {
    if (source->is_fetching ||
        source->read_buffer.size() >= source->max_buffer_size) {
      return;
    }

    source->is_fetching = true;

    lk.unlock();
    fillBuffer(source.get());
    lk.lock();
  }
}

Vector<Pair<String, uint64_t>> RemoteFeedReader::streamOffsets() const {
  ScopedLock<std::mutex> lk(mutex_);

+2 −1
Original line number Diff line number Diff line
@@ -71,7 +71,8 @@ protected:
    DateTime stream_time;
  };

  void maybeFillBuffer(SourceFeed* source);
  void fillBuffers();
  void fillBuffer(SourceFeed* source);

  RPCClient* rpc_client_;
  RefPtrVector<SourceFeed> sources_;
+17 −3
Original line number Diff line number Diff line
@@ -86,6 +86,15 @@ int main(int argc, const char** argv) {
      "max_spread_secs",
      "<num>");

  flags.defineFlag(
      "print_time",
      fnord::cli::FlagParser::T_SWITCH,
      false,
      NULL,
      NULL,
      "print_time",
      "");

  flags.defineFlag(
      "loglevel",
      fnord::cli::FlagParser::T_STRING,
@@ -114,6 +123,7 @@ int main(int argc, const char** argv) {
  size_t buffer_size = flags.getInt("buffer_size");
  size_t commit_size = flags.getInt("commit_size");
  size_t max_spread_secs = flags.getInt("max_spread_secs");
  size_t print_time = flags.isSet("print_time");

  /* set up input feed reader */
  feeds::RemoteFeedReader feed_reader(&rpc_client);
@@ -166,19 +176,23 @@ int main(int argc, const char** argv) {
  DateTime last_iter;
  uint64_t rate_limit_micros = 1 * kMicrosPerSecond;

  for (int i = 0; i < commit_size; ++i) {
  for (;;) {
    last_iter = WallClock::now();
    feed_reader.waitForNextEntry();

    for (;;) {
    for (int i = 0; i < commit_size; ++i) {
      auto entry = feed_reader.fetchNextEntry();

      if (entry.isEmpty()) {
        break;
      }

      if (print_time) {
        fnord::iputs("[$0] $1", entry.get().time, entry.get().data);
      } else {
        fnord::iputs("$0", entry.get().data);
      }
    }

    if (flags.isSet("statefile")) {
      auto stream_offsets = feed_reader.streamOffsets();
+0 −1
Original line number Diff line number Diff line
-> remotefeedwriter make max_spread configurable
-> timeouts
-> rpcchannel (rcpclient + servergroup_)
-> rpc numRetries/retryStrategie, onRetry(Duration delay) (instead of ready()/onError!)
+29 −0
Original line number Diff line number Diff line
@@ -51,6 +51,16 @@ bool FileUtil::isDirectory(const std::string& filename) {
  return S_ISDIR(fstat.st_mode);
}

size_t FileUtil::size(const std::string& filename) {
  struct stat fstat;

  if (stat(filename.c_str(), &fstat) < 0) {
    RAISE_ERRNO(kIOError, "fstat('%s') failed", filename.c_str());
  }

  return fstat.st_size;
}

/* The mkdir_p method was adapted from bash 4.1 */
void FileUtil::mkdir_p(const std::string& dirname) {
  char const* begin = dirname.c_str();
@@ -175,4 +185,23 @@ void FileUtil::cp(const std::string& src, const std::string& destination) {
  RAISE(kNotYetImplementedError);
}

size_t FileUtil::du_c(const std::string& path) {
  size_t size = 0;

  FileUtil::ls(path, [&path, &size] (const String& file) -> bool {
    auto filename = FileUtil::joinPaths(path, file);

    if (FileUtil::isDirectory(filename)) {
      size += FileUtil::du_c(filename);
    } else {
      size += FileUtil::size(filename);
    }

    return true;
  });

  return size;
}


}
Loading