Commit 8ae0d9e3 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

simple ringbuffer flush logic

parent fbf42705
Loading
Loading
Loading
Loading
+26 −2
Original line number Diff line number Diff line
@@ -12,17 +12,41 @@ case class MetricKey(key: String, mode: String, flush_interval: Long)
class Metric(key: MetricKey) {
  val bucket = BucketFactory.new_bucket(key.mode)
  var rbuf = new RingBuffer[Double](10)
  var rbuf_seek_pos = 0

  // adds a value to this metric
  def sample(value: Double) = this.synchronized {
    bucket.flush_every(key.flush_interval, (
      (time, value) => flush(time, value) ))
      (time, value) => flush_bucket(time, value) ))

    bucket.sample(value)
  }

  def flush(time: Long, value: Double) = this.synchronized {
  // adds an aggregated value to the in memory ring buffer after it has been
  // flushed from the bucket
  private def flush_bucket(time: Long, value: Double) = {
    if (rbuf.remaining == 0) {

      if (rbuf_seek_pos < 1)
        flush_rbuf

      if (rbuf_seek_pos < 1)
        throw new Exception("flush_rbuf failed")

      rbuf.seek(1)
      rbuf_seek_pos -= 1
    }

    rbuf.push(value)
    println("RINGBUF", rbuf.tail(10))
  }

  // tries to persist as much data from the in memory ring buffer to disk
  // as possible but doesnt remove it from the buffer yet
  def flush_rbuf = this.synchronized {
    val flush_range = rbuf.size - rbuf_seek_pos
    println("RBUF_FLUSH", rbuf.tail(flush_range))
    rbuf_seek_pos += flush_range
  }

}
+4 −0
Original line number Diff line number Diff line
@@ -58,4 +58,8 @@ class RingBuffer[T: Manifest](capacity: Int) {
    size -= num
  }

  // Returns the remaning number of free slots in the ringbuffer
  def remaining : Int =
    capacity - size

}