Commit 1b4b8547 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

implement disk syncing via scheduler

parent 8474f553
Loading
Loading
Loading
Loading
+8 −3
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@ case class MetricKey(key: String, mode: String, flush_interval: Long)

class Metric(key: MetricKey) {
  var flush_interest : Long = 0
  var next_disk_sync : Long = 0

  val bucket = BucketFactory.new_bucket(key.mode)
  val swap = new SwapFile(key)
@@ -47,7 +48,7 @@ class Metric(key: MetricKey) {
      // use, we need to flush some. this flushes as much data to disk as
      // possible and marks it as "ready for removal"
      if (rbuf_seek_pos < 1)
        flush_rbuf
        flush_rbuf()

      // exit if we couldn't free up any slots (this should never happen)
      if (rbuf_seek_pos < 1)
@@ -69,15 +70,19 @@ class Metric(key: MetricKey) {

  // tries to persist as much data from the in memory ring buffer to disk
  // as possible but doesnt remove it from the buffer yet
  def flush_rbuf = this.synchronized {
  def flush_rbuf(force_sync: Boolean = false) = this.synchronized {
    val flush_range = rbuf.size - rbuf_seek_pos

    // copy the flushable items from the rbuf to the swapfile
    for (sample <- rbuf.tail(flush_range))
      swap.put(sample._1, sample._2)

    // mark the range as "read to be overwritten
    // mark the range as "ready to be overwritten"
    rbuf_seek_pos += flush_range

    // force syncing the data to disk if requested
    if (force_sync)
      swap.flush
  }

  // returns this metrics value at time0 if a value was recorded at that
+15 −3
Original line number Diff line number Diff line
@@ -11,6 +11,9 @@ class Scheduler extends Thread {

  val TICK = 10

  // sync metric data to disk every N ms
  val SYNC_EVERY = 10000

  // this runs in the background and tries to invoke some flushes. it
  // is not neccessary for the liveliness of the application that each
  // iteration of the loop finishes in a fixed time interval. even if
@@ -23,12 +26,21 @@ class Scheduler extends Thread {
  private def next : Unit = {
    val now = FnordMetric.now

    for (metric <- MetricFactory.metric_map) {

      // search for metrics with a pending flush interest and flush them
    // if the now > flush_interest
    for (metric <- MetricFactory.metric_map)
      // if now > flush_interest
      if (metric._2.flush_interest > 0 && metric._2.flush_interest <= now)
        metric._2.flush_bucket

      // search for metrics with a pending disk sync and flush them
      if (metric._2.next_disk_sync <= now) {
        metric._2.next_disk_sync = now + SYNC_EVERY
        metric._2.flush_rbuf(true)
      }

    }

    // to avoid burning CPU we sleep for a few ms
    Thread.sleep(TICK)
  }