Commit dfa0148c authored by Paul Asmuth's avatar Paul Asmuth
Browse files

simple udp listener

parent 8809454d
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -4,3 +4,4 @@
+ warm up ring buffers on start
+ accept millisecond timestamps
+ background rbuf_flush thread every N seconds
+ rbuf size in time
+13 −4
Original line number Diff line number Diff line
@@ -102,10 +102,9 @@ object FnordMetric {
    if (flock == null)
      error("cannot aquire server.lck", true)

    val tcp_server = if (CONFIG contains 'tcp_port)
      new TCPServer(
        CONFIG('tcp_port).toInt,
        CONFIG('tcp_threads).toInt)

    if (CONFIG contains 'http)
      error("FIXPAUL: not yet implemented: http-server", true)

    val ws_server = if (CONFIG contains 'websocket_port)
      new HTTPServer(
@@ -113,6 +112,16 @@ object FnordMetric {
        CONFIG('websocket_threads).toInt,
        new WebSocketHandler)

    val tcp_server = if (CONFIG contains 'tcp_port)
      new TCPServer(
        CONFIG('tcp_port).toInt,
        CONFIG('tcp_threads).toInt)

    val udp_server = if (CONFIG contains 'udp_port)
      new UDPServer(
        CONFIG('udp_port).toInt,
        CONFIG('udp_threads).toInt)

  } catch {
    case e: Exception => exception(e, true)
  }
+1 −2
Original line number Diff line number Diff line
@@ -45,7 +45,7 @@ class Metric(key: MetricKey) {
      if (rbuf_seek_pos < 1)
        throw new Exception("flush_rbuf failed")

      // Mark the next value in the rbuf as ready to be overwritten. The
      // mark the next value in the rbuf as ready to be overwritten. The
      // order of these statements is significant!
      rbuf_seek_pos -= 1
      rbuf.seek(1)
@@ -170,7 +170,6 @@ class Metric(key: MetricKey) {

        // skip if we already saw this sample in the rbuf search
        if (cur._1 < rbuf_last) {
          println("LOAD_SWAP", cur)

          // if we are already beyond time1 we can exit
          if (time1 != 0 && (cur._1 < time1))
+0 −1
Original line number Diff line number Diff line
@@ -59,7 +59,6 @@ class SwapFile(metric_key: MetricKey) {
  // the specified destionation list buffer
  def load_chunk(position: Int, dst: ListBuffer[(Long, Double)]) : Int = {
    var read_pos = 0
    println("load_chunk", position)

    // we read the data back in 540 byte blocks (30 samples per block)
    var chunk_size = BLOCK_SIZE * 30
+49 −0
Original line number Diff line number Diff line
// FnordMetric Enterprise
//   (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
//
// Licensed under the MIT License (the "License"); you may not use this
// file except in compliance with the License. You may obtain a copy of
// the License at: http://opensource.org/licenses/MIT

package com.fnordmetric.enterprise

import java.net.DatagramPacket
import java.net.DatagramSocket
import java.util.concurrent._

class UDPServer(port: Int, threads: Int){

  val pool = Executors.newFixedThreadPool(threads)
  val sock = new DatagramSocket(port)

  val buffer_size = 65535
  val buffer = new Array[Byte](buffer_size)
  val packet = new DatagramPacket(buffer, buffer_size)

  FnordMetric.log("Listening on tcp://0.0.0.0:" + port)

  while (true) {
    sock.receive(packet)

    dispatch(
      new String(packet.getData, 0, packet.getLength))
  }

  private def dispatch(body: String) =
    pool.execute(new Runnable { def run : Unit = {
      for (msg <- body.split("\n")) execute(msg)
    }})

  private def execute(msg: String) =
    InstructionFactory.parse(msg) match {
      case e: ErrorInstruction =>
        FnordMetric.error("[UDP] " + e.execute, false)

      case e: SampleInstruction =>
        e.execute

      case e: AbstractInstruction =>
        FnordMetric.error("[UDP] received non-sample instruction", false)
    }

}