Commit 83d3a0fc authored by Paul Asmuth's avatar Paul Asmuth
Browse files

merged Only parsable events are added to event queue by inbound stream from...

merged Only parsable events are added to event queue by inbound stream from kazjote - don't accept invalid json in inbound_stream. closes #27
parent ac885aa8
Loading
Loading
Loading
Loading
+12 −3
Original line number Diff line number Diff line
@@ -12,9 +12,18 @@ class FnordMetric::API
  end
  
  def event(event_data)
    event_data = event_data.to_json if event_data.is_a?(Hash)
    begin
      if event_data.is_a?(Hash)
        event_data = event_data.to_json  
      else
        JSON.parse(event_data) # void ;)
      end
    rescue JSON::ParserError
      FnordMetric.log("event_lost: can't parse json")
    else
      push_event(get_next_uuid, event_data)
    end
  end

  def disconnect
    @redis.quit
+5 −1
Original line number Diff line number Diff line
@@ -6,6 +6,10 @@ class FnordMetric::InboundStream < EventMachine::Connection
    EM.start_server(*opts[:inbound_stream], self)    
  end

  def self.options(opts)
    @@opts = opts
  end

  def receive_data(chunk)     
    @buffer << chunk         
    EM.defer{ next_event }
+1 −1
Original line number Diff line number Diff line
@@ -22,7 +22,7 @@ class FnordMetric::Worker
    @redis.blpop(queue_key, 0).callback do |list, event_id|           
      @redis.get(event_key(event_id)).callback do |event_data|                     
        process_event(event_id, event_data) if event_data        
        FnordMetric.log("oops, lost an event :(") unless event_data
        FnordMetric.log("event_lost: event_data not found for event-id '#{event_id}'") unless event_data
        EM.next_tick(&method(:tick))      
        @redis.hincrby(stats_key, :events_processed, 1)
      end
+34 −0
Original line number Diff line number Diff line
require ::File.expand_path('../spec_helper.rb', __FILE__)

describe FnordMetric::InboundStream do

  before(:all) do
    @redis = Redis.new
    @redis_wrap = RedisWrap.new(@redis)
    FnordMetric::InboundStream.options(
      :redis_url => "redis://localhost:6379",
      :redis_prefix => "fnordmetric-test",
      :event_queue_ttl => 120
    )
    @inbound_stream = FnordMetric::InboundStream.new(nil)
  end

  describe "pushing new events" do
    it "should add parsable event to the queue" do
      data = %Q{{"_type": "started"}\n}

      lambda {
        @inbound_stream.receive_data data
      }.should change { @redis.llen("fnordmetric-test-queue") }.by +1
    end

    it "should reject non parsable events" do
      broken_data = %Q{{"_type": \n"started"}\n}
    
      lambda {
        @inbound_stream.receive_data broken_data
      }.should_not change { @redis.llen("fnordmetric-test-queue") }
    end
  end
end
+3 −0
Original line number Diff line number Diff line
@@ -14,6 +14,9 @@ require "fnordmetric"
require "eventmachine"
require 'em-hiredis'

def EM.defer; yield; end
def EM.next_tick; yield; end

class RedisWrap

  def initialize(redis, callbackable=true)