Commit e1ea606d authored by Paul Asmuth's avatar Paul Asmuth
Browse files

Metric#flush_interest and SwapFile#last_flush

parent 9a50d777
Loading
Loading
Loading
Loading
+12 −5
Original line number Diff line number Diff line
@@ -12,8 +12,11 @@ import scala.collection.mutable.ListBuffer
case class MetricKey(key: String, mode: String, flush_interval: Long)

class Metric(key: MetricKey) {
  var flush_interest : Long = 0

  val bucket = BucketFactory.new_bucket(key.mode)
  val swap = new SwapFile(key)

  var rbuf = new RingBuffer[(Long, Double)](10)
  var rbuf_seek_pos = 0

@@ -28,6 +31,10 @@ class Metric(key: MetricKey) {
  def flush_bucket : Unit = {
    val nxt = bucket.flush_every(key.flush_interval)

    // indicate to the background thread that this metric has pending data
    // in the bucket and when it can be flushed
    flush_interest = bucket.next_flush

    // flush_every returns null if the current flush interval is not over
    // yet (makes this method idempotent)
    if (nxt == null)
@@ -55,8 +62,6 @@ class Metric(key: MetricKey) {
    // now at least one slot in the ring buffer is free so we can just
    // push our sample
    rbuf.push(nxt)

    flush_rbuf // FIXPAUL: remove me
  }

  // tries to persist as much data from the in memory ring buffer to disk
@@ -64,14 +69,16 @@ class Metric(key: MetricKey) {
  def flush_rbuf = this.synchronized {
    val flush_range = rbuf.size - rbuf_seek_pos

    // FIXPAUL: wrong order!
    // copy the flushable items from the rbuf to the swapfile
    for (sample <- rbuf.tail(flush_range))
      swap.put(sample._1, sample._2)

    swap.flush
    // mark the range as "read to be overwritten
    rbuf_seek_pos += flush_range
  }

    // mark this metric as "no pending flushes"
    flush_interest = 0
  }

  // returns this metrics value at time0 if a value was recorded at that
  // point in time
+3 −0
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ import java.nio.ByteOrder
import scala.collection.mutable.ListBuffer

class SwapFile(metric_key: MetricKey) {
  var last_flush : Long = 0

  val buffer = ByteBuffer.allocate(512)
  buffer.order(ByteOrder.BIG_ENDIAN)
@@ -46,6 +47,8 @@ class SwapFile(metric_key: MetricKey) {
  // fluhes the queued writes from the buffer to disk. this method
  // is not thread safe!
  def flush : Unit = {
    last_flush = FnordMetric.now

    file.synchronized {
      file.seek(write_pos)
      file.write(buffer.array, 0, buffer.position)