Unverified Commit e20c1451 authored by Bharath Ramsundar's avatar Bharath Ramsundar Committed by GitHub
Browse files

Merge pull request #1022 from peastman/continuous

A3C supports continuous action spaces
parents 8f329dff f1a46112
Loading
Loading
Loading
Loading
+55 −9
Original line number Diff line number Diff line
@@ -10,20 +10,47 @@ class Environment(object):

  An environment has a current state, which is represented as either a single NumPy
  array, or optionally a list of NumPy arrays.  When an action is taken, that causes
  the state to be updated.  Exactly what is meant by an "action" is defined by each
  subclass.  As far as this interface is concerned, it is simply an arbitrary object.
  The environment also computes a reward for each action, and reports when the task
  has been terminated (meaning that no more actions may be taken).
  the state to be updated.  The environment also computes a reward for each action,
  and reports when the task has been terminated (meaning that no more actions may
  be taken).

  Two types of actions are supported.  For environments with discrete action spaces,
  the action is an integer specifying the index of the action to perform (out of a
  fixed list of possible actions).  For environments with continuous action spaces,
  the action is a NumPy array.

  Environment objects should be written to support pickle and deepcopy operations.
  Many algorithms involve creating multiple copies of the Environment, possibly
  running in different processes or even on different computers.
  """

  def __init__(self, state_shape, n_actions, state_dtype=None):
    """Subclasses should call the superclass constructor in addition to doing their own initialization."""
  def __init__(self,
               state_shape,
               n_actions=None,
               state_dtype=None,
               action_shape=None):
    """Subclasses should call the superclass constructor in addition to doing their own initialization.

    A value should be provided for either n_actions (for discrete action spaces)
    or action_shape (for continuous action spaces), but not both.

    Parameters
    ----------
    state_shape: tuple or list of tuples
      the shape(s) of the array(s) making up the state
    n_actions: int
      the number of discrete actions that can be performed.  If the action space
      is continuous, this should be None.
    state_dtype: dtype or list of dtypes
      the type(s) of the array(s) making up the state.  If this is None, all
      arrays are assumed to be float32.
    action_shape: tuple
      the shape of the array describing an action.  If the action space
      is discrete, this should be None.
    """
    self._state_shape = state_shape
    self._n_actions = n_actions
    self._action_shape = action_shape
    self._state = None
    self._terminated = None
    if state_dtype is None:
@@ -74,9 +101,20 @@ class Environment(object):

  @property
  def n_actions(self):
    """The number of possible actions that can be performed in this Environment."""
    """The number of possible actions that can be performed in this Environment.

    If the environment uses a continuous action space, this returns None.
    """
    return self._n_actions

  @property
  def action_shape(self):
    """The expected shape of NumPy arrays representing actions.

    If the environment uses a discrete action space, this returns None.
    """
    return self._action_shape

  def reset(self):
    """Initialize the environment in preparation for doing calculations with it.

@@ -111,8 +149,13 @@ class GymEnvironment(Environment):
    import gym
    self.env = gym.make(name)
    self.name = name
    space = self.env.action_space
    if 'n' in dir(space):
      super(GymEnvironment, self).__init__(self.env.observation_space.shape,
                                         self.env.action_space.n)
                                           space.n)
    else:
      super(GymEnvironment, self).__init__(
          self.env.observation_space.shape, action_shape=space.shape)

  def reset(self):
    self._state = self.env.reset()
@@ -122,6 +165,9 @@ class GymEnvironment(Environment):
    self._state, reward, self._terminated, info = self.env.step(action)
    return reward

  def __deepcopy__(self, memo):
    """Support copy.deepcopy() by constructing a new GymEnvironment.

    The copy is built from self.name alone.  The memo dictionary supplied by
    the copy module is not used.
    """
    duplicate = GymEnvironment(self.name)
    return duplicate


class Policy(object):
  """A policy for taking actions within an environment.
+145 −69
Original line number Diff line number Diff line
@@ -13,11 +13,11 @@ import re
import threading


class A3CLoss(Layer):
  """This layer computes the loss function for A3C."""
class A3CLossDiscrete(Layer):
  """This layer computes the loss function for A3C with discrete action spaces."""

  def __init__(self, value_weight, entropy_weight, **kwargs):
    super(A3CLoss, self).__init__(**kwargs)
    super(A3CLossDiscrete, self).__init__(**kwargs)
    self.value_weight = value_weight
    self.entropy_weight = entropy_weight

@@ -35,23 +35,50 @@ class A3CLoss(Layer):
    return self.out_tensor


class A3CLossContinuous(Layer):
  """Loss layer used by A3C when the action space is continuous.

  The input layers must be given in this order: reward, action, mean, std,
  value, advantage.  Actions are modelled with a normal distribution defined
  by mean and std.
  """

  def __init__(self, value_weight, entropy_weight, **kwargs):
    """Store the coefficients applied to the value loss and the entropy term."""
    super(A3CLossContinuous, self).__init__(**kwargs)
    self.entropy_weight = entropy_weight
    self.value_weight = value_weight

  def create_tensor(self, **kwargs):
    """Build the loss tensor, store it in self.out_tensor, and return it.

    The loss is the policy loss, plus value_weight times the value loss,
    minus entropy_weight times the mean entropy of the action distribution.
    """
    inputs = [layer.out_tensor for layer in self.in_layers]
    reward, action, mean, std, value, advantage = inputs
    policy = tf.distributions.Normal(mean, std)
    # Sum the per-element log probabilities over every axis after the first,
    # giving one log probability per action.
    non_batch_axes = list(range(1, len(action.shape)))
    action_log_prob = tf.reduce_sum(policy.log_prob(action), non_batch_axes)
    policy_term = -tf.reduce_mean(advantage * action_log_prob)
    value_term = tf.reduce_mean(tf.square(reward - value))
    entropy_term = tf.reduce_mean(policy.entropy())
    self.out_tensor = (policy_term + self.value_weight * value_term -
                       self.entropy_weight * entropy_term)
    return self.out_tensor


class A3C(object):
  """
  Implements the Asynchronous Advantage Actor-Critic (A3C) algorithm for reinforcement learning.

  The algorithm is described in Mnih et al, "Asynchronous Methods for Deep Reinforcement Learning"
  (https://arxiv.org/abs/1602.01783).  This class requires the policy to output two quantities:
  a vector giving the probability of taking each action, and an estimate of the value function for
  the current state.  It optimizes both outputs at once using a loss that is the sum of three terms:
  (https://arxiv.org/abs/1602.01783).  This class supports environments with both discrete and
  continuous action spaces.  For discrete action spaces, the "action" argument passed to the
  environment is an integer giving the index of the action to perform.  The policy must output
  a vector called "action_prob" giving the probability of taking each action.  For continuous
  action spaces, the action is an array where each element is chosen independently from a
  normal distribution.  The policy must output two arrays of the same shape: "action_mean"
  gives the mean value for each element, and "action_std" gives the standard deviation for
  each element.  In either case, the policy must also output a scalar called "value" which
  is an estimate of the value function for the current state.

  The algorithm optimizes all outputs at once using a loss that is the sum of three terms:

  1. The policy loss, which seeks to maximize the discounted reward for each action.
  2. The value loss, which tries to make the value estimate match the actual discounted reward
     that was attained at each step.
  3. An entropy term to encourage exploration.

  This class only supports environments with discrete action spaces, not continuous ones.  The
  "action" argument passed to the environment is an integer, giving the index of the action to perform.

  This class supports Generalized Advantage Estimation as described in Schulman et al., "High-Dimensional
  Continuous Control Using Generalized Advantage Estimation" (https://arxiv.org/abs/1506.02438).
  This is a method of trading off bias and variance in the advantage estimate, which can sometimes
@@ -97,7 +124,8 @@ class A3C(object):
      the Environment to interact with
    policy: Policy
      the Policy to optimize.  Its create_layers() method must return a dict containing the
      keys 'action_prob' and 'value', corresponding to the action probabilities and value estimate
      keys 'action_prob' and 'value' (for discrete action spaces) or 'action_mean', 'action_std',
      and 'value' (for continuous action spaces)
    max_rollout_length: int
      the maximum length of rollouts to generate
    discount_factor: float
@@ -128,9 +156,14 @@ class A3C(object):
      self._optimizer = Adam(learning_rate=0.001, beta1=0.9, beta2=0.999)
    else:
      self._optimizer = optimizer
    fields = self._build_graph(None, 'global', model_dir)
    if self.continuous:
      (self._graph, self._features, self._rewards, self._actions,
       self._action_mean, self._action_std, self._value,
       self._advantages) = fields
    else:
      (self._graph, self._features, self._rewards, self._actions,
     self._action_prob, self._value, self._advantages) = self._build_graph(
         None, 'global', model_dir)
       self._action_prob, self._value, self._advantages) = fields
    with self._graph._get_tf("Graph").as_default():
      self._session = tf.Session()
    self._rnn_states = self._graph.rnn_zero_states
@@ -146,15 +179,9 @@ class A3C(object):
    for s, d in zip(state_shape, state_dtype):
      features.append(Feature(shape=[None] + list(s), dtype=tf.as_dtype(d)))
    policy_layers = self._policy.create_layers(features)
    action_prob = policy_layers['action_prob']
    value = policy_layers['value']
    rewards = Weights(shape=(None,))
    advantages = Weights(shape=(None,))
    actions = Label(shape=(None, self._env.n_actions))
    loss = A3CLoss(
        self.value_weight,
        self.entropy_weight,
        in_layers=[rewards, actions, action_prob, value, advantages])
    graph = TensorGraph(
        batch_size=self.max_rollout_length,
        use_queue=False,
@@ -162,13 +189,37 @@ class A3C(object):
        model_dir=model_dir)
    for f in features:
      graph._add_layer(f)
    if 'action_prob' in policy_layers:
      self.continuous = False
      action_prob = policy_layers['action_prob']
      actions = Label(shape=(None, self._env.n_actions))
      loss = A3CLossDiscrete(
          self.value_weight,
          self.entropy_weight,
          in_layers=[rewards, actions, action_prob, value, advantages])
      graph.add_output(action_prob)
    else:
      self.continuous = True
      action_mean = policy_layers['action_mean']
      action_std = policy_layers['action_std']
      actions = Label(shape=[None] + list(self._env.action_shape))
      loss = A3CLossContinuous(
          self.value_weight,
          self.entropy_weight,
          in_layers=[
              rewards, actions, action_mean, action_std, value, advantages
          ])
      graph.add_output(action_mean)
      graph.add_output(action_std)
    graph.add_output(value)
    graph.set_loss(loss)
    graph.set_optimizer(self._optimizer)
    with graph._get_tf("Graph").as_default():
      with tf.variable_scope(scope):
        graph.build()
    if self.continuous:
      return graph, features, rewards, actions, action_mean, action_std, value, advantages
    else:
      return graph, features, rewards, actions, action_prob, value, advantages

  def fit(self,
@@ -246,17 +297,11 @@ class A3C(object):
    -------
    the array of action probabilities, and the estimated value function
    """
    if not self._state_is_list:
      state = [state]
    with self._graph._get_tf("Graph").as_default():
      feed_dict = self._create_feed_dict(state, use_saved_states)
      tensors = [self._action_prob.out_tensor, self._value.out_tensor]
      if save_states:
        tensors += self._graph.rnn_final_states
      results = self._session.run(tensors, feed_dict=feed_dict)
      if save_states:
        self._rnn_states = results[2:]
      return results[:2]
    if self.continuous:
      outputs = [self._action_mean, self._action_std, self._value]
    else:
      outputs = [self._action_prob, self._value]
    return self._predict_outputs(outputs, state, use_saved_states, save_states)

  def select_action(self,
                    state,
@@ -290,22 +335,13 @@ class A3C(object):
    -------
    the index of the selected action
    """
    if not self._state_is_list:
      state = [state]
    with self._graph._get_tf("Graph").as_default():
      feed_dict = self._create_feed_dict(state, use_saved_states)
      tensors = [self._action_prob.out_tensor]
      if save_states:
        tensors += self._graph.rnn_final_states
      results = self._session.run(tensors, feed_dict=feed_dict)
      probabilities = results[0]
      if save_states:
        self._rnn_states = results[1:]
      if deterministic:
        return probabilities.argmax()
    if self.continuous:
      tensors = [self._action_mean, self._action_std]
    else:
        return np.random.choice(
            np.arange(self._env.n_actions), p=probabilities[0])
      tensors = [self._action_prob]
    outputs = self._predict_outputs(tensors, state, use_saved_states,
                                    save_states)
    return self._select_action_from_outputs(outputs, deterministic)

  def restore(self):
    """Reload the model parameters from the most recent checkpoint file."""
@@ -330,6 +366,37 @@ class A3C(object):
      feed_dict[placeholder] = value
    return feed_dict

  def _predict_outputs(self, outputs, state, use_saved_states, save_states):
    """Compute a set of outputs for a state. """
    if not self._state_is_list:
      state = [state]
    with self._graph._get_tf("Graph").as_default():
      feed_dict = self._create_feed_dict(state, use_saved_states)
      if save_states:
        tensors = outputs + self._graph.rnn_final_states
      else:
        tensors = outputs
      results = self._session.run(tensors, feed_dict=feed_dict)
      if save_states:
        self._rnn_states = results[len(outputs):]
      return results[:len(outputs)]

  def _select_action_from_outputs(self, outputs, deterministic):
    """Given the policy outputs, select an action to perform."""
    if self.continuous:
      action_mean, action_std = outputs
      if deterministic:
        return action_mean[0]
      else:
        return np.random.normal(action_mean[0], action_std[0])
    else:
      action_prob = outputs[0]
      if deterministic:
        return action_prob.argmax()
      else:
        action_prob = action_prob.flatten()
        return np.random.choice(np.arange(len(action_prob)), p=action_prob)


class _Worker(object):
  """A Worker object is created for each training thread."""
@@ -340,8 +407,11 @@ class _Worker(object):
    self.scope = 'worker%d' % index
    self.env = copy.deepcopy(a3c._env)
    self.env.reset()
    self.graph, self.features, self.rewards, self.actions, self.action_prob, self.value, self.advantages = a3c._build_graph(
        a3c._graph._get_tf('Graph'), self.scope, None)
    fields = a3c._build_graph(a3c._graph._get_tf('Graph'), self.scope, None)
    if a3c.continuous:
      self.graph, self.features, self.rewards, self.actions, self.action_mean, self.action_std, self.value, self.advantages = fields
    else:
      self.graph, self.features, self.rewards, self.actions, self.action_prob, self.value, self.advantages = fields
    self.rnn_states = self.graph.rnn_zero_states
    with a3c._graph._get_tf("Graph").as_default():
      local_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
@@ -386,13 +456,16 @@ class _Worker(object):
      state = self.env.state
      states.append(state)
      feed_dict = self.create_feed_dict(state)
      if self.a3c.continuous:
        tensors = [self.action_mean, self.action_std, self.value]
      else:
        tensors = [self.action_prob, self.value]
      results = session.run(
          [self.action_prob.out_tensor, self.value.out_tensor] +
          self.graph.rnn_final_states,
          feed_dict=feed_dict)
      probabilities, value = results[:2]
      self.rnn_states = results[2:]
      action = np.random.choice(np.arange(n_actions), p=probabilities[0])
          tensors + self.graph.rnn_final_states, feed_dict=feed_dict)
      value = results[len(tensors) - 1]
      self.rnn_states = results[len(tensors):]
      action = self.a3c._select_action_from_outputs(results[:len(tensors) - 1],
                                                    False)
      actions.append(action)
      values.append(float(value))
      rewards.append(self.env.step(action))
@@ -431,10 +504,14 @@ class _Worker(object):
          1] += self.a3c.discount_factor * self.a3c.advantage_lambda * advantages[
              j]

    # Convert the actions to one-hot.
    # Record the actions, converting to one-hot if necessary.

    n_actions = self.env.n_actions
    actions_matrix = []
    if self.a3c.continuous:
      for action in actions:
        actions_matrix.append(action)
    else:
      n_actions = self.env.n_actions
      for action in actions:
        a = np.zeros(n_actions)
        a[action] = 1.0
@@ -457,10 +534,10 @@ class _Worker(object):
                                  initial_rnn_states):
      feed_dict[placeholder] = value
    for f, s in zip(self.features, state_arrays):
      feed_dict[f.out_tensor] = s
    feed_dict[self.rewards.out_tensor] = discounted_rewards
    feed_dict[self.actions.out_tensor] = actions_matrix
    feed_dict[self.advantages.out_tensor] = advantages
      feed_dict[f] = s
    feed_dict[self.rewards] = discounted_rewards
    feed_dict[self.actions] = actions_matrix
    feed_dict[self.advantages] = advantages
    feed_dict[self.global_step] = step_count
    self.a3c._session.run(self.train_op, feed_dict=feed_dict)

@@ -484,8 +561,7 @@ class _Worker(object):
      feed_dict[f.out_tensor] = s
    values = self.a3c._session.run(self.value.out_tensor, feed_dict=feed_dict)
    values = np.append(values.flatten(), 0.0)
    self.process_rollout(hindsight_states, actions,
                         np.array(rewards),
    self.process_rollout(hindsight_states, actions, np.array(rewards),
                         np.array(values), initial_rnn_states, step_count)

  def create_feed_dict(self, state):
+66 −1
Original line number Diff line number Diff line
from flaky import flaky

import deepchem as dc
from deepchem.models.tensorgraph.layers import Reshape, Variable, SoftMax, GRU, Dense
from deepchem.models.tensorgraph.layers import Reshape, Variable, SoftMax, GRU, Dense, Constant
from deepchem.models.tensorgraph.optimizers import Adam, PolynomialDecay
import numpy as np
import tensorflow as tf
@@ -230,3 +230,68 @@ class TestA3C(unittest.TestCase):
      if np.array_equal(env.state[:2], env.state[2:]):
        pass_count += 1
    assert pass_count >= 3

  def test_continuous(self):
    """Test A3C on an environment with a continuous action space."""

    # The state consists of two numbers: a current value and a target value.
    # The policy just needs to learn to output the target value (or at least
    # move toward it).

    class TestEnvironment(dc.rl.Environment):

      def __init__(self):
        # The state has shape (2,) and each action is an array of shape (1,).
        super(TestEnvironment, self).__init__((2,), action_shape=(1,))

      def reset(self):
        # Start from a current value of 0 with a freshly drawn random target.
        target = np.random.uniform(-50, 50)
        self._state = np.array([0, target])
        self._terminated = False
        self.count = 0

      def step(self, action):
        # The action becomes the new current value.  The reward is how much
        # closer to the target it is than the previous current value was.
        target = self._state[1]
        dist = np.abs(target - action[0])
        old_dist = np.abs(target - self._state[0])
        new_state = np.array([action[0], target])
        self._state = new_state
        self.count += 1
        reward = old_dist - dist
        # Each episode lasts exactly 10 steps.
        self._terminated = (self.count == 10)
        return reward

    # A simple policy with no hidden layers.

    class TestPolicy(dc.rl.Policy):

      def create_layers(self, state, **kwargs):
        # The mean is a dense layer whose weights start at zero, and the
        # standard deviation is held at a constant 10.0.
        action_mean = Dense(
            1, in_layers=state, weights_initializer=tf.zeros_initializer)
        action_std = Constant([10.0])
        value = Dense(1, in_layers=state)
        return {
            'action_mean': action_mean,
            'action_std': action_std,
            'value': value
        }

    # Optimize it.

    env = TestEnvironment()
    learning_rate = PolynomialDecay(
        initial_rate=0.005, final_rate=0.0005, decay_steps=25000)
    a3c = dc.rl.A3C(
        env,
        TestPolicy(),
        discount_factor=0,
        optimizer=Adam(learning_rate=learning_rate))
    a3c.fit(25000)

    # Try running it and see if it reaches the target

    env.reset()
    while not env.terminated:
      env.step(a3c.select_action(env.state, deterministic=True))
    # Pass if the final value is within 1.0 of the target, or within 10% of
    # the target's magnitude, whichever is larger.
    distance = np.abs(env.state[0] - env.state[1])
    tolerance = max(1.0, 0.1 * np.abs(env.state[1]))
    assert distance < tolerance