Commit b8ebb777 authored by Nathan Frey's avatar Nathan Frey
Browse files

Add training loop

parent 83b9ba1e
Loading
Loading
Loading
Loading
+199 −206
Original line number Diff line number Diff line
@@ -4,18 +4,212 @@ Normalizing flows for transforming distributions.

import numpy as np
import logging
from typing import List
from typing import List, Iterable, Optional, Tuple

import tensorflow_probability as tfp
import tensorflow as tf

import deepchem as dc
from deepchem.models.losses import Loss
from deepchem.models.models import Model
from deepchem.models.optimizers import Adam

logger = logging.getLogger(__name__)


class NormalizingFlow(tf.keras.models.Model):
  """Base class for normalizing flow.

  The purpose of a normalizing flow is to map a simple distribution that is
  easy to sample from and evaluate probability densities to more complex
  distribituions that are learned with data. The base distribution p(x) is
  transformed by the associated normalizing flow y=g(x) to model the
  distribution p(y).

  Normalizing flows combine the advantages of autoregressive models
  (which provide likelihood estimation but do not learn features) and
  variational autoencoders (which learn feature representations but
  do not provide marginal likelihoods).

  The determinant of the Jacobian of the transformation gives a factor
  that preserves the probability volume to 1 when transforming between
  probability densities of different random variables.

  """

  def __init__(self, **kwargs):
    """Create a new NormalizingFlow."""

    super(NormalizingFlow, self).__init__(**kwargs)

    # An instance of tfd.TransformedDistribution
    self.flow = None

  def __call__(self, *x):
    return self.flow.bijector.forward(*x)

  @tf.function
  def fit_on_batch(self, x: np.ndarray,
                   optimizer: dc.models.optimizers.Optimizer,
                   loss: dc.models.losses.Loss) -> float:
    """Fit on batch of samples.
    
    Parameters
    ----------
    x: np.ndarray, shape (n_samples, n_dim)
      Array of samples where each sample is a vector of length `n_dim`.
    optimizer: dc.models.optimizers.Optimizer
      An instance of Optimizer.
    loss: dc.models.losses.Loss
      An instance of Loss.

    Returns
    -------
    batch_loss: float
      Loss computed on this batch.

    """

    with tf.GradientTape() as tape:
      batch_loss = loss(x)
      grads = tape.gradient(batch_loss, self.trainable_variables)
      optimizer.apply_gradients(zip(grads, self.trainable_variables))
    return batch_loss


class NormalizingFlowModel(NormalizingFlow):
  """A base distribution and normalizing flow for applying transformations.

  A distribution implements two main operations:
    1. Sampling from the transformed distribution.
    2. Calculating log probabilities.

  A normalizing flow implements three main operations:
    1. Forward transformation, 2. Inverse transformation, and 
    3. Calculating the Jacobian.

  Deep Normalizing Flow models require normalizing flow layers where
  input and output dimensions are the same, the transformation is invertible,
  and the determinant of the Jacobian is efficient to compute and
  differentiable. 

  """

  def __init__(self,
               base_distribution,
               flow_layers: Iterable,
               optimizer: Optional[dc.models.optimizers.Optimizer] = None,
               loss: Optional[dc.models.losses.Loss] = None,
               **kwargs):
    """Creates a new NormalizingFlowModel.

    Parameters
    ----------
    base_distribution : tfd.Distribution
      Probability distribution to be transformed.
      Typically an N dimensional multivariate Gaussian.
    flow_layers : Iterable[tfb.Bijector]
      An iterable of bijectors that comprise the flow.
    optimizer: dc.models.optimizers.Optimizer
      An instance of Optimizer.
    loss: dc.models.losses.Loss
      An instance of Loss.

    """

    try:
      import tensorflow_probability as tfp
      tfd = tfp.distributions
      tfb = tfp.bijectors
    except ModuleNotFoundError:
      raise ValueError(
          "This class requires tensorflow-probability to be installed.")

    super(NormalizingFlowModel, self).__init__(**kwargs)

    self.base_distribution = base_distribution
    self.flow_layers = flow_layers
    if optimizer is None:
      self.optimizer = Adam(learning_rate=1e-5)._create_optimizer(
          tf.Variable(0, trainable=False))
    else:
      self.optimizer = optimizer

    # Chain of flows is also a normalizing flow
    bijector = tfb.Chain(list(reversed(self.flow_layers)))

    self.flow = tfd.TransformedDistribution(
        distribution=self.base_distribution, bijector=bijector)

    if loss is None:
      self.loss = self.nll
    else:
      self.loss = loss

    self.built = False

  def build(self):
    """Initialize tf network."""
    x = self.flow.distribution.sample(self.flow.distribution.batch_shape)
    for b in reversed(self.flow.bijector.bijectors):
      x = b.forward(x)

    self.built = True

  def fit(self,
          dataset: dc.data.Dataset,
          batch_size: int = 64,
          nb_epoch: int = 10) -> Tuple[float, float]:
    """Train on `dataset`.

    Parameters
    ----------
    dataset: dc.data.Dataset
      The Dataset to train on
    batch_size: int, default 64
      Number of elements in each batch
    nb_epoch: int, default 10
      the number of epochs to train for

    Returns
    -------
    final_loss: float
      Final loss value after training.
    avg_loss: float
      Average loss during training.

    """

    if not self.built:
      self.build()

    avg_loss = 0.
    nbatches = 0

    # Generator of (X, y, w, ids) batches
    gen = dataset.iterbatches(batch_size=batch_size)
    for epoch in range(nb_epoch):
      x = tf.convert_to_tensor(next(gen)[0], tf.float32)
      batch_loss = self.fit_on_batch(x, self.optimizer, self.loss)
      logger.info('Loss on epoch %i is %.4f' % (epoch, batch_loss))
      avg_loss += batch_loss
      nbatches += 1

    avg_loss /= nbatches
    final_loss = batch_loss
    return (final_loss, avg_loss)

  def nll(self, X):
    """Negative log loss."""

    return -tf.reduce_mean(self.flow.log_prob(X, training=True))


class NormalizingFlowLayer(object):
  """Base class for normalizing flow layers.

  This is an abstract base class for implementing new normalizing flow
  layers that are not available in tfb. It should not be called directly.

  A normalizing flow transforms random variables into new random variables.
  Each learnable layer is a bijection, an invertible
  transformation between two probability distributions. A simple initial
@@ -46,21 +240,10 @@ class NormalizingFlowLayer(object):

  """

  def __init__(self, model, **kwargs):
    """Create a new NormalizingFlowLayer.
    
    Parameters
    ----------
    model : object
      Model object from `tensorflow_probability.bijectors`. The model
      should be a bijective transformation with forward, inverse, and 
      LDJ methods.
    kwargs : dict
      Additional keyword arguments.

    """
  def __init__(self, **kwargs):
    """Create a new NormalizingFlowLayer."""

    self.model = model
    pass

  def _forward(self, x):
    """Forward transformation.
@@ -139,193 +322,3 @@ class NormalizingFlowLayer(object):
    """

    return -self._forward_log_det_jacobian(self._inverse(y))


class NormalizingFlow(object):
  """Base class for normalizing flow.

  A normalizing flow is a chain of NormalizingFlowLayers.

  The purpose of a normalizing flow is to map a simple distribution that is
  easy to sample from and evaluate probability densities to more complex
  distribituions that are learned with data. The base distribution p(x) is
  transformed by the associated normalizing flow y=g(x) to model the
  distribution p(y).

  Normalizing flows combine the advantages of autoregressive models
  (which provide likelihood estimation but do not learn features) and
  variational autoencoders (which learn feature representations but
  do not provide marginal likelihoods).

  The determinant of the Jacobian of the transformation gives a factor
  that preserves the probability volume to 1 when transforming between
  probability densities of different random variables.

  """

  def __init__(self, flows: List[NormalizingFlowLayer]):
    """Create a new NormalizingFlow.

    Parameters
    ----------
    flows : List[NormalizingFlowLayer]
      List of NormalizingFlowLayers.

    """

    self.flows = flows

  def _forward(self, x):
    """Apply normalizing flow.

    Parameters
    ----------
    x : Tensor
      Samples from distribution.

    Returns
    -------
    (ys, ldjs) : Tuple[Tensor, Tensor]
      Transformed samples and log det Jacobian values.

    """

    ys = [x]
    ldjs = np.zeros(x.shape[0])

    for flow in self.flows:
      x = flow._forward(x)
      ldj = flow._forward_log_det_jacobian(x)
      ldjs += ldj
      ys.append(x)

    return (ys, ldjs)

  def _inverse(self, y):
    """Invert normalizing flow.

    Parameters
    ----------
    y : Tensor
      Samples from transformed distribution.

    Returns
    -------
    (xs, ildjs) : Tuple[Tensor, Tensor]
      Transformed samples and inverse log det Jacobian values.

    """

    xs = [y]
    ildjs = np.zeros(y.shape[0])

    for flow in self.flows:
      x = flow._inverse(y)
      ildj = flow._inverse_log_det_jacobian(y)
      ildjs += ildj
      xs.append(x)

    return (xs, ildjs)


class NormalizingFlowModel(Model):
  """A base distribution and normalizing flow for applying transformations.

  A distribution implements two main operations:
    1. Sampling from the transformed distribution.
    2. Calculating log probabilities.

  A normalizing flow implements three main operations:
    1. Forward transformation, 2. Inverse transformation, and 
    3. Calculating the Jacobian.

  Deep Normalizing Flow models require normalizing flow layers where
  input and output dimensions are the same, the transformation is invertible,
  and the determinant of the Jacobian is efficient to compute and
  differentiable. 

  """

  def __init__(self,
               base_distribution: tfp.distributions.Distribution,
               normalizing_flow: NormalizingFlow,
               event_shape=None):
    """Creates a new NormalizingFlowModel.

    Parameters
    ----------
    base_distribution : Distribution
      Probability distribution to be transformed.
    normalizing_flow : NormalizingFlow
      An instance of NormalizingFlow.
    event_shape : Tensor
      Shape of single samples drawn from distribution. For scalar
      distributions the shape is []. For a 3D Multi-variate normal 
      distribution, the shape is [3].

    """

    self.base_distribution = base_distribution
    self.normalizing_flow = normalizing_flow
    self.event_shape = event_shape

  def __call__(self, x):
    """Apply `normalizing_flow` to samples from `base_distribution`.

    Parameters
    ----------
    x : Tensor
      Samples from `base_distribution`.

    Returns
    -------
    (y, ldjs) : Tuple[Tensor, Tensor]
      Samples from transformed distribution and log det Jacobian.

    """

    return self.normalizing_flow._forward(x)

  def sample(self, shape, seed=None):
    """Generate samples from the transformed distribution.

    Parameters
    ----------
    shape : Tensor
      Shape of generated samples.
    seed : int
      Random seed.

    Returns
    -------
    samples : Tensor
      Tensor of random samples from the distribution.

    """

    raise NotImplementedError("Sampling must be defined.")

  def log_prob(self, value):
    """Log probability function.

    Given a datapoint `x`, what is the probability assigned by the
    model p(x). Equivalent to probability density estimation.

    The negative log likelihood (NLL) is a common loss function for 
    fitting data to distributions.

    NLL = -mean(log_prob(x))

    Parameters
    ----------
    value : Tensor
      Value of random variable.

    Returns
    -------
    log_prob : Tensor
      Log-likelihood function.

    """

    raise NotImplementedError("Log prob must be defined.")
+33 −46
Original line number Diff line number Diff line
@@ -13,60 +13,47 @@ import tensorflow_probability as tfp
import unittest
import numpy as np

from deepchem.models.normalizing_flows import NormalizingFlowLayer, NormalizingFlow, NormalizingFlowModel
from deepchem.data import NumpyDataset
from deepchem.models.normalizing_flows import NormalizingFlow, NormalizingFlowModel

tfd = tfp.distributions
tfb = tfp.bijectors


class TestNormalizingFlow(unittest.TestCase):

  def setUp(self):

    self.ef = ExpFlow()
    self.nfm = TransformedNormal()
    flow_layers = [
        tfb.RealNVP(
            num_masked=2,
            shift_and_log_scale_fn=tfb.real_nvp_default_template(
                hidden_layers=[8, 8]))
    ]
    # 3D Multivariate Gaussian base distribution
    self.nfm = NormalizingFlowModel(
        base_distribution=tfd.MultivariateNormalDiag(loc=[0., 0., 0.]),
        flow_layers=flow_layers)

    # Must be float32 for RealNVP
    self.dataset = NumpyDataset(
        X=np.random.rand(5, 3).astype(np.float32),
        y=np.random.rand(5,),
        ids=np.arange(5))

  def test_simple_flow(self):
    """Tests a simple flow of Exp layers."""

    X = self.nfm.sample([10])

    ys, ldjs = self.nfm(X)
    xs, ildjs = self.nfm.normalizing_flow._inverse(ys[-1])

    assert len(xs) == 3
    assert len(ys) == 3
    assert xs[0].shape == 10
    assert np.isclose(self.nfm.log_prob(1), -1.4, atol=0.5)


class ExpFlow(NormalizingFlowLayer):
  """Exp(x)."""

  def __init__(self, **kwargs):
    model = tfp.bijectors.Exp()
    super(ExpFlow, self).__init__(model, **kwargs)

  def _forward(self, x):
    return self.model.forward(x)

  def _inverse(self, y):
    return self.model.inverse(y)

  def _forward_log_det_jacobian(self, x):
    return self.model.forward_log_det_jacobian(x, 1)


class TransformedNormal(NormalizingFlowModel):
  """Univariate Gaussian base distribution."""

  def __init__(self, 
    base_distribution=tfp.distributions.Normal(0, 1),
    normalizing_flow=NormalizingFlow([ExpFlow(), ExpFlow()])
    ):

    super(TransformedNormal, self).__init__(base_distribution, normalizing_flow)
    """Tests a simple flow of one RealNVP layer."""

  def sample(self, shape, seed=None):
    return self.base_distribution.sample(sample_shape=shape, seed=seed)
    X = self.nfm.flow.sample()
    x1 = tf.zeros([3])
    x2 = self.dataset.X[0]

  def log_prob(self, value):
    return self.base_distribution.log_prob(value=value)
    # log likelihoods should be negative
    assert self.nfm.flow.log_prob(X).numpy() < 0
    assert self.nfm.flow.log_prob(x1).numpy() < 0
    assert self.nfm.flow.log_prob(x2).numpy() < 0

    # Build and fit model
    self.nfm.build()
    final, avg = self.nfm.fit(self.dataset, batch_size=1, nb_epoch=5)
    assert final.numpy() < 5.0
+6 −0
Original line number Diff line number Diff line
@@ -302,3 +302,9 @@ ChemCeption

.. autoclass:: deepchem.models.ChemCeption
  :members:

NormalizingFlowModel
--------------------

.. autoclass:: deepchem.models.normalizing_flows.NormalizingFlowModel
  :members:
 No newline at end of file