Commit c36c362d authored by Paul Asmuth's avatar Paul Asmuth
Browse files

pubsub backend refactoring wip

parent 32025673
Loading
Loading
Loading
Loading
+4 −13
Original line number Diff line number Diff line
@@ -72,15 +72,10 @@ module FnordMetric
  @@namespaces = {}
  @@server_configuration = nil

  @@chan_feed     = EM::Channel.new
  @@chan_upstream = EM::Channel.new
  @@firehose    = EM::Channel.new

  def self.chan_feed
    @@chan_feed 
  end

  def self.chan_upstream
    @@chan_upstream 
  def self.backend
    FnordMetric::RedisBackend.new(options)
  end

  def self.namespace(key=nil, &block)
@@ -120,8 +115,6 @@ module FnordMetric
      opts = options(opts)
      app = embedded(opts)

      @backend = FnordMetric::RedisBackend.new(opts)

      if opts[:web_interface]
        server = opts[:web_interface_server].downcase

@@ -140,9 +133,7 @@ module FnordMetric
        
        FnordMetric::WebSocket.new(
          :host => host, 
          :port => (port.to_i+1),
          :chan_upstream => @chan_upstream,
          :chan_feed => @chan_feed
          :port => (port.to_i+1)
        ) && log("listening on ws://#{host}:#{port.to_i+1}")

      end
+18 −8
Original line number Diff line number Diff line
class FnordMetric::RedisBackend

  def initialize(opts)
  	sub_redis = EM::Hiredis.connect(opts[:redis_url])
    pub_redis = EM::Hiredis.connect(opts[:redis_url])
    @callback = nil
    @redis_channel = opts[:redis_prefix]

    sub_redis.subscribe(opts[:redis_prefix])
  	@sub_redis = EM::Hiredis.connect(opts[:redis_url])
    @pub_redis = EM::Hiredis.connect(opts[:redis_url])

    sub_redis.on(:message) do |channel, message|
      FnordMetric.chan_feed.push(JSON.parse(message)) 
    @sub_redis.subscribe(@redis_channel)
  end

    FnordMetric.chan_upstream.subscribe do |message|
      pub_redis.publish(opts[:redis_prefix], message.to_json)
  def subscribe(&block)
    @sub_redis.on(:message) do |chan, message|
      block.call(message)
    end
  end

  def publish(message)
    @pub_redis.publish(@redis_channel, message.to_json)
  end

  def hangup
    @pub_redis.close
    @sub_redis.close
  end

end
 No newline at end of file
+4 −1
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ class FnordMetric::NumericGauge < FnordMetric::MultiGauge
  end

  def react(event)
    puts "REACT TO: #{event.inspect}"
    render! if event["_class"] == "render_request"
    process!(event) if event["_class"] == "request"
  end
@@ -34,7 +35,9 @@ class FnordMetric::NumericGauge < FnordMetric::MultiGauge
      event.merge(
        :values => Hash[series_count_gauges.map do |_skey, _series|
          gauge = fetch_gauge(_series[event["tick"].to_i])
          vals = gauge.values_at(event["ticks"]) rescue {}
          vals = {}
          event["ticks"].each{ |_tick| vals[_tick.to_i] ||= 0 } 
          vals.merge!(gauge.values_at(event["ticks"]))
          [_skey, vals]
        end]
      )
+6 −5
Original line number Diff line number Diff line
class FnordMetric::RemoteGauge

  def initialize(opts)
  	FnordMetric.chan_upstream.subscribe do |message|
  	  react(message) if message["_channel"] == name.to_s
    end
  	#FnordMetric.firehose.subscribe do |message|
    #  puts "RECEIVED: #{message.inspect}"
  	#  react(message) if message["_channel"] == name.to_s
    #end
  end

  def name
    raise NotYetImplementedError
  end

  def react
  def react(event)
    raise NotYetImplementedError
  end

@@ -18,7 +19,7 @@ private

  def respond(message)
  	message["_channel"] ||= name
  	FnordMetric.chan_upstream.push(message)
  	#FnordMetric.firehose.push(message)
  end

end
 No newline at end of file
+4 −0
Original line number Diff line number Diff line
@@ -32,6 +32,10 @@ class FnordMetric::ToplistGauge < FnordMetric::MultiGauge
    end
  end

  def react(event)

  end

  def observe(*args)
    ctx = args.delete_at(0)

Loading