Commit 28357be4 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

Merge branch 'stream_based' of github.com:paulasmuth/fnordmetric2 into stream_based

parents 485ce00f 56c2cbdc
Loading
Loading
Loading
Loading
+11 −17
Original line number Diff line number Diff line
@@ -74,15 +74,10 @@ module FnordMetric
  @@namespaces = {}
  @@server_configuration = nil

  @@chan_feed     = EM::Channel.new
  @@chan_upstream = EM::Channel.new
  @@firehose    = EM::Channel.new

  def self.chan_feed
    @@chan_feed 
  end

  def self.chan_upstream
    @@chan_upstream 
  def self.backend
    FnordMetric::RedisBackend.new(options)
  end

  def self.namespace(key=nil, &block)
@@ -122,8 +117,6 @@ module FnordMetric
      opts = options(opts)
      app = embedded(opts)

      @backend = FnordMetric::RedisBackend.new(opts)

      if opts[:web_interface]
        server = opts[:web_interface_server].downcase

@@ -142,9 +135,7 @@ module FnordMetric
        
        FnordMetric::WebSocket.new(
          :host => host, 
          :port => (port.to_i+1),
          :chan_upstream => @chan_upstream,
          :chan_feed => @chan_feed
          :port => (port.to_i+1)
        ) && log("listening on ws://#{host}:#{port.to_i+1}")

      end
@@ -205,10 +196,12 @@ module FnordMetric
    end

    EM.next_tick do
      if opts[:start_worker]
        worker = Worker.new(@@namespaces.clone, opts)
        worker.ready!
      end

      # FIXPAUL: this is re-instantiating all gauges. why?
      #if opts[:start_worker]
      #  worker = Worker.new(@@namespaces.clone, opts)
      #  worker.ready!
      #end

      if opts[:inbound_stream]
        inbound_class = opts[:inbound_protocol] == :udp ? InboundDatagram : InboundStream
@@ -235,6 +228,7 @@ end


require "fnordmetric/backends/redis_backend"
require "fnordmetric/backends/memory_backend"

require "fnordmetric/api"
require "fnordmetric/udp_client"
+31 −0
Original line number Diff line number Diff line
class FnordMetric::MemoryBackend

  def initialize(opts)
    @subscriptions = []
  end

  def subscribe(&block)
    sid = channel.subscribe do |message|
      block.call(message)
    end

    @subscriptions.push(sid)
  end

  def publish(message)
    channel.push(message)
  end

  def hangup
    @subscriptions.each do |sid|
      channel.unsubscribe(sid)
    end
  end

private

  def channel
    $fm_channel ||= EM::Channel.new
  end

end
 No newline at end of file
+21 −8
Original line number Diff line number Diff line
class FnordMetric::RedisBackend

  def initialize(opts)
  	sub_redis = EM::Hiredis.connect(opts[:redis_url])
    pub_redis = EM::Hiredis.connect(opts[:redis_url])
    @redis_channel = opts[:redis_prefix]
    @redis = EM::Hiredis.connect(opts[:redis_url])
    @pub_redis = EM::Hiredis.connect(opts[:redis_url])

    sub_redis.subscribe(opts[:redis_prefix])
    @redis.subscribe(@redis_channel)
  end

    sub_redis.on(:message) do |channel, message|
      FnordMetric.chan_feed.push(JSON.parse(message)) 
  def subscribe(&block)
    @redis.on(:message) do |chan, raw|
      begin
        message = JSON.parse(raw)
      rescue
        puts "redisbackend: received invalid json"
      else
        block.call(message)
      end
    end
  end

    FnordMetric.chan_upstream.subscribe do |message|
      pub_redis.publish(opts[:redis_prefix], message.to_json)
  def publish(message)
    @pub_redis.publish(@redis_channel, message.to_json)
  end

  def hangup
    @redis.close
  end

end
 No newline at end of file
+1 −0
Original line number Diff line number Diff line
@@ -121,6 +121,7 @@ class FnordMetric::Namespace
  def opt_multigauge(gauge_type, gauge_key, opts={})
    opts.merge!(:key => gauge_key, :key_prefix => key_prefix)
    klass = "FnordMetric::#{gauge_type.to_s.camelize}"
    @gauges[gauge_key].try(:hangup)
    @gauges[gauge_key] ||= klass.constantize.new(opts)   
  end

+8 −2
Original line number Diff line number Diff line
@@ -30,18 +30,24 @@ class FnordMetric::NumericGauge < FnordMetric::MultiGauge
  end

  def process!(event)
    sleep 2
    resp = if event["widget"] == "total_timeline"
      event.merge(
        :values => Hash[series_count_gauges.map do |_skey, _series|
          gauge = fetch_gauge(_series[event["tick"].to_i])
          vals = gauge.values_at(event["ticks"]) rescue {}
          vals = {}
          event["ticks"].each{ |_tick| vals[_tick.to_i] ||= 0 } 
          vals.merge!(gauge.values_at(event["ticks"]))
          [_skey, vals]
        end]
      )
    end

    if resp
      resp.merge!("_class" => "response")
      resp.merge!(
        "_class" => "response",
        "_sender" => @uuid
      )
      resp.delete("ticks")
      respond(resp)
    end
Loading