Commit c1aeab03 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

basic netty integration

parent 3f9eeaa7
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -48,4 +48,4 @@ Requests that can not be served from memory require one sequential disk read.
+ much much more scalable
+ highly customizable with css
+ requires only a single deployment
+ i18n (proper timezones in graphs due to in browser rendering etc)
+2 −0
Original line number Diff line number Diff line
@@ -22,6 +22,8 @@ libraryDependencies += "org.eclipse.jetty" % "jetty-websocket" % "8.1.8.v2012110

libraryDependencies += "org.scalatest" %% "scalatest" % "2.0.M1" % "test"

libraryDependencies += "io.netty" % "netty" % "3.6.0.Final"

assemblySettings

jarName in assembly <<= (version) { v => "FnordMetric-Enterprise-v" + v + ".jar" }
+24 −6
Original line number Diff line number Diff line
@@ -21,7 +21,9 @@ object FnordMetric {

  var DEFAULTS = HashMap[Symbol, String](
    'http_threads      -> "4",
    'websocket_threads -> "4"
    'websocket_threads -> "4",
    'tcp_threads       -> "4",
    'udp_threads       -> "4"
  )

  var debug = false
@@ -44,6 +46,18 @@ object FnordMetric {
      else if(args(n) == "--websocket-threads")
        { CONFIG += (('websocket_threads, args(n+1))); n += 2 }

      else if(args(n) == "--tcp")
        { CONFIG += (('tcp_port, args(n+1))); n += 2 }

      else if(args(n) == "--tcp-threads")
        { CONFIG += (('tcp_threads, args(n+1))); n += 2 }

      else if(args(n) == "--udp")
        { CONFIG += (('udp_port, args(n+1))); n += 2 }

      else if(args(n) == "--udp-threads")
        { CONFIG += (('udp_threads, args(n+1))); n += 2 }

      else if((args(n) == "-d") || (args(n) == "--debug"))
        { debug = true; n += 1 }

@@ -69,12 +83,16 @@ object FnordMetric {
  def boot = try {
    FnordMetric.log("Booting...")

    val websocket_threads = CONFIG('websocket_threads).toInt
    val websocket_port = CONFIG.getOrElse('websocket_port, "0")
      .asInstanceOf[String].toInt
    val ws_server = if (CONFIG contains 'websocket_port)
      new HTTPServer(
        CONFIG('websocket_port).toInt,
        CONFIG('websocket_threads).toInt,
        new WebSocketHandler)

    val websocket = if (websocket_port > 0)
      new HTTPServer(websocket_port, websocket_threads, new WebSocketHandler)
    val tcp_server = if (CONFIG contains 'tcp_port)
      new TCPServer(
        CONFIG('tcp_port).toInt,
        CONFIG('tcp_threads).toInt)

  } catch {
    case e: Exception => exception(e, true)
+47 −0
Original line number Diff line number Diff line
package com.fnordmetric.enterprise
// FnordMetric Enterprise
//   (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
//
// Licensed under the MIT License (the "License"); you may not use this
// file except in compliance with the License. You may obtain a copy of
// the License at: http://opensource.org/licenses/MIT

import java.net.InetSocketAddress
import java.util.concurrent._
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.channel.Channels
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder
import org.jboss.netty.handler.codec.frame.Delimiters
import org.jboss.netty.handler.codec.string.StringDecoder
import org.jboss.netty.handler.codec.string.StringEncoder

class TCPServer(port: Int, threads: Int) {

  val MAX_FRAME_LENGTH = 8192

  val bootstrap = new ServerBootstrap(
    new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))

  bootstrap.setPipelineFactory(new TCPServerPipelineFactory)
  bootstrap.bind(new InetSocketAddress(port))

  class TCPServerPipelineFactory extends ChannelPipelineFactory {

    override def getPipeline : ChannelPipeline = {
      val pipeline = Channels.pipeline

      pipeline.addLast("framer", new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH,
        (Delimiters.lineDelimiter): _*))

      pipeline.addLast("decoder", new StringDecoder)
      pipeline.addLast("eccoder", new StringEncoder)

      pipeline
    }

  }

}