Commit 68928823 authored by Bharath Ramsundar, committed by GitHub
Browse files

Merge pull request #557 from peastman/rl

Support Reinforcement Learning
parents 09c97cd2 9c825b30
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -16,3 +16,4 @@ import deepchem.trans
import deepchem.utils
import deepchem.dock
import deepchem.molnet
import deepchem.rl
+20 −34
Original line number Diff line number Diff line
@@ -11,10 +11,10 @@ class Layer(object):
  layer_number_dict = {}

  def __init__(self, in_layers=None, **kwargs):
    if "name" not in kwargs:
      self.name = "%s_%s" % (self.__class__.__name__, self._get_layer_number())
    else:
    if "name" in kwargs:
      self.name = kwargs['name']
    else:
      self.name = None
    if "tensorboard" not in kwargs:
      self.tensorboard = False
    else:
@@ -47,19 +47,6 @@ class Layer(object):
  def create_tensor(self, in_layers=None, set_tensors=True, **kwargs):
    raise NotImplementedError("Subclasses must implement for themselves")

  def __key(self):
    return self.name

  def __eq__(x, y):
    if x is None or y is None:
      return False
    if type(x) != type(y):
      return False
    return x.__key() == y.__key()

  def __hash__(self):
    return hash(self.__key())

  def shared(self, in_layers):
    """
    Share weights with different in tensors and a new out tensor
@@ -143,8 +130,6 @@ class Dense(Layer):
      biases_initializer=tf.zeros_initializer,
      weights_initializer=tf.contrib.layers.variance_scaling_initializer,
      time_series=False,
      scope_name=None,
      reuse=False,
      **kwargs):
    """Create a dense layer.

@@ -166,10 +151,6 @@ class Dense(Layer):
      the initializer for weight values
    time_series: bool
      if True, the dense layer is applied to each element of a batch in sequence
    scope_name: str
      an optional scope name for the layer's variables
    reuse: bool
      whether or not the layer and its variables should be reused
    """
    super(Dense, self).__init__(**kwargs)
    self.out_channels = out_channels
@@ -178,10 +159,8 @@ class Dense(Layer):
    self.biases_initializer = biases_initializer
    self.weights_initializer = weights_initializer
    self.time_series = time_series
    self.reuse = reuse
    if scope_name is None:
      scope_name = self.name
    self.scope_name = scope_name
    self._reuse = False
    self._shared_with = None

  def create_tensor(self, in_layers=None, set_tensors=True, **kwargs):
    if in_layers is None:
@@ -201,8 +180,8 @@ class Dense(Layer):
          activation_fn=self.activation_fn,
          biases_initializer=biases_initializer,
          weights_initializer=self.weights_initializer(),
          scope=self.scope_name,
          reuse=self.reuse,
          scope=self._get_scope_name(),
          reuse=self._reuse,
          trainable=True)
      return self.out_tensor
    dense_fn = lambda x: tf.contrib.layers.fully_connected(x,
@@ -210,8 +189,8 @@ class Dense(Layer):
                                                           activation_fn=self.activation_fn,
                                                           biases_initializer=biases_initializer,
                                                           weights_initializer=self.weights_initializer(),
                                                           scope=self.scope_name,
                                                           reuse=self.reuse,
                                                           scope=self._get_scope_name(),
                                                           reuse=self._reuse,
                                                           trainable=True)
    out_tensor = tf.map_fn(dense_fn, parent.out_tensor)
    if set_tensors:
@@ -221,16 +200,23 @@ class Dense(Layer):
    return out_tensor

  def shared(self, in_layers):
    self.reuse = True
    return Dense(
    copy = Dense(
        self.out_channels,
        self.activation_fn,
        self.biases_initializer,
        self.weights_initializer,
        time_series=self.time_series,
        reuse=self.reuse,
        scope_name=self.scope_name,
        in_layers=in_layers)
    self._reuse = True
    copy._reuse = True
    copy._shared_with = self
    return copy

  def _get_scope_name(self):
    if self._shared_with is None:
      return self.name
    else:
      return self._shared_with._get_scope_name()


class Flatten(Layer):
+12 −4
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ class TensorGraph(Model):
               random_seed=None,
               use_queue=True,
               mode="regression",
               graph=None,
               **kwargs):
    """
    TODO(LESWING) allow a model to change its learning rate
@@ -47,6 +48,9 @@ class TensorGraph(Model):
      "regression" or "classification".  "classification" models on
      predict will do an argmax(axis=2) to determine the class of the
      prediction.
    graph: tensorflow.Graph
      the Graph in which to create Tensorflow objects.  If None, a new Graph
      is created.
    kwargs
    """

@@ -68,7 +72,7 @@ class TensorGraph(Model):
    # See TensorGraph._get_tf() for more details on lazy construction
    self.tensor_objects = {
        "FileWriter": None,
        "Graph": tf.Graph(),
        "Graph": graph,
        "train_op": None,
        "summary_op": None,
    }
@@ -87,6 +91,8 @@ class TensorGraph(Model):
    self.model_class = None

  def _add_layer(self, layer):
    if layer.name is None:
      layer.name = "%s_%s" % (layer.__class__.__name__, len(self.layers) + 1)
    if layer.name in self.layers:
      return
    if isinstance(layer, Feature):
@@ -98,8 +104,8 @@ class TensorGraph(Model):
    self.nxgraph.add_node(layer.name)
    self.layers[layer.name] = layer
    for in_layer in layer.in_layers:
      self.nxgraph.add_edge(in_layer.name, layer.name)
      self._add_layer(in_layer)
      self.nxgraph.add_edge(in_layer.name, layer.name)

  def fit(self,
          dataset,
@@ -314,7 +320,6 @@ class TensorGraph(Model):
        tf.set_random_seed(self.random_seed)
      self._install_queue()
      order = self.topsort()
      print(order)
      for node in order:
        with tf.name_scope(node):
          node_layer = self.layers[node]
@@ -344,6 +349,7 @@ class TensorGraph(Model):
    shapes = []
    pre_q_inputs = []
    q = InputFifoQueue(shapes, names, in_layers=pre_q_inputs)
    q.name = "%s_%s" % (q.__class__.__name__, len(self.layers) + 1)

    for layer in self.features + self.labels + self.task_weights:
      pre_q_input = layer.create_pre_q(self.batch_size)
@@ -448,8 +454,10 @@ class TensorGraph(Model):
      self.tensor_objects['Graph'] = tf.Graph()
    elif obj == "FileWriter":
      self.tensor_objects['FileWriter'] = tf.summary.FileWriter(self.model_dir)
    elif obj == 'Optimizer':
      self.tensor_objects['Optimizer'] = self.optimizer()
    elif obj == 'train_op':
      self.tensor_objects['train_op'] = self.optimizer().minimize(
      self.tensor_objects['train_op'] = self._get_tf('Optimizer').minimize(
          self.loss.out_tensor)
    elif obj == 'summary_op':
      self.tensor_objects['summary_op'] = tf.summary.merge_all(
+137 −0
Original line number Diff line number Diff line
"""Interface for reinforcement learning."""

from deepchem.rl.a3c import A3C


class Environment(object):
  """An environment in which an actor performs actions to accomplish a task.

  An environment has a current state, which is represented as a list of NumPy
  arrays.  When an action is taken, that causes the state to be updated.  Exactly
  what is meant by an "action" is defined by each subclass.  As far as this interface
  is concerned, it is simply an arbitrary object.  The environment also computes
  a reward for each action, and reports when the task has been terminated
  (meaning that no more actions may be taken).

  Environment objects should be written to support pickle and deepcopy operations.
  Many algorithms involve creating multiple copies of the Environment, possibly
  running in different processes or even on different computers.
  """

  def __init__(self, state_shape, n_actions):
    """Subclasses should call the superclass constructor in addition to doing their own initialization.

    Parameters
    ----------
    state_shape: list of tuples
      the shape of each array that makes up a state (one tuple per array)
    n_actions: int
      the number of possible actions that can be performed
    """
    self._state_shape = state_shape
    self._n_actions = n_actions
    # Both stay None until a subclass populates them (normally in reset()).
    self._state = None
    self._terminated = None

  @property
  def state(self):
    """The current state of the environment, represented as a list of NumPy arrays.

    If reset() has not yet been called at least once, this is undefined.
    """
    return self._state

  @property
  def terminated(self):
    """Whether the task has reached its end.

    If reset() has not yet been called at least once, this is undefined.
    """
    return self._terminated

  @property
  def state_shape(self):
    """The shape of the arrays that describe a state.

    This returns a list of tuples, where each tuple is the shape of one array.
    """
    return self._state_shape

  @property
  def n_actions(self):
    """The number of possible actions that can be performed in this Environment."""
    return self._n_actions

  def reset(self):
    """Initialize the environment in preparation for doing calculations with it.

    This must be called before calling step() or querying the state.  You can call it
    again later to reset the environment back to its original state.

    Raises
    ------
    NotImplementedError
      always, in this base class; subclasses must override this method
    """
    # NotImplementedError is the exception class.  NotImplemented is a
    # non-callable constant, so "raise NotImplemented(...)" fails with TypeError.
    raise NotImplementedError("Subclasses must implement this")

  def step(self, action):
    """Take a time step by performing an action.

    This causes the "state" and "terminated" properties to be updated.

    Parameters
    ----------
    action: object
      an object describing the action to take

    Returns
    -------
    the reward earned by taking the action, represented as a floating point number
    (higher values are better)

    Raises
    ------
    NotImplementedError
      always, in this base class; subclasses must override this method
    """
    raise NotImplementedError("Subclasses must implement this")


class GymEnvironment(Environment):
  """This is a convenience class for working with environments from OpenAI Gym.

  The wrapped Gym environment is exposed as the "env" attribute, and the name
  it was created from as the "name" attribute.  The state is a list holding
  the single observation array returned by Gym.
  """

  def __init__(self, name):
    """Create an Environment wrapping the OpenAI Gym environment with a specified name.

    Parameters
    ----------
    name: str
      the name passed to gym.make() to create the wrapped environment
    """
    # Imported here so that gym is only required when this class is used.
    import gym
    self.env = gym.make(name)
    self.name = name
    # Use the explicit two-argument form of super(), which works on both
    # Python 2 and Python 3 and matches the other classes in this package.
    super(GymEnvironment, self).__init__([self.env.observation_space.shape],
                                         self.env.action_space.n)

  def reset(self):
    """Reset the wrapped Gym environment and record its initial observation."""
    state = self.env.reset()
    self._state = [state]
    self._terminated = False

  def step(self, action):
    """Perform an action in the wrapped Gym environment.

    Parameters
    ----------
    action: object
      the action, passed unchanged to the Gym environment's step() method

    Returns
    -------
    the reward reported by the Gym environment for this step
    """
    # The fourth value returned by Gym (diagnostic info) is not used.
    state, reward, self._terminated, info = self.env.step(action)
    self._state = [state]
    return reward

  def __deepcopy__(self, memo):
    """Return a new GymEnvironment created from the same name.

    The wrapped Gym object is not copied.  A fresh environment is built
    instead, so the copy has not been reset and does not share the current
    state of the original.
    """
    return GymEnvironment(self.name)


class Policy(object):
  """A policy for taking actions within an environment.

  A policy is defined by a set of TensorGraph Layer objects that perform the
  necessary calculations.  There are many algorithms for reinforcement learning,
  and they differ in what values they require a policy to compute.  That makes
  it impossible to define a single interface allowing any policy to be optimized
  with any algorithm.  Instead, this interface just tries to be as flexible and
  generic as possible.  Each algorithm must document what values it expects
  create_layers() to return.

  Policy objects should be written to support pickling.  Many algorithms involve
  creating multiple copies of the Policy, possibly running in different processes
  or even on different computers.
  """

  def create_layers(self, state, **kwargs):
    """Create the TensorGraph Layers that define the policy.

    The arguments always include a list of Feature layers representing the current
    state of the environment (one layer for each array in the state).  Depending on
    the algorithm being used, other arguments might get passed as well.  It is up
    to each algorithm to document that.

    This method should construct and return a dict that maps strings to Layer
    objects.  Each algorithm must document what Layers it expects the policy to
    create.  If this method is called multiple times, it should create a new set
    of Layers every time.

    Raises
    ------
    NotImplementedError
      always, in this base class; subclasses must override this method
    """
    # NotImplementedError is the exception class.  NotImplemented is a
    # non-callable constant, so "raise NotImplemented(...)" fails with TypeError.
    raise NotImplementedError("Subclasses must implement this")

deepchem/rl/a3c.py

0 → 100644
+284 −0
Original line number Diff line number Diff line
"""Asynchronous Advantage Actor-Critic (A3C) algorithm for reinforcement learning."""

from deepchem.models import TensorGraph
from deepchem.models.tensorgraph import TFWrapper
from deepchem.models.tensorgraph.layers import Feature, Weights, Label, Layer
import numpy as np
import tensorflow as tf
import copy
import multiprocessing
import os
import re
import threading


class A3CLoss(Layer):
  """This layer computes the loss function for A3C.

  The loss is policy_loss + value_weight * value_loss - entropy_weight * entropy.
  The layer expects exactly four input layers, in this order: discounted
  rewards, one-hot actions, action probabilities, and value estimates.
  """

  def __init__(self, value_weight, entropy_weight, **kwargs):
    """Create the loss layer.

    Parameters
    ----------
    value_weight: float
      a scale factor for the value loss term
    entropy_weight: float
      a scale factor for the entropy term
    kwargs
      passed through to the Layer constructor (e.g. in_layers)
    """
    super(A3CLoss, self).__init__(**kwargs)
    self.value_weight = value_weight
    self.entropy_weight = entropy_weight

  def create_tensor(self, **kwargs):
    """Build the loss tensor, store it in self.out_tensor, and return it."""
    reward, action, prob, value = [layer.out_tensor for layer in self.in_layers]
    log_prob = tf.log(prob)
    # Log-probability of the action actually taken at each step.  The sum must
    # run over the action axis only.  Summing over every axis would collapse
    # the whole rollout into one scalar, so each step's advantage would be
    # multiplied by the total log-probability of all steps.
    # Reshaped to (batch, 1) to line up with (reward - value); this assumes
    # reward and value are both (batch, 1), as the rewards layer built in
    # A3C._build_graph is -- TODO confirm for the policy's 'value' output.
    action_log_prob = tf.reshape(
        tf.reduce_sum(action * log_prob, axis=1), (-1, 1))
    # NOTE(review): gradients of the policy term also flow into `value`
    # through (reward - value); confirm whether that is intended.
    policy_loss = -tf.reduce_sum((reward - value) * action_log_prob)
    value_loss = tf.reduce_sum(tf.square(reward - value))
    entropy = -tf.reduce_sum(prob * log_prob)
    self.out_tensor = policy_loss + self.value_weight * value_loss - self.entropy_weight * entropy
    return self.out_tensor


def _create_feed_dict(features, state):
  """Map each feature layer's out_tensor to its state array as a batch of one.

  Features and state arrays are paired up in order; every array gets a new
  leading axis of length 1.
  """
  return {
      feature.out_tensor: np.expand_dims(array, axis=0)
      for feature, array in zip(features, state)
  }


class A3C(object):
  """
  Implements the Asynchronous Advantage Actor-Critic (A3C) algorithm for reinforcement learning.

  This algorithm requires the policy to output two quantities: a vector giving the probability of
  taking each action, and an estimate of the value function for the current state.  It optimizes
  both outputs at once using a loss that is the sum of three terms:

  1. The policy loss, which seeks to maximize the discounted reward for each action.
  2. The value loss, which tries to make the value estimate match the actual discounted reward
     that was attained at each step.
  3. An entropy term to encourage exploration.

  This class only supports environments with discrete action spaces, not continuous ones.  The
  "action" argument passed to the environment is an integer, giving the index of the action to perform.
  """

  def __init__(self,
               env,
               policy,
               max_rollout_length=20,
               discount_factor=0.99,
               value_weight=1.0,
               entropy_weight=0.01,
               model_dir=None):
    """Create an object for optimizing a policy.

    Parameters
    ----------
    env: Environment
      the Environment to interact with
    policy: Policy
      the Policy to optimize.  Its create_layers() method must return a map containing the
      keys 'action_prob' and 'value', corresponding to the action probabilities and value estimate
    max_rollout_length: int
      the maximum length of rollouts to generate
    discount_factor: float
      the discount factor to use when computing rewards
    value_weight: float
      a scale factor for the value loss term in the loss function
    entropy_weight: float
      a scale factor for the entropy term in the loss function
    model_dir: str
      the directory in which the model will be saved.  If None, a temporary directory will be created.
    """
    self._env = env
    self._policy = policy
    self.max_rollout_length = max_rollout_length
    self.discount_factor = discount_factor
    self.value_weight = value_weight
    self.entropy_weight = entropy_weight
    self.optimizer = TFWrapper(
        tf.train.AdamOptimizer, learning_rate=0.001, beta1=0.9, beta2=0.999)
    # Build the global copy of the model.  Passing None as the graph lets
    # TensorGraph create a new one; the workers later build into that graph.
    # The rewards and actions inputs of the global copy are not kept.
    (self._graph, self._features, _rewards, _actions, self._action_prob,
     self._value) = self._build_graph(None, 'global', model_dir)
    with self._graph._get_tf("Graph").as_default():
      self._session = tf.Session()

  def _build_graph(self, tf_graph, scope, model_dir):
    """Construct a TensorGraph containing the policy and loss calculations.

    Parameters
    ----------
    tf_graph: tensorflow.Graph
      the Graph in which to create the Tensorflow objects, or None to have
      the TensorGraph use a new one
    scope: str
      the variable scope inside which the model is built
    model_dir: str
      the model directory passed to the TensorGraph

    Returns
    -------
    a tuple (graph, features, rewards, actions, action_prob, value) holding
    the TensorGraph, the list of Feature layers for the state, the rewards and
    actions input layers, and the policy's two output layers
    """
    features = [Feature(shape=[None] + list(s)) for s in self._env.state_shape]
    policy_layers = self._policy.create_layers(features)
    action_prob = policy_layers['action_prob']
    value = policy_layers['value']
    rewards = Weights(shape=(None, 1))
    actions = Label(shape=(None, self._env.n_actions))
    loss = A3CLoss(
        self.value_weight,
        self.entropy_weight,
        in_layers=[rewards, actions, action_prob, value])
    graph = TensorGraph(
        batch_size=self.max_rollout_length,
        use_queue=False,
        graph=tf_graph,
        model_dir=model_dir)
    graph.add_output(action_prob)
    graph.add_output(value)
    graph.set_loss(loss)
    graph.set_optimizer(self.optimizer)
    with graph._get_tf("Graph").as_default():
      with tf.variable_scope(scope):
        graph.build()
    return graph, features, rewards, actions, action_prob, value

  def fit(self, total_steps, max_checkpoints_to_keep=5,
          checkpoint_interval=600):
    """Train the policy.

    One worker thread is started per CPU reported by
    multiprocessing.cpu_count().  The calling thread blocks until all workers
    have finished, saving a checkpoint each time it wakes up.

    Parameters
    ----------
    total_steps: int
      the total number of time steps to perform on the environment, across all rollouts
      on all threads
    max_checkpoints_to_keep: int
      the maximum number of checkpoint files to keep.  When this number is reached, older
      files are deleted.
    checkpoint_interval: float
      the time interval at which to save checkpoints, measured in seconds
    """
    with self._graph._get_tf("Graph").as_default():
      # Request the global train op before running the initializer.  The
      # result is not used directly; presumably this ensures any variables
      # the optimizer creates exist when variables are initialized.
      self._graph._get_tf('train_op')
      self._session.run(tf.global_variables_initializer())
      # A one-element list acts as a mutable step counter shared by all workers.
      step_count = [0]
      workers = [
          _Worker(self, i) for i in range(multiprocessing.cpu_count())
      ]
      threads = []
      for worker in workers:
        # Bind the worker through target/args instead of a lambda.  A lambda
        # looks up `worker` only when the thread runs, by which time the loop
        # may have moved on, so several threads could drive the same worker.
        thread = threading.Thread(
            name=worker.scope,
            target=worker.run,
            args=(step_count, total_steps))
        threads.append(thread)
        thread.start()
      saver = tf.train.Saver(max_to_keep=max_checkpoints_to_keep)
      checkpoint_index = 0
      while True:
        # is_alive() replaces isAlive(), which was removed in Python 3.9.
        threads = [t for t in threads if t.is_alive()]
        if len(threads) > 0:
          # Wait for a worker to finish, but for no longer than the
          # checkpoint interval.
          threads[0].join(checkpoint_interval)
        checkpoint_index += 1
        saver.save(
            self._session, self._graph.save_file, global_step=checkpoint_index)
        if len(threads) == 0:
          break

  def predict(self, state):
    """Compute the policy's output predictions for a state.

    Parameters
    ----------
    state: array
      the state of the environment for which to generate predictions

    Returns
    -------
    the array of action probabilities, and the estimated value function
    """
    with self._graph._get_tf("Graph").as_default():
      feed_dict = _create_feed_dict(self._features, state)
      return self._session.run(
          [self._action_prob.out_tensor, self._value.out_tensor],
          feed_dict=feed_dict)

  def select_action(self, state, deterministic=False):
    """Select an action to perform based on the environment's state.

    Parameters
    ----------
    state: array
      the state of the environment for which to select an action
    deterministic: bool
      if True, always return the best action (that is, the one with highest probability).
      If False, randomly select an action based on the computed probabilities.

    Returns
    -------
    the index of the selected action
    """
    with self._graph._get_tf("Graph").as_default():
      feed_dict = _create_feed_dict(self._features, state)
      probabilities = self._session.run(
          self._action_prob.out_tensor, feed_dict=feed_dict)
      if deterministic:
        # The state is fed as a batch of one, so the argmax over the whole
        # array is the index of the most probable action.
        return probabilities.argmax()
      else:
        return np.random.choice(
            np.arange(self._env.n_actions), p=probabilities[0])

  def restore(self):
    """Reload the model parameters from the most recent checkpoint file.

    Raises
    ------
    ValueError
      if no checkpoint is found in the model directory
    """
    last_checkpoint = tf.train.latest_checkpoint(self._graph.model_dir)
    if last_checkpoint is None:
      raise ValueError('No checkpoint found')
    with self._graph._get_tf("Graph").as_default():
      saver = tf.train.Saver()
      saver.restore(self._session, last_checkpoint)


class _Worker(object):
  """A Worker object is created for each training thread.

  Each worker owns a deep copy of the environment and its own copy of the
  model, built in the same Tensorflow graph as the global model but under its
  own variable scope.  Gradients are computed with respect to the worker's
  local variables and applied to the global variables.
  """

  def __init__(self, a3c, index):
    """Create a worker.

    Parameters
    ----------
    a3c: A3C
      the A3C object this worker trains for
    index: int
      the index of this worker, used to form its scope name
    """
    self.a3c = a3c
    self.index = index
    self.scope = 'worker%d' % index
    self.env = copy.deepcopy(a3c._env)
    self.env.reset()
    self.graph, self.features, self.rewards, self.actions, self.action_prob, self.value = a3c._build_graph(
        a3c._graph._get_tf('Graph'), self.scope, None)
    with a3c._graph._get_tf("Graph").as_default():
      local_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
                                     self.scope)
      global_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
                                      'global')
      # Differentiate the local loss with respect to the local variables, then
      # pair each gradient with the global variable at the same position.
      # This relies on both collections listing variables in the same order.
      gradients = tf.gradients(self.graph.loss.out_tensor, local_vars)
      grads_and_vars = list(zip(gradients, global_vars))
      self.train_op = a3c._graph._get_tf('Optimizer').apply_gradients(
          grads_and_vars)
      # Op that overwrites every local variable with its global counterpart.
      self.update_local_variables = tf.group(
          * [tf.assign(v1, v2) for v1, v2 in zip(local_vars, global_vars)])

  def run(self, step_count, total_steps):
    """Generate rollouts and apply updates until enough steps have been taken.

    Parameters
    ----------
    step_count: list
      a one-element list holding the number of steps performed so far.  It is
      shared between workers and updated in place.
    total_steps: int
      the number of steps at which to stop
    """
    with self.graph._get_tf("Graph").as_default():
      session = self.a3c._session
      while step_count[0] < total_steps:
        # Start each rollout from the current global parameters.
        session.run(self.update_local_variables)
        episode_states, episode_actions, episode_rewards = self.create_rollout()
        feed_dict = {}
        for f, s in zip(self.features, episode_states):
          feed_dict[f.out_tensor] = s
        feed_dict[self.rewards.out_tensor] = episode_rewards
        feed_dict[self.actions.out_tensor] = episode_actions
        session.run(self.train_op, feed_dict=feed_dict)
        step_count[0] += len(episode_actions)

  def create_rollout(self):
    """Generate a rollout.

    Actions are sampled from the policy's probabilities until either
    max_rollout_length steps have been taken or the environment terminates.
    If the environment terminated, it is reset before returning.

    Returns
    -------
    a tuple (states, actions, rewards).  states is a list with one array per
    feature layer, each stacking that feature's state array over the steps.
    actions is an array of one-hot rows, one per step.  rewards is an array of
    shape (number of steps, 1) holding the discounted rewards.
    """
    n_actions = self.env.n_actions
    session = self.a3c._session
    states = [[] for _ in self.features]
    actions = []
    rewards = []
    for i in range(self.a3c.max_rollout_length):
      if self.env.terminated:
        break
      state = self.env.state
      for history, array in zip(states, state):
        history.append(array)
      feed_dict = _create_feed_dict(self.features, state)
      probabilities = session.run(
          self.action_prob.out_tensor, feed_dict=feed_dict)
      action = np.random.choice(np.arange(n_actions), p=probabilities[0])
      # Record the action as a one-hot vector.
      one_hot = np.zeros(n_actions)
      one_hot[action] = 1.0
      actions.append(one_hot)
      rewards.append(self.env.step(action))
    if not self.env.terminated:
      # Add an estimate of the reward for the rest of the episode.
      feed_dict = _create_feed_dict(self.features, self.env.state)
      rewards[-1] += self.a3c.discount_factor * session.run(
          self.value.out_tensor, feed_dict)
    # Accumulate discounted rewards from the last step back to the first.
    for j in range(len(rewards) - 1, 0, -1):
      rewards[j - 1] += self.a3c.discount_factor * rewards[j]
    if self.env.terminated:
      self.env.reset()
    # Keep one array per feature rather than stacking them all into a single
    # array: state arrays may have different shapes, which cannot be combined
    # into one regular array.
    state_arrays = [np.array(history) for history in states]
    return state_arrays, np.array(actions), np.array(rewards).reshape(
        (len(rewards), 1))
Loading