Commit 9a50d777 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

better bucket_flush procedure

parent 0e19bad4
Loading
Loading
Loading
Loading
+5 −2
Original line number Diff line number Diff line
@@ -14,20 +14,23 @@ trait AbstractBucket {
  def sample(value: Double) : Unit
  def flush() : Double

  def flush_every(interval: Long, proc: (Long, Double) => Unit) = {
  def flush_every(interval: Long) : (Long, Double) = {
    val now = FnordMetric.now
    var triggered = (next_flush == 0)
    var ret : (Long, Double) = null

    if (triggered)
      next_flush = now

    while (next_flush <= now) {
      if (!triggered)
        proc(next_flush, flush)
        ret = ((next_flush, flush))

      next_flush += interval
      triggered = true
    }

    ret
  }

}
+9 −8
Original line number Diff line number Diff line
@@ -19,18 +19,19 @@ class Metric(key: MetricKey) {

  // adds a value to the metric's bucket and tries to flush the bucket
  def sample(value: Double) = this.synchronized {

    // call flush_bucket with the returned aggregated value for every
    // flush_interval since the last call to flush_every
    bucket.flush_every(key.flush_interval, (
      (time, value) => flush_bucket(time, value) ))

    bucket.sample(value)
    flush_bucket
  }

  // adds an aggregated value to the in memory ring buffer after it has
  // been flushed from the bucket
  private def flush_bucket(time: Long, value: Double) = {
  def flush_bucket : Unit = {
    val nxt = bucket.flush_every(key.flush_interval)

    // flush_every returns null if the current flush interval is not over
    // yet (makes this method idempotent)
    if (nxt == null)
      return

    // if the ring buffer is already full we need to clear up a slot
    if (rbuf.remaining == 0) {
@@ -53,7 +54,7 @@ class Metric(key: MetricKey) {

    // now at least one slot in the ring buffer is free so we can just
    // push our sample
    rbuf.push(((time, value)))
    rbuf.push(nxt)

    flush_rbuf // FIXPAUL: remove me
  }