Unverified Commit cf32351f authored by Bharath Ramsundar's avatar Bharath Ramsundar Committed by GitHub
Browse files

Merge pull request #952 from peastman/queue

Use queue for prediction
parents 766139be 86328d56
Loading
Loading
Loading
Loading
+3 −4
Original line number Diff line number Diff line
@@ -920,8 +920,8 @@ class Input(Layer):
      self.out_tensor = out_tensor
    return out_tensor

  def create_pre_q(self, batch_size):
    q_shape = (batch_size,) + self._shape[1:]
  def create_pre_q(self):
    q_shape = (None,) + self._shape[1:]
    return Input(shape=q_shape, name="%s_pre_q" % self.name, dtype=self.dtype)

  def get_pre_q_name(self):
@@ -2014,8 +2014,7 @@ class InputFifoQueue(Layer):
      in_layers = self.in_layers
    in_layers = convert_to_layers(in_layers)
    self.dtypes = [x.out_tensor.dtype for x in in_layers]
    self.queue = tf.FIFOQueue(
        self.capacity, self.dtypes, shapes=self.shapes, names=self.names)
    self.queue = tf.FIFOQueue(self.capacity, self.dtypes, names=self.names)
    feed_dict = {x.name: x.out_tensor for x in in_layers}
    self.out_tensor = self.queue.enqueue(feed_dict)
    self.close_op = self.queue.close()
+54 −22
Original line number Diff line number Diff line
@@ -178,16 +178,6 @@ class TensorGraph(Model):
    -------
    the average loss over the most recent checkpoint interval
    """

    def create_feed_dict():
      if self.use_queue:
        while True:
          yield {self._training_placeholder: 1.0}
      for d in feed_dict_generator:
        feed_dict = dict(d)
        feed_dict[self._training_placeholder] = 1.0
        yield feed_dict

    if not self.built:
      self.build()
    with self._get_tf("Graph").as_default():
@@ -207,14 +197,14 @@ class TensorGraph(Model):
      n_samples = 0
      n_enqueued = [0]
      final_sample = [None]
      if self.use_queue:
      if self.queue_installed:
        enqueue_thread = threading.Thread(
            target=_enqueue_batch,
            args=(self, feed_dict_generator, self._get_tf("Graph"),
                  self.session, n_enqueued, final_sample))
        enqueue_thread.start()
      for feed_dict in create_feed_dict():
        if self.use_queue:
      for feed_dict in self._create_feed_dicts(feed_dict_generator, True):
        if self.queue_installed:
          # Don't let this thread get ahead of the enqueue thread, since if
          # we try to read more batches than the total number that get queued,
          # this thread will hang indefinitely.
@@ -327,12 +317,27 @@ class TensorGraph(Model):
    with self._get_tf("Graph").as_default():
      # Gather results for each output
      results = [[] for out in outputs]
      for feed_dict in generator:
        feed_dict = {
            self.layers[k.name].out_tensor: v
            for k, v in six.iteritems(feed_dict)
        }
        feed_dict[self._training_placeholder] = 0.0
      n_samples = 0
      n_enqueued = [0]
      final_sample = [None]
      if self.queue_installed:
        enqueue_thread = threading.Thread(
            target=_enqueue_batch,
            args=(self, generator, self._get_tf("Graph"), self.session,
                  n_enqueued, final_sample))
        enqueue_thread.start()
      for feed_dict in self._create_feed_dicts(generator, False):
        if self.queue_installed:
          # Don't let this thread get ahead of the enqueue thread, since if
          # we try to read more batches than the total number that get queued,
          # this thread will hang indefinitely.
          while n_enqueued[0] <= n_samples:
            if n_samples == final_sample[0]:
              break
            time.sleep(0)
          if n_samples == final_sample[0]:
            break
        n_samples += 1
        feed_results = self.session.run(outputs, feed_dict=feed_dict)
        if len(feed_results) > 1:
          if len(transformers):
@@ -519,14 +524,17 @@ class TensorGraph(Model):
      for layer in self.features + self.labels + self.task_weights:
        layer.pre_queue = True
      return
    inputs = self.features + self.labels + self.task_weights
    if len(inputs) == 0:
      return
    names = []
    shapes = []
    pre_q_inputs = []
    q = InputFifoQueue(shapes, names, in_layers=pre_q_inputs)
    q.name = "%s_%s" % (q.__class__.__name__, len(self.layers) + 1)

    for layer in self.features + self.labels + self.task_weights:
      pre_q_input = layer.create_pre_q(self.batch_size)
    for layer in inputs:
      pre_q_input = layer.create_pre_q()
      shapes.append(pre_q_input.shape)
      names.append(pre_q_input.name)
      pre_q_inputs.append(pre_q_input)
@@ -792,6 +800,25 @@ class TensorGraph(Model):
  def __del__(self):
    pass

  def _create_feed_dicts(self, generator, training):
    """Create feed dicts for use in fitting or prediction.

    Parameters
    ----------
    generator: Generator
      the feed dict generator that was passed to fit_generator() or predict_on_generator()
    training: bool
      True during training, False during prediction
    """
    train_value = 1.0 if training else 0.0
    if self.queue_installed:
      while True:
        yield {self._training_placeholder: train_value}
    for d in generator:
      feed_dict = dict(d)
      feed_dict[self._training_placeholder] = train_value
      yield feed_dict


def _enqueue_batch(tg, generator, graph, sess, n_enqueued, final_sample):
  """
@@ -813,7 +840,12 @@ def _enqueue_batch(tg, generator, graph, sess, n_enqueued, final_sample):
      enq = {}
      enq[tg._training_placeholder] = 1.0
      for layer in tg.features + tg.labels + tg.task_weights:
        enq[tg.get_pre_q_input(layer).out_tensor] = feed_dict[layer]
        if layer in feed_dict:
          value = feed_dict[layer]
        else:
          value = np.zeros(
              [0] + list(layer.shape[1:]), dtype=layer.dtype.as_numpy_dtype)
        enq[tg.get_pre_q_input(layer).out_tensor] = value
      sess.run(tg.input_queue.out_tensor, feed_dict=enq)
      n_enqueued[0] += 1
    final_sample[0] = n_enqueued[0]
+1 −1
Original line number Diff line number Diff line
@@ -388,7 +388,7 @@ class TestTensorGraph(unittest.TestCase):

  def test_submodels(self):
    """Test optimizing submodels."""
    tg = dc.models.TensorGraph(learning_rate=0.1, batch_size=1)
    tg = dc.models.TensorGraph(learning_rate=0.1, batch_size=1, use_queue=False)
    features = Feature(shape=(None, 1))
    var1 = Variable([2.0])
    var2 = Variable([2.0])