Commit 322d1356 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

Added FyrehoseAcceptor (github.com/paulasmuth/fyrehose)

parent ae95157a
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -146,6 +146,7 @@ require "fnordmetric/web/dashboard"
require "fnordmetric/acceptors/acceptor"
require "fnordmetric/acceptors/tcp_acceptor"
require "fnordmetric/acceptors/udp_acceptor"
require "fnordmetric/acceptors/fyrehose_acceptor"
require "fnordmetric/widget"
require "fnordmetric/widgets/timeseries_widget"
require "fnordmetric/widgets/numbers_widget"
+11 −6
Original line number Diff line number Diff line
@@ -9,8 +9,12 @@ class FnordMetric::Acceptor
  def initialized
    inbound_class = if @opts[:protocol] == :udp
      FnordMetric::UDPAcceptor
    else
    elsif @opts[:protocol] == :tcp
      FnordMetric::TCPAcceptor
    elsif @opts[:protocol] == :fyrehose
      FnordMetric::FyrehoseAcceptor
    else
      raise "unknown protocol: #{@opts[:protocol]}"
    end

    @opts[:listen] = [
@@ -21,8 +25,9 @@ class FnordMetric::Acceptor
    begin
      inbound_stream = inbound_class.start(@opts)
      FnordMetric.log "listening on #{@opts[:protocol]}://#{@opts[:listen][0..1].join(":")}"
    #rescue
    #  FnordMetric.log "cant start #{inbound_class.name}. port in use?"
    rescue Exception => e
      raise e if ENV["FNORDMETRIC_ENV"] == "dev"
      FnordMetric.log "cant start #{inbound_class.name} on #{@opts[:protocol]}://#{@opts[:listen][0..1].join(":")}. port in use?"
    end
  end

+39 −0
Original line number Diff line number Diff line
class FnordMetric::FyrehoseAcceptor

  def self.start(opts)
    require "fyrehose"
    require "fyrehose/reactor"

    new(opts)
  end

  def initialize(opts)
    reactor = EM.connect(opts[:host], opts[:port], Fyrehose::Reactor)

    reactor.on_message do |channel, data|
      event = JSON.parse(data)
      event["_type"] ||= channel
      events << event
      push_next_event
    end

    opts[:channels].each do |channel|
      reactor.subscribe(channel)
    end
  end

  def push_next_event
    return true if events.empty?
    api.event(@events.pop)
    EM.next_tick(&method(:push_next_event))
  end

  def events
    @events ||= []
  end

  def api
    @api ||= FnordMetric::API.new(FnordMetric.options)
  end

end