Commit 5bcff26e authored by Nathan Frey

Custom gradients and type annotations

parent bcef53fd
+94 −51
@@ -4,7 +4,7 @@ Normalizing flows for transforming probability distributions.

import numpy as np
import logging
from typing import List, Iterable, Optional, Tuple, Sequence, Any
from typing import List, Iterable, Optional, Tuple, Sequence, Any, Callable

import tensorflow as tf
from tensorflow.keras.layers import Lambda
@@ -14,6 +14,7 @@ from deepchem.models.losses import Loss
from deepchem.models.models import Model
from deepchem.models.keras_model import KerasModel
from deepchem.models.optimizers import Optimizer, Adam
from deepchem.utils.typing import OneOrMany

logger = logging.getLogger(__name__)

@@ -34,7 +35,11 @@ class NormalizingFlow(tf.keras.models.Model):

  """

  def __init__(self, base_distribution, flow_layers, **kwargs):
  def __init__(self,
               base_distribution,
               flow_layers: Sequence,
               event_shape: Optional[List[int]] = None,
               **kwargs) -> None:
    """Create a new NormalizingFlow.

    Parameters
@@ -44,6 +49,9 @@ class NormalizingFlow(tf.keras.models.Model):
      Typically an N dimensional multivariate Gaussian.
    flow_layers: Sequence[tfb.Bijector]
      An iterable of bijectors that comprise the flow.
    event_shape: Optional[List[int]]
      Dimensionality of inputs, e.g. [2] for 2D inputs.
    **kwargs

    """

@@ -57,13 +65,16 @@ class NormalizingFlow(tf.keras.models.Model):

    self.base_distribution = base_distribution
    self.flow_layers = flow_layers
    self.event_shape = event_shape

    # Chain of flows is also a normalizing flow
    bijector = tfb.Chain(list(reversed(self.flow_layers)))

    # An instance of tfd.TransformedDistribution
    self.flow = tfd.TransformedDistribution(
        distribution=self.base_distribution, bijector=bijector)
        distribution=self.base_distribution,
        bijector=bijector,
        event_shape=self.event_shape)

    super(NormalizingFlow, self).__init__(**kwargs)
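
To see how this constructor is meant to be used, here is a minimal sketch (not part of this commit). It assumes a TFP release compatible with this module (the `event_shape` override passed to `TransformedDistribution` above was removed in later TFP versions) and uses the standard `tfb.Shift` and `tfb.Scale` bijectors:

```python
import tensorflow_probability as tfp

tfd = tfp.distributions
tfb = tfp.bijectors

# Base distribution: a 2D standard Gaussian.
base = tfd.MultivariateNormalDiag(loc=[0.0, 0.0])

# tfb.Chain applies bijectors right to left, so the reversal in __init__
# makes flow_layers[0] act first on samples from the base distribution.
flow_layers = [tfb.Shift(shift=[1.0, 1.0]), tfb.Scale(scale=[2.0, 2.0])]

nf = NormalizingFlow(base_distribution=base, flow_layers=flow_layers)

samples = nf.flow.sample(4)            # draw from the transformed distribution
log_probs = nf.flow.log_prob(samples)  # exact densities via change of variables
```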

@@ -74,13 +85,20 @@ class NormalizingFlow(tf.keras.models.Model):
class NormalizingFlowModel(KerasModel):
  """A base distribution and normalizing flow for applying transformations.

  Normalizing flows are effective for any application requiring 
  a probabilistic model that can both sample from a distribution and
  compute marginal likelihoods, e.g. generative modeling,
  unsupervised learning, or probabilistic inference. For a thorough review
  of normalizing flows, see [1]_.

  A distribution implements two main operations:
    1. Sampling from the transformed distribution.
    2. Calculating log probabilities.
    1. Sampling from the transformed distribution
    2. Calculating log probabilities

  A normalizing flow implements three main operations:
    1. Forward transformation, 2. Inverse transformation, and 
    3. Calculating the Jacobian.
    1. Forward transformation 
    2. Inverse transformation 
    3. Calculating the Jacobian

  Deep Normalizing Flow models require normalizing flow layers where
  input and output dimensions are the same, the transformation is invertible,
@@ -89,29 +107,21 @@ class NormalizingFlowModel(KerasModel):
  gives a factor that preserves the probability volume to 1 when transforming
  between probability densities of different random variables.
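
For reference, the "factor that preserves the probability volume" is the standard change-of-variables identity: for an invertible, differentiable transform y = g(x) with base density p_X,

```latex
\log p_Y(y) = \log p_X\left(g^{-1}(y)\right) + \log\left|\det J_{g^{-1}}(y)\right|
```

which is why the forward transform, inverse transform, and Jacobian determinant together suffice for both sampling and exact density evaluation.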

  They are effective for any application requiring a probabilistic
  model with these capabilities, e.g. generative modeling,
  unsupervised learning, or probabilistic inference. For a thorough review
  of normalizing flows, see [1]_.

  References
  ----------
  .. [1] Papamakarios, George et al. "Normalizing Flows for Probabilistic Modeling and Inference." (2019). https://arxiv.org/abs/1912.02762.

  """

  def __init__(self, model: NormalizingFlow, **kwargs):
  def __init__(self, model: NormalizingFlow, **kwargs) -> None:
    """Creates a new NormalizingFlowModel.

    In addition to the following arguments, this class also accepts all the keyword arguments from KerasModel.

    Parameters
    ----------
    model: NormalizingFlow
      An instance of NormalizingFlow.    
    loss: dc.models.losses.Loss, default NegLogLoss
      Loss function
    optimizer: dc.models.optimizers.Optimizer, default Adam
      Optimizer.
    

    Examples
    --------
@@ -143,30 +153,27 @@ class NormalizingFlowModel(KerasModel):
      raise ValueError(
          "This class requires tensorflow-probability to be installed.")

    self.model = model
    self.flow = model.flow  # normalizing flow
    """Initialize tf network."""
    x = self.flow.distribution.sample(self.flow.distribution.batch_shape)
    for b in reversed(self.flow.bijector.bijectors):
      x = b.forward(x)

    self.nll_loss_fn = lambda output, labels, weights: self.create_nll(output)
    self.nll_loss_fn = lambda input, labels, weights: self.create_nll(input)

    super(NormalizingFlowModel, self).__init__(
        model=self.model, loss=self.nll_loss_fn, **kwargs)
        model=model, loss=self.nll_loss_fn, **kwargs)

    self.flow = self.model.flow  # normalizing flow

    # TODO: Incompatibility between TF and TFP means that TF doesn't track
    # trainable variables in the flow; must override `_create_gradient_fn`
    # self._variables = self.flow.trainable_variables

  def create_nll(self, output):
    """Create the negative log loss function for density estimation.
  def create_nll(self, input: OneOrMany[tf.Tensor]) -> tf.Tensor:
    """Create the negative log likelihood loss function.

    The default implementation is appropriate for most cases. Subclasses can
    override this if there is a need to customize it.

    Parameters
    ----------
    output: Tensor
      the output from the normalizing flow on a batch of generated data.
      This is its estimate of the probability that the sample was drawn
      from the target distribution.
    input: OneOrMany[tf.Tensor]
      A batch of data.

    Returns
    -------
@@ -174,9 +181,45 @@ class NormalizingFlowModel(KerasModel):

    """

    return Lambda(
        lambda x: -tf.reduce_mean(tf.math.add(self.flow.log_prob(x), 1e-10)))(
            output)
    return -tf.reduce_mean(self.flow.log_prob(input, training=True))
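
The value returned here is the batch Monte Carlo estimate of the negative log likelihood,

```latex
\mathcal{L}_{\mathrm{NLL}} = -\frac{1}{N}\sum_{i=1}^{N}\log p_{\mathrm{flow}}(x_i),
```

so minimizing this loss performs maximum likelihood training of the flow.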

  def _create_gradient_fn(self,
                          variables: Optional[List[tf.Variable]]) -> Callable:
    """Create a function that computes gradients and applies them to the model.

    Because of the way TensorFlow function tracing works, we need to create a
    separate function for each new set of variables.
    
    Parameters
    ----------
    variables: Optional[List[tf.Variable]]
      Variables to track during training.

    Returns
    -------
    A callable that computes and applies gradients for a batch of training data.

    """

    @tf.function(experimental_relax_shapes=True)
    def apply_gradient_for_batch(inputs, labels, weights, loss):
      with tf.GradientTape() as tape:
        tape.watch(self.flow.trainable_variables)
        if isinstance(inputs, tf.Tensor):
          inputs = [inputs]
        if self._loss_outputs is not None:
          inputs = [inputs[i] for i in self._loss_outputs]
        batch_loss = loss(inputs, labels, weights)
      if variables is None:
        vars = self.flow.trainable_variables
      else:
        vars = variables
      grads = tape.gradient(batch_loss, vars)
      self._tf_optimizer.apply_gradients(zip(grads, vars))
      self._global_step.assign_add(1)
      return batch_loss

    return apply_gradient_for_batch
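
A sketch of how this custom gradient path is exercised end to end; the toy dataset and hyperparameters are hypothetical, `nf` is a NormalizingFlow as constructed earlier, and `learning_rate`/`batch_size` are ordinary KerasModel keyword arguments:

```python
import numpy as np
import deepchem as dc

# Hypothetical toy data: 100 2D points from a shifted Gaussian.
X = (np.random.randn(100, 2) + 1.0).astype(np.float32)
dataset = dc.data.NumpyDataset(X)

nfm = NormalizingFlowModel(nf, learning_rate=1e-3, batch_size=32)

# fit() routes every batch through apply_gradient_for_batch defined above
# and returns the average loss over the last checkpoint interval.
final_loss = nfm.fit(dataset, nb_epoch=10)
```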


class NormalizingFlowLayer(object):
@@ -218,64 +261,64 @@ class NormalizingFlowLayer(object):

    pass

  def _forward(self, x):
  def _forward(self, x: tf.Tensor) -> tf.Tensor:
    """Forward transformation.

    y = g(x)

    Parameters
    ----------
    x : Tensor
    x: tf.Tensor
      Input tensor.

    Returns
    -------
    fwd_x : Tensor
    fwd_x: tf.Tensor
      Transformed tensor.

    """

    raise NotImplementedError("Forward transform must be defined.")

  def _inverse(self, y):
  def _inverse(self, y: tf.Tensor) -> tf.Tensor:
    """Inverse transformation.

    x = g^{-1}(y)
    
    Parameters
    ----------
    y : Tensor
    y: tf.Tensor
      Input tensor.

    Returns
    -------
    inv_y : Tensor
    inv_y: tf.Tensor
      Inverted tensor.

    """

    raise NotImplementedError("Inverse transform must be defined.")

  def _forward_log_det_jacobian(self, x):
  def _forward_log_det_jacobian(self, x: tf.Tensor) -> tf.Tensor:
    """Log |Determinant(Jacobian(x)|

    Note x = g^{-1}(y)

    Parameters
    ----------
    x : Tensor
    x: tf.Tensor
      Input tensor.

    Returns
    -------
    ldj : Tensor
    ldj: tf.Tensor
      Log of absolute value of determinant of Jacobian of x.

    """

    raise NotImplementedError("LDJ must be defined.")

  def _inverse_log_det_jacobian(self, y):
  def _inverse_log_det_jacobian(self, y: tf.Tensor) -> tf.Tensor:
    """Inverse LDJ.

    The ILDJ = -LDJ.
@@ -284,12 +327,12 @@ class NormalizingFlowLayer(object):

    Parameters
    ----------
    y : Tensor
    y: tf.Tensor
      Input tensor.

    Returns
    -------
    ildj : Tensor
    ildj: tf.Tensor
      Log of absolute value of determinant of Jacobian of y.

    """