Commit 185fd36c authored by Paul Asmuth's avatar Paul Asmuth
Browse files

Revert "recursive mutex in remotefeedreader"

This reverts commit 7a9e25412ddb972ce076fc8bf178bcc3e357cd33.
parent 96711297
Loading
Loading
Loading
Loading
+8 −6
Original line number Diff line number Diff line
@@ -48,7 +48,7 @@ void RemoteFeedReader::addSourceFeed(
}

Option<FeedEntry> RemoteFeedReader::fetchNextEntry() {
  ScopedLock<std::recursive_mutex> lk(mutex_);
  ScopedLock<std::mutex> lk(mutex_);
  int idx = -1;
  uint64_t min_stream_time = std::numeric_limits<uint64_t>::max();

@@ -124,7 +124,7 @@ void RemoteFeedReader::maybeFillBuffer(SourceFeed* source) {
  rpc_client_->call(source->rpc_url, rpc.get());

  rpc->onSuccess([this, source] (const decltype(rpc)::ValueType& r) mutable {
    ScopedLock<std::recursive_mutex> lk(mutex_);
    ScopedLock<std::mutex> lk(mutex_);

    for (const auto& e : r.result()) {
      auto entry = e;
@@ -140,13 +140,15 @@ void RemoteFeedReader::maybeFillBuffer(SourceFeed* source) {

    if (source->read_buffer.size() > 0) {
      source->next_offset = source->read_buffer.back().next_offset;
      lk.unlock();
      data_available_wakeup_.wakeup();
    }
  });

  rpc->onError([this, source] (const Status& status) {
    ScopedLock<std::recursive_mutex> lk(mutex_);
    ScopedLock<std::mutex> lk(mutex_);
    source->is_fetching = false;
    lk.unlock();

    logError(
        "fnord.feeds.remotefeedreader",
@@ -161,7 +163,7 @@ void RemoteFeedReader::maybeFillBuffer(SourceFeed* source) {
}

void RemoteFeedReader::waitForNextEntry() {
  ScopedLock<std::recursive_mutex> lk(mutex_);
  ScopedLock<std::mutex> lk(mutex_);
  bool is_data_available = false;

  for (const auto& source : sources_) {
@@ -185,7 +187,7 @@ void RemoteFeedReader::waitForNextEntry() {
}

Vector<Pair<String, uint64_t>> RemoteFeedReader::streamOffsets() const {
  ScopedLock<std::recursive_mutex> lk(mutex_);
  ScopedLock<std::mutex> lk(mutex_);

  Vector<Pair<String, uint64_t>> offsets;
  for (const auto& source : sources_) {
@@ -196,7 +198,7 @@ Vector<Pair<String, uint64_t>> RemoteFeedReader::streamOffsets() const {
}

Pair<DateTime, DateTime> RemoteFeedReader::watermarks() const {
  ScopedLock<std::recursive_mutex> lk(mutex_);
  ScopedLock<std::mutex> lk(mutex_);

  if (sources_.size() == 0) {
    return std::make_pair(DateTime(0), DateTime(0));
+1 −1
Original line number Diff line number Diff line
@@ -77,7 +77,7 @@ protected:
  RefPtrVector<SourceFeed> sources_;

  Wakeup data_available_wakeup_;
  mutable std::recursive_mutex mutex_;
  mutable std::mutex mutex_;

  Function<DateTime (const FeedEntry& entry)> time_backfill_fn_;
  Duration max_spread_;