Commit 7bf18cbf authored by Paul Asmuth's avatar Paul Asmuth
Browse files

basic background/scheduled flush

parent e1ea606d
Loading
Loading
Loading
Loading
+2 −5
Original line number Diff line number Diff line


+ first sampled value is not flushed (only after first flush interval was reached and new data is submitted)
+ background rbuf_flush -> swapfile every N seconds
+ rbuf size in time
+ warm up ring buffers on start
+ accept millisecond timestamps
+ background rbuf_flush thread every N seconds
+ rbuf size in time
+36 −0
Original line number Diff line number Diff line
// FnordMetric Enterprise
//   (c) 2011-2013 Paul Asmuth <paul@paulasmuth.com>
//
// Licensed under the MIT License (the "License"); you may not use this
// file except in compliance with the License. You may obtain a copy of
// the License at: http://opensource.org/licenses/MIT

package com.fnordmetric.enterprise

class BackgroundThread extends Thread {

  val TICK = 10

  // this runs in the background and tries to invoke some flushes in
  // the background. it is not neccessary for the liveliness of the
  // application that each iteration of the loop finishes in a fixed
  // time interval. even if this thread dies completely all data will
  // be correctly recorded, but the last sample on each metric won't
  // be flushed until new data arrives
  override def run : Unit =
    while (true) next

  private def next : Unit = {
    val now = FnordMetric.now

    // search for metrics with a pending flush interest and fush them
    // if the now > flush_interest
    for (metric <- MetricFactory.metric_map)
      if (metric._2.flush_interest > 0 && metric._2.flush_interest <= now)
        metric._2.flush_bucket

    // to avoid burning CPU we sleep for a few ms
    Thread.sleep(TICK)
  }

}
+2 −0
Original line number Diff line number Diff line
@@ -102,6 +102,8 @@ object FnordMetric {
    if (flock == null)
      error("cannot aquire server.lck", true)

    val bg_thread = new BackgroundThread
    bg_thread.start

    if (CONFIG contains 'http)
      error("FIXPAUL: not yet implemented: http-server", true)
+4 −4
Original line number Diff line number Diff line
@@ -28,7 +28,7 @@ class Metric(key: MetricKey) {

  // adds an aggregated value to the in memory ring buffer after it has
  // been flushed from the bucket
  def flush_bucket : Unit = {
  def flush_bucket : Unit = this.synchronized {
    val nxt = bucket.flush_every(key.flush_interval)

    // indicate to the background thread that this metric has pending data
@@ -62,6 +62,9 @@ class Metric(key: MetricKey) {
    // now at least one slot in the ring buffer is free so we can just
    // push our sample
    rbuf.push(nxt)

    // mark this metric as "no pending flushes"
    flush_interest = 0
  }

  // tries to persist as much data from the in memory ring buffer to disk
@@ -75,9 +78,6 @@ class Metric(key: MetricKey) {

    // mark the range as "read to be overwritten
    rbuf_seek_pos += flush_range

    // mark this metric as "no pending flushes"
    flush_interest = 0
  }

  // returns this metrics value at time0 if a value was recorded at that