Commit 2cc45d36 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

documentation, cleaning up

parent 0fda05ea
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line


+ first sampled value is not flushed (only after first flush interval was reached and new data is submitted)
+ warm up ring buffers on start
+ accept millisecond timestamps
+29 −10
Original line number Diff line number Diff line
@@ -20,6 +20,8 @@ class Metric(key: MetricKey) {
  // adds a value to the metric's bucket and tries to flush the bucket
  def sample(value: Double) = this.synchronized {

    // call flush_bucket with the returned aggregated value for every
    // flush_interval since the last call to flush_every
    bucket.flush_every(key.flush_interval, (
      (time, value) => flush_bucket(time, value) ))

@@ -29,18 +31,28 @@ class Metric(key: MetricKey) {
  // adds an aggregated value to the in memory ring buffer after it has
  // been flushed from the bucket
  private def flush_bucket(time: Long, value: Double) = {

    // if the ring buffer is already full we need to clear up a slot
    if (rbuf.remaining == 0) {

      // if there is no slot that is already flushed to disk which we can
      // use, we need to flush some. this flushes as much data to disk as
      // possible and marks it as "ready for removal"
      if (rbuf_seek_pos < 1)
        flush_rbuf

      // exit if we couldn't free up any slots (this should never happen)
      if (rbuf_seek_pos < 1)
        throw new Exception("flush_rbuf failed")

      rbuf.seek(1)
      // Mark the next value in the rbuf as ready to be overwritten. The
      // order of these statements is significant!
      rbuf_seek_pos -= 1
      rbuf.seek(1)
    }

    // now at least one slot in the ring buffer is free so we can just
    // push our sample
    rbuf.push(((time, value)))

    flush_rbuf // FIXPAUL: remove me
@@ -72,20 +84,23 @@ class Metric(key: MetricKey) {
  }

  // returns all aggregated values for this metric in the specified time
  // range. if time1 is 0 then only the first value at time0 is returned
  // note that time0 > time1! this method is threadsafe
  // range. if time1 is 0 then only the first value at time0 is returned.
  // note that time0 > time1! this method is threadsafe. reads within the
  // in memory ring buffer are lock-free. reads that hit the on disk swap
  // file use a striped lock and may block
  def values_in(time0: Long, time1: Long) : List[(Long, Double)] = {
    val lst = ListBuffer[(Long, Double)]()

    // FIXPAUL: skip n flush_intervals if time0 is older than now, maybe
    // skip rbuf_search completely

    // FIXPAUL: do this without snapshotting the ringbuffer
    val rbuf_snap = rbuf.tail(rbuf.size)

    var rbuf_last : Long = java.lang.Long.MAX_VALUE
    var rbuf_pos = 0

    // take a "snapshot" of the ring buffers current state. this may race
    // (len may be smaller than the real value) but this only means that
    // we may have to load one more value from the swapfile instead from
    // the in memory ring buffer
    val rbuf_snap_len = rbuf.size
    val rbuf_snap_pos = rbuf.position

    // search the ring buffer backwards without synchronization. the basic
    // assumption here is that the system time will only progress forward.
    // if the system time should jump backwards this would race
@@ -93,7 +108,11 @@ class Metric(key: MetricKey) {
      val cur = rbuf_snap(rbuf_pos)

      // since this is not synchronized, we need to check if we hit the
      // rbuf wrapping point and exit if so.
      // rbuf wrapping point and exit if so. this code would race if the
      // ring buffer did one full revolution in the time between taking
      // the initial snapshot (rbuf_snap_pos) and the first assignment to
      // rbuf_last. we assume that this thread isn't preempted for longer
      // than 60 seconds (the min. flush_interval) and ignore this...
      if (cur._1 < rbuf_last)
        rbuf_last = cur._1
      else
+4 −4
Original line number Diff line number Diff line
@@ -16,8 +16,8 @@ class RingBuffer[T: Manifest](capacity: Int) {
  // the numer of elements that this ring buffer currently contains
  var size  : Int = 0

  // appends a new item. is_full must be called before appending to check if
  // the ringbuffer is already full
  // appends a new item. the remaining number of free slots must be checked
  // before appending
  def push(item: T) : Unit = {
    if (size == capacity)
      throw new Exception("ring buffer is full")
@@ -51,14 +51,14 @@ class RingBuffer[T: Manifest](capacity: Int) {
    lst.toList
  }

  // Removes the first num items from the start of the ring buffer (oldest
  // removes the first num items from the start of the ring buffer (oldest
  // items get removed first)
  def seek(num: Int) = {
    start = (start + num) % capacity
    size -= num
  }

  // Returns the remaning number of free slots in the ringbuffer
  // returns the remaning number of free slots in the ringbuffer
  def remaining : Int =
    capacity - size