Commit ba92542b authored by Paul Asmuth's avatar Paul Asmuth
Browse files

redis event playback

parent 6f82f193
Loading
Loading
Loading
Loading
+12 −5
Original line number Diff line number Diff line
@@ -20,15 +20,22 @@ class FnordQuery::RedisBackend
      end
    end
    if query.since != :now
      # playback
      q_until = query.until.is_a?(Symbol)? Time.now.to_i : query.until 
      puts "zrangebyscore #{@prefix} #{query.since} #{q_until}"
      @redis.zrangebyscore(@prefix, query.since, q_until) do |res|
        res.each do |event|
          block.call(event) if query.matches?(event)
        end
        if query.until != :stream
        # call on_finish
          on_finish
        end
      end
    end
  end

  def on_finish(&block)    

    return @on_finish = block if block_given?
    @on_finish.call() if @on_finish
  end

  def publish(message, opts={})
+8 −2
Original line number Diff line number Diff line
@@ -19,6 +19,9 @@ class FnordQuery::Query
    str.scan(X_EXTRACT) do |part|
      eval_part(*part[1..-1])
    end

    @since ||= :now
    @until ||= :now
  end

  def execute(runner, _backend)
@@ -69,7 +72,10 @@ private
  end

  def parse_time(str)
    str.to_i # FIXPAUL validate
    return :now     if str == "now"
    return :stream  if str == "stream"
    return str.to_i if str =~ /^[0-9]+$/
    raise InvalidQueryError.new("invalid time: #{str}")
  end
    
end