Commit 1a5ea9df authored by Paul Asmuth's avatar Paul Asmuth
Browse files

seperated acceptors

parent 53ca457a
Loading
Loading
Loading
Loading
+50 −35
Original line number Diff line number Diff line
@@ -48,6 +48,8 @@ require "fnordmetric/version"
#
# geo_distribution_gauge
#
#  distribution gauge
#
#
# wiki
#
@@ -88,6 +90,10 @@ module FnordMetric
    @@server_configuration = configuration
  end

  def self.options=(configuration)
    @@server_configuration = configuration
  end

  def self.default_options(opts = {})
    {
      :redis_url => "redis://localhost:6379",
@@ -108,40 +114,6 @@ module FnordMetric
    default_options(@@server_configuration || {}).merge(opts)
  end

  def self.start_em(opts = {})
    EM.run do

      trap("TERM", &method(:shutdown))
      trap("INT",  &method(:shutdown))

      opts = options(opts)
      app = embedded(opts)

      if opts[:web_interface]
        server = opts[:web_interface_server].downcase

        unless ["thin", "hatetepe"].include? server
          raise "Need an EventMachine webserver, but #{server} isn't"
        end

        host, port = *opts[:web_interface]

        Rack::Server.start(
          :app => app,
          :server => server,
          :Host => host, 
          :Port => port
        ) && log("listening on http://#{host}:#{port}")
        
        FnordMetric::WebSocket.new(
          :host => host, 
          :port => (port.to_i+1)
        ) && log("listening on ws://#{host}:#{port.to_i+1}")

      end
    end
  end

  def self.log(msg)
    puts "[#{Time.now.strftime("%y-%m-%d %H:%M:%S")}] #{msg}"
  end
@@ -159,11 +131,49 @@ module FnordMetric
    sleep(1); run
  end

  def self.shutdown
  def self.shutdown(fnord=nil)
    log "shutting down, byebye"
    EM.stop
  end

  def self.start_em
    EM.run do

      trap("TERM", &method(:shutdown))
      trap("INT",  &method(:shutdown))

      EM.next_tick do
        ($fnordmetric || []).map(&:initialized)
      end

      # opts = options(opts)
      # app = embedded(opts)

      # if opts[:web_interface]
      #   server = opts[:web_interface_server].downcase

      #   unless ["thin", "hatetepe"].include? server
      #     raise "Need an EventMachine webserver, but #{server} isn't"
      #   end

      #   host, port = *opts[:web_interface]

      #   Rack::Server.start(
      #     :app => app,
      #     :server => server,
      #     :Host => host, 
      #     :Port => port
      #   ) && log("listening on http://#{host}:#{port}")
        
      #   FnordMetric::WebSocket.new(
      #     :host => host, 
      #     :port => (port.to_i+1)
      #   ) && log("listening on ws://#{host}:#{port.to_i+1}")

      # end
    end
  end

  def self.connect_redis(redis_url)
    EM::Hiredis.connect(redis_url)
  end
@@ -230,6 +240,11 @@ end
require "fnordmetric/backends/redis_backend"
require "fnordmetric/backends/memory_backend"

require "fnordmetric/acceptors/tcp_acceptor"
require "fnordmetric/acceptors/udp_acceptor"

require "fnordmetric/acceptor"

require "fnordmetric/api"
require "fnordmetric/udp_client"
require "fnordmetric/inbound_stream"
+30 −0
Original line number Diff line number Diff line
class FnordMetric::Acceptor

  def initialize(opts)
    @opts = opts

    $fnordmetric ||= []
    $fnordmetric << self
  end

  def initialized   
    inbound_class = if @opts[:protocol] == :udp 
      FnordMetric::UDPAcceptor
    else
      FnordMetric::TCPAcceptor
    end

    @opts[:listen] = [
      @opts[:host] || "0.0.0.0",
      @opts[:port] || 2323
    ]

    begin
      inbound_stream = inbound_class.start(@opts)
      FnordMetric.log "listening on #{@opts[:protocol]}://#{@opts[:listen][0..1].join(":")}"
    #rescue
    #  FnordMetric.log "cant start #{inbound_class.name}. port in use?"
    end
  end

end
 No newline at end of file
+54 −0
Original line number Diff line number Diff line
class FnordMetric::TCPAcceptor < EventMachine::Connection
  @@opts = nil

  def self.start(opts)
    @@opts = opts
    EM.start_server(*(opts[:listen] + [self]))
  end

  def self.options(opts)
    @@opts = opts
  end

  def receive_data(chunk)
    @buffer << chunk
    next_event
  end

  def next_event
    read_next_event
    push_next_event
  end

  def read_next_event
    while (event = @buffer.slice!(/^(.*)\n/))
      @events_buffered += 1
      @events << event
    end
  end

  def push_next_event
    return true if @events.empty?
    @events_buffered -= 1
    @backend.publish(@events.pop)
    close_connection?
    EM.next_tick(&method(:push_next_event))
  end

  def close_connection?
    @backend.hangup unless @streaming || (@events_buffered!=0)
  end

  def post_init
    @backend = FnordMetric.backend
    @events_buffered = 0
    @streaming = true
    @buffer = ""
    @events = []
  end

  def unbind
    @streaming = false
    close_connection?
  end
end
+37 −0
Original line number Diff line number Diff line
class FnordMetric::UDPAcceptor < EventMachine::Connection

  class << self
    attr_accessor :opts
  end

  def self.start(opts)
    self.opts = opts

    EM.open_datagram_socket(*(opts[:listen] << self << opts))
  end

  def receive_data(event)
    events << event
    push_next_event
  end

  def push_next_event
    return true if events.empty?
    ev = @events.pop
    backend.publish(ev)
    EM.next_tick(&method(:push_next_event))
  end

  def unbind
    backend.hangup
  end

  def backend
    @backend ||= FnordMetric.backend
  end

  def events
    @events ||= []
  end

end
 No newline at end of file