Commit 3c608a71 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

backported worker

parent b5c36742
Loading
Loading
Loading
Loading
+17 −1
Original line number Diff line number Diff line
$: << ::File.expand_path("../../../fnordmetric/lib/", __FILE__)
require "fnordmetric"

FnordMetric.options = {
  :event_queue_ttl  => 10, # all data that isn't processed within 10s is discarded to prevent memory overruns
  :event_data_ttl   => 10,
  :session_data_ttl => 1,  # we don't care about session data for now
  :redis_prefix => "fnordmetric-dawanda" 
}

FnordMetric.namespace :ulm do

  gauge :skip_votes, :tick => 1.day.to_i, :title => "Skip-Votes"
@@ -61,4 +68,13 @@ FnordMetric.options = {
  :print_stats => 3
}

FnordMetric.standalone
FnordMetric::Logger.new(
  :file => '/home/paul/fnordmetric.log',
  :channels => ["dawanda-firehose"]
)

FnordMetric::Web.new(:port => 4242)

FnordMetric::Acceptor.new(:protocol => :tcp, :port => 2323)

FnordMetric.run
 No newline at end of file
+2 −12
Original line number Diff line number Diff line
@@ -114,18 +114,7 @@ module FnordMetric

  def self.standalone
    puts "DEPRECATION WARNING - FIXPAUL"

    FnordMetric::Web.new(
      :host => options[:web_interface][0],
      :port => options[:web_interface][1]
    )

    FnordMetric::Acceptor.new(
      :protocol => options[:inbound_protocol],
      :host => options[:inbound_stream][0],
      :port => options[:inbound_stream][1]
    )

    require "fnordmetric/standalone"
    start_em
  end

@@ -142,6 +131,7 @@ require "fnordmetric/gauge"
require "fnordmetric/context"

require "fnordmetric/api"
require "fnordmetric/worker"

require "fnordmetric/web/web"
require "fnordmetric/web/app_helpers"
+41 −37
Original line number Diff line number Diff line
module FnordMetric::GaugeModifiers

  def incr(time, value=1)
    assure_two_dimensional!(self)
    if self.unique? 
      incr_uniq(time, value)
    elsif self.average? 
      incr_avg(time, value)
  def incr(gauge_name, value=1)
    gauge = fetch_gauge(gauge_name)
    assure_two_dimensional!(gauge)
    if gauge.unique? 
      incr_uniq(gauge, value)
    elsif gauge.average? 
      incr_avg(gauge, value)
    else
      incr_tick(time, value)
      incr_tick(gauge, value)
    end
  end

  def incr_tick(time, value)
    if self.progressive?      
      redis.incrby(self.key(:head), value).callback do |head|
        redis.hsetnx(self.key, self.tick_at(time), head).callback do |_new|
          redis.hincrby(self.key, self.tick_at(time), value) unless _new
  def incr_tick(gauge, value)
    if gauge.progressive?      
      @redis.incrby(gauge.key(:head), value).callback do |head|
        @redis.hsetnx(gauge.key, gauge.tick_at(time), head).callback do |_new|
          @redis.hincrby(gauge.key, gauge.tick_at(time), value) unless _new
        end
      end
    else
      redis.hsetnx(self.key, self.tick_at(time), 0).callback do
        redis.hincrby(self.key, self.tick_at(time), value)
      @redis.hsetnx(gauge.key, gauge.tick_at(time), 0).callback do
        @redis.hincrby(gauge.key, gauge.tick_at(time), value)
      end
    end
  end  

  def incr_uniq(time, value, field_name=nil)
  def incr_uniq(gauge, value, field_name=nil)
    return false if session_key.blank?
    redis.sadd(self.tick_key(time, :sessions), session_key).callback do |_new|
      redis.expire(self.tick_key(time, :sessions), self.tick)
    @redis.sadd(gauge.tick_key(time, :sessions), session_key).callback do |_new|
      @redis.expire(gauge.tick_key(time, :sessions), gauge.tick)
      if (_new == 1) || (_new == true) #redis vs. em-redis
        redis.incr(time, self.tick_key(time, :"sessions-count")).callback do |sc|
          field_name ? incr_field_by(time, field_name, value) : incr_tick(time, value)
        @redis.incr(gauge.tick_key(time, :"sessions-count")).callback do |sc|
          field_name ? incr_field_by(gauge, field_name, value) : incr_tick(gauge, value)
        end
      end
    end
  end

  def incr_avg(time, value)
    redis.incr(self.tick_key(time, :"value-count")).callback do
      incr_tick(time, value)
  def incr_avg(gauge, value)
    @redis.incr(gauge.tick_key(time, :"value-count")).callback do
      incr_tick(gauge, value)
    end
  end

  def incr_field(time, field_name, value=1)
    assure_three_dimensional!(self)
    if self.unique? 
      incr_uniq(time, value, field_name)
  def incr_field(gauge_name, field_name, value=1)
    gauge = fetch_gauge(gauge_name)
    assure_three_dimensional!(gauge)
    if gauge.unique? 
      incr_uniq(gauge, value, field_name)
    else
      incr_field_by(time, field_name, value)
      incr_field_by(gauge, field_name, value)
    end
  end

  def incr_field_by(time, field_name, value)
    redis.zincrby(self.tick_key(time), value, field_name).callback do
      redis.incrby(self.tick_key(time, :count), 1)
  def incr_field_by(gauge, field_name, value)
    @redis.zincrby(gauge.tick_key(time), value, field_name).callback do
      @redis.incrby(gauge.tick_key(time, :count), 1)
    end
  end  

  def set_value(time, value)
    assure_two_dimensional!(self)
    redis.hset(self.key, self.tick_at(time), value)
  def set_value(gauge_name, value)
    gauge = fetch_gauge(gauge_name)
    assure_two_dimensional!(gauge)
    @redis.hset(gauge.key, gauge.tick_at(time), value)
  end

  def set_field(time, field_name, value)
    assure_three_dimensional!(self)
    redis.zadd(self.tick_key(time), value, field_name)
  def set_field(gauge_name, field_name, value)
    gauge = fetch_gauge(gauge_name)
    assure_three_dimensional!(gauge)
    @redis.zadd(gauge.tick_key(time), value, field_name)
  end
  
  def assure_two_dimensional!(gauge)
+15 −35
Original line number Diff line number Diff line
require 'rake'
require 'redis'
_opts = FnordMetric.options

task :run do  
  FnordMetric.run
if _opts[:web_interface]
  FnordMetric::Web.new(
    :host => _opts[:web_interface][0],
    :port => _opts[:web_interface][1]
  )
end

task :worker do
  FnordMetric.server_configuration = {
    :web_interface  => false,
    :inbound_stream => false,
    :start_worker   => true
  }
  FnordMetric.run
if _opts[:inbound_stream]
  FnordMetric::Acceptor.new(
    :protocol => _opts[:inbound_protocol],
    :host => _opts[:inbound_stream][0],
    :port => _opts[:inbound_stream][1]
  )
end

task :log do
  FnordMetric::Logger.start(dump_file_path)
if _opts[:start_worker]
  FnordMetric::Worker.new()
end
 No newline at end of file

task :import do
  FnordMetric::Logger.import(dump_file_path)
end

task :help do
  puts "usage: #{$0} {run|worker|log|import} [DUMP_FILE=fm_dump.json]"
end

task :default => :help

def dump_file_path
  if ENV["DUMP_FILE"].blank?
    Rake::Task[:help].execute; exit!
  else
    ::File.expand_path(ENV["DUMP_FILE"], ::File.dirname($0))
  end
end

Rake.application.init('fnordmetric')
Rake.application.top_level
 No newline at end of file
+21 −21
Original line number Diff line number Diff line
class FnordMetric::Worker

  def initialize(namespaces, opts)        
    @namespaces = {}
    @opts = opts
    configure(namespaces)
  end
  def initialize
    @namespaces = FnordMetric.namespaces
    @opts = FnordMetric.options

  def ready!
    @redis = EM::Hiredis.connect(@opts[:redis_url])
    tick
    FnordMetric.register(self)
  end

  def configure(namespaces)   
    namespaces.each do |key, block|
      @namespaces[key] = FnordMetric::Namespace.new(key, @opts.clone)
      @namespaces[key].instance_eval(&block)
    end
  def initialized
    FnordMetric.log("worker started")
    tick
  end

  def tick
    @redis.blpop(queue_key, 1).callback do |list, event_id|           
    redis.blpop(queue_key, 1).callback do |list, event_id|           
      EM.next_tick(&method(:tick))
      if event_id
        @redis.get(event_key(event_id)).callback do |event_data|                     
        redis.get(event_key(event_id)).callback do |event_data|                     
          process_event(event_id, event_data) if event_data        
          FnordMetric.log("event_lost: event_data not found for event-id '#{event_id}' - maybe expired?") unless event_data
          @redis.hincrby(stats_key, :events_processed, 1)
          redis.hincrby(stats_key, :events_processed, 1)
        end
      end
    end
  end

  def process_event(event_id, event_data)
    EM.defer do      
    puts "OUTER #{event_data}"
    EM.next_tick do      
      parse_json(event_data).tap do |event|                
        event[:_time] ||= Time.now.to_i
        event[:_eid] = event_id
        puts "INNER"
        announce_event(event)
        publish_event(event)        
        expire_event(event_id)       
@@ -60,15 +56,15 @@ class FnordMetric::Worker
  end

  def announce_event(event)
    namespace(event[:_namespace]).ready!(@redis).announce(event)
    namespace(event[:_namespace]).ready!(redis).announce(event)
  end

  def expire_event(event_id)
    @redis.expire(event_key(event_id), @opts[:event_data_ttl])
    redis.expire(event_key(event_id), @opts[:event_data_ttl])
  end

  def publish_event(event)    
    @redis.publish(pubsub_key, event.to_json)
    redis.publish(pubsub_key, event.to_json)
  end

  def namespace(key)
@@ -81,4 +77,8 @@ class FnordMetric::Worker
    event
  end

  def redis
    @redis ||= EM::Hiredis.connect(FnordMetric.options[:redis_url]) # FIXPAUL
  end

end