Commit 133071f9 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

fix deadlock on RemoteFeedReader (onSuccess races with the rpc response. if...

fix deadlock on RemoteFeedReader (onSuccess races with the rpc response. if the rpc returns before onSuccess is called, the onSuccess handler executes the passed closure on it's own stack, the success handler tries to grab the (still held) mutex and deadlock occurs)
parent f42149a8
Loading
Loading
Loading
Loading
+21 −12
Original line number Diff line number Diff line
@@ -55,8 +55,6 @@ Option<FeedEntry> RemoteFeedReader::fetchNextEntry() {
  for (int i = 0; i < sources_.size(); ++i) {
    const auto& source = sources_[i];

    maybeFillBuffer(source.get());

    if (source->stream_time.unixMicros() < min_stream_time) {
      min_stream_time = source->stream_time.unixMicros();

@@ -98,14 +96,7 @@ Option<FeedEntry> RemoteFeedReader::fetchNextEntry() {
  }
}

void RemoteFeedReader::maybeFillBuffer(SourceFeed* source) {
  if (source->is_fetching ||
      source->read_buffer.size() >= source->max_buffer_size) {
    return;
  }

  source->is_fetching = true;

void RemoteFeedReader::fillBuffer(SourceFeed* source) {
#ifndef NDEBUG
  fnord::logTrace(
      "fnord.feeds.remotefeedreader",
@@ -163,13 +154,13 @@ void RemoteFeedReader::maybeFillBuffer(SourceFeed* source) {
}

void RemoteFeedReader::waitForNextEntry() {
  fillBuffers();

  ScopedLock<std::mutex> lk(mutex_);
  bool is_data_available = false;
  bool is_any_fetching = false;

  for (const auto& source : sources_) {
    maybeFillBuffer(source.get());

    if (source->read_buffer.size() > 0) {
      is_data_available = true;
    }
@@ -195,6 +186,24 @@ void RemoteFeedReader::waitForNextEntry() {
  data_available_wakeup_.waitForWakeup(wakeup_gen);
}

void RemoteFeedReader::fillBuffers() {
  ScopedLock<std::mutex> lk(mutex_);
  auto sources = sources_;

  for (const auto& source : sources) {
    if (source->is_fetching ||
        source->read_buffer.size() >= source->max_buffer_size) {
      return;
    }

    source->is_fetching = true;

    lk.unlock();
    fillBuffer(source.get());
    lk.lock();
  }
}

Vector<Pair<String, uint64_t>> RemoteFeedReader::streamOffsets() const {
  ScopedLock<std::mutex> lk(mutex_);

+2 −1
Original line number Diff line number Diff line
@@ -71,7 +71,8 @@ protected:
    DateTime stream_time;
  };

  void maybeFillBuffer(SourceFeed* source);
  void fillBuffers();
  void fillBuffer(SourceFeed* source);

  RPCClient* rpc_client_;
  RefPtrVector<SourceFeed> sources_;