Commit 0ff32680 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

feed iface improvements

parent 661aca11
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -41,6 +41,10 @@ public:
  virtual Future<bool> appendEntry(const String& entry_data) = 0;

  virtual Future<Option<FeedEntry>> fetchEntry(const FeedOffset& offset) = 0;
  virtual Future<Vector<FeedEntry>> fetchEntries(
      const FeedOffset& offset,
      int batch_size) = 0;

  virtual Future<Option<FeedEntry>> fetchNextEntry(const FeedEntry& entry) = 0;
  virtual Future<Option<FeedEntry>> fetchFirstEntry() = 0;
  virtual Future<Option<FeedEntry>> fetchLastEntry() = 0;
+63 −1
Original line number Diff line number Diff line
@@ -47,7 +47,69 @@ Future<bool> LogStreamServiceFeed::appendEntry(const String& entry) {

Future<Option<LogStreamServiceFeed::FeedEntry>>
    LogStreamServiceFeed::fetchEntry(const FeedOffset& offset) {
  RAISE(kNotYetImplementedError);
  Promise<Option<fnord::comm::Feed::FeedEntry>> promise;

  auto rpc = fnord::comm::mkRPC(
      &LogStreamService::fetch,
      name(),
      (uint64_t) std::stoul(offset),
      (int) 1);

  rpc->call(rpc_channel_);

  rpc->onSuccess([promise] (const decltype(rpc)::ValueType& r) mutable {
    if (r.result().size() == 1) {
      comm::Feed::FeedEntry entry;
      entry.offset = StringUtil::toString(r.result()[0].offset);
      entry.next_offset = StringUtil::toString(r.result()[0].next_offset);
      entry.entry_data = StringUtil::toString(r.result()[0].data);

      promise.success(Some<fnord::comm::Feed::FeedEntry>(entry));
    } else {
      promise.success(None<fnord::comm::Feed::FeedEntry>());
    }
  });

  rpc->onError([promise] (const Status& status) mutable {
    promise.failure(status);
  });

  return promise.future();
}

Future<Vector<LogStreamServiceFeed::FeedEntry>>
    LogStreamServiceFeed::fetchEntries(
    const FeedOffset& offset,
    int batch_size) {
  Promise<Vector<fnord::comm::Feed::FeedEntry>> promise;

  auto rpc = fnord::comm::mkRPC(
      &LogStreamService::fetch,
      name(),
      (uint64_t) std::stoul(offset),
      (int) batch_size);

  rpc->call(rpc_channel_);

  rpc->onSuccess([promise] (const decltype(rpc)::ValueType& r) mutable {
    Vector<comm::Feed::FeedEntry> entries;

    for (const auto& e : r.result()) {
      comm::Feed::FeedEntry entry;
      entry.offset = StringUtil::toString(e.offset);
      entry.next_offset = StringUtil::toString(e.next_offset);
      entry.entry_data = StringUtil::toString(e.data);
      entries.emplace_back(entry);
    }

    promise.success(entries);
  });

  rpc->onError([promise] (const Status& status) mutable {
    promise.failure(status);
  });

  return promise.future();
}

Future<Option<LogStreamServiceFeed::FeedEntry>>
+4 −0
Original line number Diff line number Diff line
@@ -34,6 +34,10 @@ public:
  Future<Option<FeedEntry>> fetchFirstEntry() override;
  Future<Option<FeedEntry>> fetchLastEntry() override;

  Future<Vector<FeedEntry>> fetchEntries(
      const FeedOffset& offset,
      int batch_size) override;

  void setOption(const String& optname, const String& optval) override;

protected: