Commit f05c4e19 authored by peastman's avatar peastman
Browse files

Implemented continuous action spaces for A3C

parent 0e2cb8d9
Loading
Loading
Loading
Loading
+125 −58
Original line number Diff line number Diff line
@@ -13,11 +13,11 @@ import re
import threading


class A3CLoss(Layer):
  """This layer computes the loss function for A3C."""
class A3CLossDiscrete(Layer):
  """This layer computes the loss function for A3C with discrete action spaces."""

  def __init__(self, value_weight, entropy_weight, **kwargs):
    super(A3CLoss, self).__init__(**kwargs)
    super(A3CLossDiscrete, self).__init__(**kwargs)
    self.value_weight = value_weight
    self.entropy_weight = entropy_weight

@@ -35,6 +35,28 @@ class A3CLoss(Layer):
    return self.out_tensor


class A3CLossContinuous(Layer):
  """This layer computes the loss function for A3C with continuous action spaces."""

  def __init__(self, value_weight, entropy_weight, **kwargs):
    super(A3CLossContinuous, self).__init__(**kwargs)
    self.value_weight = value_weight
    self.entropy_weight = entropy_weight

  def create_tensor(self, **kwargs):
    reward, action, mean, std, value, advantage = [
        layer.out_tensor for layer in self.in_layers
    ]
    distrib = tf.distributions.Normal(mean, std)
    reduce_axes = list(range(1, len(action.shape)))
    log_prob = tf.reduce_sum(distrib.log_prob(action), reduce_axes)
    policy_loss = -tf.reduce_mean(advantage * log_prob)
    value_loss = tf.reduce_mean(tf.square(reward - value))
    entropy = tf.reduce_mean(distrib.entropy())
    self.out_tensor = policy_loss + self.value_weight * value_loss - self.entropy_weight * entropy
    return self.out_tensor


class A3C(object):
  """
  Implements the Asynchronous Advantage Actor-Critic (A3C) algorithm for reinforcement learning.
@@ -128,9 +150,13 @@ class A3C(object):
      self._optimizer = Adam(learning_rate=0.001, beta1=0.9, beta2=0.999)
    else:
      self._optimizer = optimizer
    fields = self._build_graph(None, 'global', model_dir)
    if self.continuous:
      (self._graph, self._features, self._rewards, self._actions,
     self._action_prob, self._value, self._advantages) = self._build_graph(
         None, 'global', model_dir)
       self._action_mean, self._action_std, self._value, self._advantages) = fields
    else:
      (self._graph, self._features, self._rewards, self._actions,
       self._action_prob, self._value, self._advantages) = fields
    with self._graph._get_tf("Graph").as_default():
      self._session = tf.Session()
    self._rnn_states = self._graph.rnn_zero_states
@@ -146,15 +172,9 @@ class A3C(object):
    for s, d in zip(state_shape, state_dtype):
      features.append(Feature(shape=[None] + list(s), dtype=tf.as_dtype(d)))
    policy_layers = self._policy.create_layers(features)
    action_prob = policy_layers['action_prob']
    value = policy_layers['value']
    rewards = Weights(shape=(None,))
    advantages = Weights(shape=(None,))
    actions = Label(shape=(None, self._env.n_actions))
    loss = A3CLoss(
        self.value_weight,
        self.entropy_weight,
        in_layers=[rewards, actions, action_prob, value, advantages])
    graph = TensorGraph(
        batch_size=self.max_rollout_length,
        use_queue=False,
@@ -162,13 +182,35 @@ class A3C(object):
        model_dir=model_dir)
    for f in features:
      graph._add_layer(f)
    if 'action_prob' in policy_layers:
      self.continuous = False
      action_prob = policy_layers['action_prob']
      actions = Label(shape=(None, self._env.n_actions))
      loss = A3CLossDiscrete(
          self.value_weight,
          self.entropy_weight,
          in_layers=[rewards, actions, action_prob, value, advantages])
      graph.add_output(action_prob)
    else:
      self.continuous = True
      action_mean = policy_layers['action_mean']
      action_std = policy_layers['action_std']
      actions = Label(shape=[None]+list(action_mean.shape))
      loss = A3CLossContinuous(
          self.value_weight,
          self.entropy_weight,
          in_layers=[rewards, actions, action_mean, action_std, value, advantages])
      graph.add_output(action_mean)
      graph.add_output(action_std)
    graph.add_output(value)
    graph.set_loss(loss)
    graph.set_optimizer(self._optimizer)
    with graph._get_tf("Graph").as_default():
      with tf.variable_scope(scope):
        graph.build()
    if self.continuous:
      return graph, features, rewards, actions, action_mean, action_std, value, advantages
    else:
      return graph, features, rewards, actions, action_prob, value, advantages

  def fit(self,
@@ -246,17 +288,11 @@ class A3C(object):
    -------
    the array of action probabilities, and the estimated value function
    """
    if not self._state_is_list:
      state = [state]
    with self._graph._get_tf("Graph").as_default():
      feed_dict = self._create_feed_dict(state, use_saved_states)
      tensors = [self._action_prob.out_tensor, self._value.out_tensor]
      if save_states:
        tensors += self._graph.rnn_final_states
      results = self._session.run(tensors, feed_dict=feed_dict)
      if save_states:
        self._rnn_states = results[2:]
      return results[:2]
    if self.continuous:
      outputs = [self._action_mean, self._action_std, self._value]
    else:
      outputs = [self._action_prob, self._value]
    return self._predict_outputs(outputs, state, use_saved_states, save_states)

  def select_action(self,
                    state,
@@ -290,22 +326,12 @@ class A3C(object):
    -------
    the index of the selected action
    """
    if not self._state_is_list:
      state = [state]
    with self._graph._get_tf("Graph").as_default():
      feed_dict = self._create_feed_dict(state, use_saved_states)
      tensors = [self._action_prob.out_tensor]
      if save_states:
        tensors += self._graph.rnn_final_states
      results = self._session.run(tensors, feed_dict=feed_dict)
      probabilities = results[0]
      if save_states:
        self._rnn_states = results[1:]
      if deterministic:
        return probabilities.argmax()
    if self.continuous:
      tensors = [self._action_mean, self._action_std]
    else:
        return np.random.choice(
            np.arange(self._env.n_actions), p=probabilities[0])
      tensors = [self._action_prob]
    outputs = self._predict_outputs(tensors, state, use_saved_states, save_states)
    return self._select_action_from_outputs(outputs, deterministic)

  def restore(self):
    """Reload the model parameters from the most recent checkpoint file."""
@@ -330,6 +356,37 @@ class A3C(object):
      feed_dict[placeholder] = value
    return feed_dict

  def _predict_outputs(self, outputs, state, use_saved_states, save_states):
    """Compute a set of outputs for a state. """
    if not self._state_is_list:
      state = [state]
    with self._graph._get_tf("Graph").as_default():
      feed_dict = self._create_feed_dict(state, use_saved_states)
      if save_states:
        tensors = outputs + self._graph.rnn_final_states
      else:
        tensors = outputs
      results = self._session.run(tensors, feed_dict=feed_dict)
      if save_states:
        self._rnn_states = results[len(outputs):]
      return results[:len(outputs)]

  def _select_action_from_outputs(self, outputs, deterministic):
    """Given the policy outputs, select an action to perform."""
    if self.continuous:
      action_mean, action_std = outputs
      if deterministic:
        return action_mean
      else:
        return np.random.normal(action_mean, action_std)
    else:
      action_prob = outputs[0]
      if deterministic:
        return action_prob.argmax()
      else:
        action_prob = action_prob.flatten()
        return np.random.choice(np.arange(len(action_prob)), p=action_prob)


class _Worker(object):
  """A Worker object is created for each training thread."""
@@ -340,8 +397,11 @@ class _Worker(object):
    self.scope = 'worker%d' % index
    self.env = copy.deepcopy(a3c._env)
    self.env.reset()
    self.graph, self.features, self.rewards, self.actions, self.action_prob, self.value, self.advantages = a3c._build_graph(
        a3c._graph._get_tf('Graph'), self.scope, None)
    fields = a3c._build_graph(a3c._graph._get_tf('Graph'), self.scope, None)
    if a3c.continuous:
      self.graph, self.features, self.rewards, self.actions, self.action_mean, self.action_std, self.value, self.advantages = fields
    else:
      self.graph, self.features, self.rewards, self.actions, self.action_prob, self.value, self.advantages = fields
    self.rnn_states = self.graph.rnn_zero_states
    with a3c._graph._get_tf("Graph").as_default():
      local_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
@@ -386,13 +446,16 @@ class _Worker(object):
      state = self.env.state
      states.append(state)
      feed_dict = self.create_feed_dict(state)
      if self.a3c.continuous:
        tensors = [self.action_mean, self.action_std, self.value]
      else:
        tensors = [self.action_prob, self.value]
      results = session.run(
          [self.action_prob.out_tensor, self.value.out_tensor] +
          self.graph.rnn_final_states,
          tensors + self.graph.rnn_final_states,
          feed_dict=feed_dict)
      probabilities, value = results[:2]
      self.rnn_states = results[2:]
      action = np.random.choice(np.arange(n_actions), p=probabilities[0])
      value = results[len(tensors)-1]
      self.rnn_states = results[len(tensors):]
      action = self.a3c._select_action_from_outputs(results[:len(tensors)-1], False)
      actions.append(action)
      values.append(float(value))
      rewards.append(self.env.step(action))
@@ -431,10 +494,14 @@ class _Worker(object):
          1] += self.a3c.discount_factor * self.a3c.advantage_lambda * advantages[
              j]

    # Convert the actions to one-hot.
    # Record the actions, computing to one-hot if necessary.

    n_actions = self.env.n_actions
    actions_matrix = []
    if self.a3c.continuous:
      for action in actions:
        actions_matrix.append(action)
    else:
      n_actions = self.env.n_actions
      for action in actions:
        a = np.zeros(n_actions)
        a[action] = 1.0
@@ -457,10 +524,10 @@ class _Worker(object):
                                  initial_rnn_states):
      feed_dict[placeholder] = value
    for f, s in zip(self.features, state_arrays):
      feed_dict[f.out_tensor] = s
    feed_dict[self.rewards.out_tensor] = discounted_rewards
    feed_dict[self.actions.out_tensor] = actions_matrix
    feed_dict[self.advantages.out_tensor] = advantages
      feed_dict[f] = s
    feed_dict[self.rewards] = discounted_rewards
    feed_dict[self.actions] = actions_matrix
    feed_dict[self.advantages] = advantages
    feed_dict[self.global_step] = step_count
    self.a3c._session.run(self.train_op, feed_dict=feed_dict)

+61 −1
Original line number Diff line number Diff line
from flaky import flaky

import deepchem as dc
from deepchem.models.tensorgraph.layers import Reshape, Variable, SoftMax, GRU, Dense
from deepchem.models.tensorgraph.layers import Reshape, Variable, SoftMax, GRU, Dense, Constant
from deepchem.models.tensorgraph.optimizers import Adam, PolynomialDecay
import numpy as np
import tensorflow as tf
@@ -230,3 +230,63 @@ class TestA3C(unittest.TestCase):
      if np.array_equal(env.state[:2], env.state[2:]):
        pass_count += 1
    assert pass_count >= 3


  def test_continuous(self):
    """Test A3C on an environment with a continous action space."""

    # The state consists of two numbers: a current value and a target value.
    # The policy just needs to learn to output the target value (or at least
    # move toward it).

    class TestEnvironment(dc.rl.Environment):

      def __init__(self):
        super(TestEnvironment, self).__init__((2,), 0)

      def reset(self):
        target = np.random.uniform(-50, 50)
        self._state = np.array([0, target])
        self._terminated = False
        self.count = 0

      def step(self, action):
        target = self._state[1]
        dist = np.abs(target - action[0][0])
        old_dist = np.abs(target - self._state[0])
        new_state = np.array([action[0][0], target])
        self._state = new_state
        self.count += 1
        reward = old_dist - dist
        self._terminated = (self.count == 10)
        return reward

    # A simple policy with two hidden layers.

    class TestPolicy(dc.rl.Policy):

      def create_layers(self, state, **kwargs):
        action_mean = Dense(1, in_layers=state, weights_initializer=tf.zeros_initializer)
        action_std = Constant(10.0)
        value = Dense(1, in_layers=state)
        return {'action_mean': action_mean, 'action_std': action_std, 'value': value}

    # Optimize it.

    env = TestEnvironment()
    learning_rate = PolynomialDecay(
        initial_rate=0.005, final_rate=0.0005, decay_steps=25000)
    a3c = dc.rl.A3C(
        env,
        TestPolicy(), discount_factor=0,
        optimizer=Adam(learning_rate=learning_rate))
    a3c.fit(25000)

    # Try running it and see if it reaches the target

    env.reset()
    while not env.terminated:
      env.step(a3c.select_action(env.state, deterministic=True))
    distance = np.abs(env.state[0]-env.state[1])
    tolerance = max(1.0, 0.1*np.abs(env.state[1]))
    assert distance < tolerance