Commit c2c63476 authored by Paul Asmuth's avatar Paul Asmuth
Browse files

swapfile load procedure WIP

parent 380b17be
Loading
Loading
Loading
Loading
+22 −5
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@ case class MetricKey(key: String, mode: String, flush_interval: Long)
class Metric(key: MetricKey) {
  val bucket = BucketFactory.new_bucket(key.mode)
  val swap = new SwapFile(key)
  var rbuf = new RingBuffer[(Long, Double)](1000)
  var rbuf = new RingBuffer[(Long, Double)](10)
  var rbuf_seek_pos = 0

  // adds a value to the metric's bucket and tries to flush the bucket
@@ -137,11 +137,28 @@ class Metric(key: MetricKey) {
    if (rbuf_last <= time1)
      return lst.toList

    // FIXPAUL: cache swapfile start and end time to avoid fs rountrips if
    // the swapfile search can't return any results
    // start searching the swapfile backwards from the last write position
    var swap_pos = swap.write_pos

    // FIXPAUL: here be dragons -> search in swapfile
    println("SEARCH_SWPFILE")
    // we skip at least as many values as we've already seen in the rbuf. but
    // since this is not synchronized we might still load a few samples that
    // we have already seen
    swap_pos -= (rbuf_seek_pos * swap.BLOCK_SIZE)

    while (swap_pos > 0) {
      println("LOAD_SWAP")
      var nxt = ListBuffer[(Long, Double)]()

      // load the next chunk of samples from the swapfile
      swap_pos = swap.load_chunk(swap_pos, nxt)

      for (cur <- nxt)

        // skip if we already saw this sample in the rbuf search
        if (cur._1 < rbuf_last) {
          println("LOAD_SWAP", cur)
        }
    }

    lst.toList
  }
+44 −3
Original line number Diff line number Diff line
@@ -11,11 +11,15 @@ import java.io.RandomAccessFile
import java.io.File
import java.nio.ByteBuffer
import java.nio.ByteOrder
import scala.collection.mutable.ListBuffer

class SwapFile(metric_key: MetricKey) {

  var write_pos = 0

  // each sample is 18 bytes big (2 bytes header, 8 bytes time and
  // 8 bytes value as double precision ieee 754 float)
  val BLOCK_SIZE = 18

  val file_name = "metric-" + metric_key.key +
    metric_key.mode + "-" + metric_key.flush_interval

@@ -30,7 +34,7 @@ class SwapFile(metric_key: MetricKey) {
  def put(time: Long, value: Double) : Unit = {
    val bvalue = java.lang.Double.doubleToLongBits(value)

    if (buffer.remaining < 18)
    if (buffer.remaining < BLOCK_SIZE)
      flush

    buffer.putShort(0x1717)
@@ -46,9 +50,46 @@ class SwapFile(metric_key: MetricKey) {
      file.write(buffer.array)
    }

    write_pos += 18
    write_pos += BLOCK_SIZE
    buffer.rewind
  }

  // reads a chunk of of values from the swapfile at position into
  // the specified destionation list buffer
  def load_chunk(position: Int, dest: ListBuffer[(Long, Double)]) : Int = {
    var read_pos = 0
    println("load_chunk", position)

    // we read the data back in 540 byte blocks (30 samples per block)
    var chunk_size = BLOCK_SIZE * 30
    val chunk = ByteBuffer.allocate(chunk_size)

    if (position < chunk_size)
      chunk_size = position

    // read the next chunk into memory
    while (read_pos < chunk_size - 1) {

      // we need to seek before every read as calls to load_chunk don't
      // have to be synchronized with writes
      file.synchronized {
        file.seek(position - chunk_size)

        read_pos += file.read(buffer.array, read_pos,
          chunk_size - read_pos - 1)
      }
    }

    while (read_pos >= BLOCK_SIZE) {
      read_pos -= BLOCK_SIZE
      buffer.position(read_pos)

      // FIXPAUL: load the next chunk into lst
      println(buffer.getShort)
    }

    position - chunk_size
  }

}
+3 −2
Original line number Diff line number Diff line
@@ -21,10 +21,11 @@ class WebSocket extends org.eclipse.jetty.websocket.WebSocket.OnTextMessage {
  def onClose(code: Int, message: String) =
    FnordMetric.log_debug("[WebSocket] connection closed")

  def onMessage(message: String) {
  def onMessage(message: String) = try {
    val ins = InstructionFactory.parse(message)
    endpoint.sendMessage(ins.execute);
  } catch {
    case e: Exception => FnordMetric.exception(e, false)
  }


}