Commit ee85130a authored by Paul Asmuth's avatar Paul Asmuth
Browse files

finished numeric gauge

parent f086bd22
Loading
Loading
Loading
Loading
+12 −2
Original line number Diff line number Diff line
@@ -21,8 +21,18 @@ class FnordMetric::RedisBackend
  end

  def publish(message)
    out = message.is_a?(String) ? message : message.to_json
    @pub_redis.publish(@redis_channel, out)
    if message.is_a?(String)
      begin
        message = JSON.parse(message)
      rescue
        puts "redisbackend: published invalid json"
      end
    end

    message["_time"] ||= Time.now.to_i
    message["_channel"] ||= "inbound"
    
    @pub_redis.publish(@redis_channel, message.to_json)
  end

  def hangup
+0 −14
Original line number Diff line number Diff line
@@ -79,20 +79,6 @@ protected
    FnordMetric.error!(msg)
  end

  def assure_two_dimensional!(gauge)
    return true if gauge.two_dimensional?
    error! "error: #{caller[0].split(" ")[-1]} can only be used with 2-dimensional gauges" 
  end

  def assure_three_dimensional!(gauge)
    return true unless gauge.two_dimensional?
    error! "error: #{caller[0].split(" ")[-1]} can only be used with 3-dimensional gauges" 
  end

  def assure_non_progressive!(gauge)
    return true unless gauge.progressive?
    error! "error: #{caller[0].split(" ")[-1]} can only be used with non-progressive gauges" 
  end

end
    
+5 −4
Original line number Diff line number Diff line
class FnordMetric::Gauge
  
  include FnordMetric::GaugeCalculations
  include FnordMetric::GaugeModifiers

  def initialize(opts)
    opts.fetch(:key) && opts.fetch(:key_prefix)
@@ -55,12 +56,12 @@ class FnordMetric::Gauge
    !!@opts[:average]
  end

  def add_redis(_redis)
    @redis = _redis
  def redis
    @redis ||= EM::Hiredis.connect(FnordMetric.options[:redis_url]) # FIXPAUL
  end

  def redis
    @redis || @opts[:redis]
  def sync_redis
    @sync_redis ||= Redis.new # FIXPAUL
  end

  def ticks_in(r)
+7 −7
Original line number Diff line number Diff line
module FnordMetric::GaugeCalculations

  @@avg_per_session_proc = proc{ |_v, _t|
    (_v.to_f / (redis.get(tick_key(_t, :"sessions-count"))||1).to_i)
    (_v.to_f / (sync_redis.get(tick_key(_t, :"sessions-count"))||1).to_i)
  }

  @@count_per_session_proc = proc{ |_v, _t|
    (redis.get(tick_key(_t, :"sessions-count"))||0).to_i
    (sync_redis.get(tick_key(_t, :"sessions-count"))||0).to_i
  }

  @@avg_per_count_proc = proc{ |_v, _t|
    (_v.to_f / (redis.get(tick_key(_t, :"value-count"))||1).to_i)
    (_v.to_f / (sync_redis.get(tick_key(_t, :"value-count"))||1).to_i)
  }

  def value_at(time, opts={}, &block)
@@ -18,7 +18,7 @@ module FnordMetric::GaugeCalculations
    _v = if respond_to?(:_value_at)
      _value_at(key, _t)
    else
      redis.hget(key, _t)
      sync_redis.hget(key, _t)
    end

    calculate_value(_v, _t, opts, block)
@@ -30,7 +30,7 @@ module FnordMetric::GaugeCalculations
      if respond_to?(:_values_at)
        _values_at(times, opts={}, &block)
      else
        redis.hmget(key, *times)
        sync_redis.hmget(key, *times)
      end.each_with_index do |_v, _n|
        _t = times[_n]
        ret[_t] = calculate_value(_v, _t, opts, block)
@@ -56,7 +56,7 @@ module FnordMetric::GaugeCalculations

  def field_values_at(time, opts={}, &block)
    opts[:max_fields] ||= 50
    redis.zrevrange(
    sync_redis.zrevrange(
      tick_key(time), 
      0, opts[:max_fields]-1, 
      :withscores => true
@@ -66,7 +66,7 @@ module FnordMetric::GaugeCalculations
  end

  def field_values_total(time)
    (redis.get(tick_key(time, :count))||0).to_i
    (sync_redis.get(tick_key(time, :count))||0).to_i
  end

end
 No newline at end of file
+50 −40
Original line number Diff line number Diff line
module FnordMetric::GaugeModifiers

  def incr(gauge_name, value=1)
    gauge = fetch_gauge(gauge_name)
    assure_two_dimensional!(gauge)
    if gauge.unique? 
      incr_uniq(gauge, value)
    elsif gauge.average? 
      incr_avg(gauge, value)
  def incr(time, value=1)
    assure_two_dimensional!(self)
    if self.unique? 
      incr_uniq(time, value)
    elsif self.average? 
      incr_avg(time, value)
    else
      incr_tick(gauge, value)
      incr_tick(time, value)
    end
  end

  def incr_tick(gauge, value)
    if gauge.progressive?      
      @redis.incrby(gauge.key(:head), value).callback do |head|
        @redis.hsetnx(gauge.key, gauge.tick_at(time), head).callback do |_new|
          @redis.hincrby(gauge.key, gauge.tick_at(time), value) unless _new
  def incr_tick(time, value)
    if self.progressive?      
      redis.incrby(self.key(:head), value).callback do |head|
        redis.hsetnx(self.key, self.tick_at(time), head).callback do |_new|
          redis.hincrby(self.key, self.tick_at(time), value) unless _new
        end
      end
    else
      @redis.hsetnx(gauge.key, gauge.tick_at(time), 0).callback do
        @redis.hincrby(gauge.key, gauge.tick_at(time), value)
      redis.hsetnx(self.key, self.tick_at(time), 0).callback do
        redis.hincrby(self.key, self.tick_at(time), value)
      end
    end
  end  

  def incr_uniq(gauge, value, field_name=nil)
  def incr_uniq(time, value, field_name=nil)
    return false if session_key.blank?
    @redis.sadd(gauge.tick_key(time, :sessions), session_key).callback do |_new|
      @redis.expire(gauge.tick_key(time, :sessions), gauge.tick)
    redis.sadd(self.tick_key(time, :sessions), session_key).callback do |_new|
      redis.expire(self.tick_key(time, :sessions), self.tick)
      if (_new == 1) || (_new == true) #redis vs. em-redis
        @redis.incr(gauge.tick_key(time, :"sessions-count")).callback do |sc|
          field_name ? incr_field_by(gauge, field_name, value) : incr_tick(gauge, value)
        redis.incr(time, self.tick_key(time, :"sessions-count")).callback do |sc|
          field_name ? incr_field_by(time, field_name, value) : incr_tick(time, value)
        end
      end
    end
  end

  def incr_avg(gauge, value)
    @redis.incr(gauge.tick_key(time, :"value-count")).callback do
      incr_tick(gauge, value)
  def incr_avg(time, value)
    redis.incr(self.tick_key(time, :"value-count")).callback do
      incr_tick(time, value)
    end
  end

  def incr_field(gauge_name, field_name, value=1)
    gauge = fetch_gauge(gauge_name)
    assure_three_dimensional!(gauge)
    if gauge.unique? 
      incr_uniq(gauge, value, field_name)
  def incr_field(time, field_name, value=1)
    assure_three_dimensional!(self)
    if self.unique? 
      incr_uniq(time, value, field_name)
    else
      incr_field_by(gauge, field_name, value)
      incr_field_by(time, field_name, value)
    end
  end

  def incr_field_by(gauge, field_name, value)
    @redis.zincrby(gauge.tick_key(time), value, field_name).callback do
      @redis.incrby(gauge.tick_key(time, :count), 1)
  def incr_field_by(time, field_name, value)
    redis.zincrby(self.tick_key(time), value, field_name).callback do
      redis.incrby(self.tick_key(time, :count), 1)
    end
  end  

  def set_value(gauge_name, value)
    gauge = fetch_gauge(gauge_name)
    assure_two_dimensional!(gauge)
    @redis.hset(gauge.key, gauge.tick_at(time), value)
  def set_value(time, value)
    assure_two_dimensional!(self)
    redis.hset(self.key, self.tick_at(time), value)
  end

  def set_field(gauge_name, field_name, value)
    gauge = fetch_gauge(gauge_name)
    assure_three_dimensional!(gauge)
    @redis.zadd(gauge.tick_key(time), value, field_name)
  def set_field(time, field_name, value)
    assure_three_dimensional!(self)
    redis.zadd(self.tick_key(time), value, field_name)
  end

  def assure_two_dimensional!(gauge)
    return true if gauge.two_dimensional?
    error! "error: #{caller[0].split(" ")[-1]} can only be used with 2-dimensional gauges" 
  end

  def assure_three_dimensional!(gauge)
    return true unless gauge.two_dimensional?
    error! "error: #{caller[0].split(" ")[-1]} can only be used with 3-dimensional gauges" 
  end

  def assure_non_progressive!(gauge)
    return true unless gauge.progressive?
    error! "error: #{caller[0].split(" ")[-1]} can only be used with non-progressive gauges" 
  end

end
 No newline at end of file
Loading