Commit 39e0b414 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

FnordMetric::STOMPAcceptor

parent 9a6828c4
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -148,6 +148,7 @@ require "fnordmetric/acceptors/tcp_acceptor"
require "fnordmetric/acceptors/udp_acceptor"
require "fnordmetric/acceptors/fyrehose_acceptor"
require "fnordmetric/acceptors/amqp_acceptor"
require "fnordmetric/acceptors/stomp_acceptor"
require "fnordmetric/widget"
require "fnordmetric/widgets/timeseries_widget"
require "fnordmetric/widgets/numbers_widget"
+7 −1
Original line number Diff line number Diff line
@@ -15,6 +15,8 @@ class FnordMetric::Acceptor
      FnordMetric::FyrehoseAcceptor
    elsif @opts[:protocol] == :amqp
      FnordMetric::AMQPAcceptor
    elsif @opts[:protocol] == :stomp
      FnordMetric::STOMPAcceptor
    else
      raise "unknown protocol: #{@opts[:protocol]}"
    end
@@ -26,7 +28,7 @@ class FnordMetric::Acceptor

    begin
      inbound_stream = inbound_class.start(@opts)
      if inbound_class == FnordMetric::AMQPAcceptor
      if inbound_class.outbound?
        FnordMetric.log "connected to #{@opts[:protocol]}://#{@opts[:listen][0..1].join(":")}"
      else
        FnordMetric.log "listening on #{@opts[:protocol]}://#{@opts[:listen][0..1].join(":")}"
@@ -37,4 +39,8 @@ class FnordMetric::Acceptor
    end
  end

  def self.outboud?
    false
  end

end
+5 −1
Original line number Diff line number Diff line
@@ -3,7 +3,7 @@ class FnordMetric::AMQPAcceptor
  def self.start(opts)
    begin
      require "amqp"
    rescue
    rescue LoadError
      FnordMetric.error("require 'amqp' failed, you need the amqp gem")
      exit 1
    end
@@ -49,4 +49,8 @@ class FnordMetric::AMQPAcceptor
    @api ||= FnordMetric::API.new(FnordMetric.options)
  end

  def self.outbound?
    true
  end

end
+71 −0
Original line number Diff line number Diff line
class FnordMetric::STOMPAcceptor

  def self.start(opts)
    begin
      require "stomp"
    rescue LoadError
      FnordMetric.error("require 'stomp' failed, you need the stomp gem")
      exit 1
    end

    new(opts)
  end

  def initialize(opts)
    @mutex = Mutex.new

    client = Stomp::Client.new(:hosts => [{
      :host => opts[:host],
      :port => opts[:port],
      :passcode => opts[:password],
      :login => opts[:username]}])

    msg_handler = lambda do |topic, msg|
      data = msg.body

      event = begin
        JSON.parse(data)
      rescue
        FnordMetric.log("[STOMP] received invalid JSON: #{data[0..60]}")
      end

      if event
        event["_type"] ||= topic.gsub(/^\/topic\//, '')
        @mutex.synchronize{ events << event }
      end
    end

    opts[:topics].each do |topic|
      client.subscribe(topic){ |data| msg_handler[topic, data] }
    end

    Thread.new do
      client.join
    end

    EM.next_tick(&method(:push_next_event))
  end

  def push_next_event
    nxt = @mutex.synchronize{ events.pop }
    unless nxt
      EM::Timer.new(0.01, &method(:push_next_event))
      return true
    end
    api.event(nxt)
    EM.next_tick(&method(:push_next_event))
  end

  def events
    @events ||= []
  end

  def api
    @api ||= FnordMetric::API.new(FnordMetric.options)
  end

  def self.outbound?
    true
  end

end
+1 −2
Original line number Diff line number Diff line
@@ -2,7 +2,6 @@ Todos
=====

  → bugfix: widgets w/o gauges
  → stomp acceptor
  → js api (<div data-gauge="...">)
  → bignumberwidget
  → exceptions gauge
@@ -19,6 +18,6 @@ Done
====

  → gauge human_titles, units, scale_by
  → added amqp acceptor, fyrehose acceptor
  → added stomp acceptor, amqp acceptor, fyrehose acceptor
  → explicit dashboard initialization with options -> dashboard grouping
  → bugfix: lot's of small bugfixes