Commit 4e6ce062 authored by Kevin Shen's avatar Kevin Shen
Browse files

Combine all Wandblogger KerasModel integration commits

fixed initialization, added logging

metric logging, epoch logging, refactor

fixed logger, removed wandb from callback

updating logging and evaluation

ran new tests, made fixes

design changes

Added docstrings

style changes

style changes for keras_model.py
parent c81d221b
Loading
Loading
Loading
Loading
+0 −3
Original line number Diff line number Diff line
@@ -80,9 +80,6 @@ class ValidationCallback(object):
      for key in scores:
        model._log_scalar_to_tensorboard(key, scores[key],
                                         model.get_global_step())
    if model.wandb:
      import wandb
      wandb.log(scores, step=step)
    if self.save_dir is not None:
      score = scores[self.metrics[self.save_metric].name]
      if not self.save_on_minimum:
+89 −97
Original line number Diff line number Diff line
@@ -14,7 +14,6 @@ from deepchem.metrics import Metric
from deepchem.models.losses import Loss
from deepchem.models.models import Model
from deepchem.models.optimizers import Adam, Optimizer, LearningRateSchedule
from deepchem.models.callbacks import ValidationCallback
from deepchem.trans import Transformer, undo_transforms
from deepchem.utils.evaluate import GeneratorEvaluator

@@ -22,19 +21,6 @@ from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tupl
from deepchem.utils.typing import ArrayLike, LossFn, OneOrMany
from deepchem.models.wandblogger import WandbLogger

try:
  import wandb
  wandb.ensure_configured()
  if wandb.api.api_key is None:
    _has_wandb = False
    wandb.termwarn(
        "W&B installed but not logged in.  Run `wandb login` or set the WANDB_API_KEY env variable."
    )
  else:
    _has_wandb = True
except (ImportError, AttributeError):
  _has_wandb = False

logger = logging.getLogger(__name__)


@@ -134,7 +120,7 @@ class KerasModel(Model):
               learning_rate: Union[float, LearningRateSchedule] = 0.001,
               optimizer: Optional[Optimizer] = None,
               tensorboard: bool = False,
               wandb: Optional[WandbLogger] = None,
               wandb_logger: Optional[WandbLogger] = None,
               log_frequency: int = 100,
               **kwargs) -> None:
    """Create a new KerasModel.
@@ -161,8 +147,8 @@ class KerasModel(Model):
      ignored.
    tensorboard: bool
      whether to log progress to TensorBoard during training
    wandb: bool (MODIFY)
      whether to log progress to Weights & Biases during training
    wandb_logger: WandbLogger
      the Weights & Biases logger to log data and metrics
    log_frequency: int
      The frequency at which to log data. Data is logged using
      `logging` by default. If `tensorboard` is set, data is also
@@ -184,29 +170,25 @@ class KerasModel(Model):
      self.optimizer = optimizer
    self.tensorboard = tensorboard

    # W&B logging
    if (wandb is not None) and not _has_wandb:
      logger.warning(
          "You are using a wandb logger but W&B is not installed. To use wandb logging, "
          "run `pip install wandb; wandb login` see https://docs.wandb.com/huggingface."
      )
    self.wandb_logger = wandb
    #Need to save model??
    #wandb.save_model(...) #save the tf.keras.Model
    self.wandb_logger = wandb_logger

    #update config with KerasModel Params
    run_params = dict(
      model=model,
      loss=loss,
    # Setup and initialize W&B logging
    if (self.wandb_logger is not None) and (not self.wandb_logger.initialized):
      self.wandb_logger.setup()

    # Update config with KerasModel params
    wandb_logger_config = dict(loss=loss,
                               output_types=output_types,
                               batch_size=batch_size,
                               model_dir=model_dir,
                               learning_rate=learning_rate,
                               optimizer=optimizer,
                               tensorboard=tensorboard,
      log_frequency=log_frequency
    )
    self.wandb_logger.wandb.config.update(run_params)
                               log_frequency=log_frequency)
    wandb_logger_config.update(**kwargs)

    if self.wandb_logger is not None:
      self.wandb_logger.update_config(wandb_logger_config)

    # Backwards compatibility
    if "tensorboard_log_frequency" in kwargs:
@@ -252,8 +234,8 @@ class KerasModel(Model):
    self._built = True
    self._global_step = tf.Variable(0, trainable=False)
    self._tf_optimizer = self.optimizer._create_tf_optimizer(self._global_step)
    self._checkpoint = tf.train.Checkpoint(
        optimizer=self._tf_optimizer, model=self.model)
    self._checkpoint = tf.train.Checkpoint(optimizer=self._tf_optimizer,
                                           model=self.model)

  def _create_inputs(self, example_inputs: List) -> None:
    """The first time this is called, create tensors representing the inputs and outputs."""
@@ -271,8 +253,8 @@ class KerasModel(Model):
          for x in example_inputs
      ]

  def _create_training_ops(self,
                           example_batch: Tuple[List, List, List]) -> None:
  def _create_training_ops(self, example_batch: Tuple[List, List,
                                                      List]) -> None:
    """The first time this is called, create tensors used in optimization."""
    if self._training_ops_built:
      return
@@ -337,10 +319,11 @@ class KerasModel(Model):
    The average loss over the most recent checkpoint interval
   """
    return self.fit_generator(
        self.default_generator(
            dataset, epochs=nb_epoch,
            deterministic=deterministic), max_checkpoints_to_keep,
        checkpoint_interval, restore, variables, loss, callbacks, all_losses)
        self.default_generator(dataset,
                               epochs=nb_epoch,
                               deterministic=deterministic),
        max_checkpoints_to_keep, checkpoint_interval, restore, variables, loss,
        callbacks, all_losses)

  def fit_generator(self,
                    generator: Iterable[Tuple[Any, Any, Any]],
@@ -414,13 +397,8 @@ class KerasModel(Model):
    # Main training loop.

    # Warn if both ValidationCallback and WandbLogger present
    if wandb is not None:
      for c in callbacks:
        if isinstance(c, ValidationCallback):
          logger.warning(
            "You are using both WandbLogger and ValidationCallback. WandbLogger is able to log validation metrics" 
            "so there is no need to have a ValidationCallback. Logging validation metrics twice may take longer."
          )
    if self.wandb_logger is not None:
      self.wandb_logger.check_other_loggers(callbacks)

    for batch in generator:
      self._create_training_ops(batch)
@@ -444,8 +422,8 @@ class KerasModel(Model):
      should_log = (current_step % self.log_frequency == 0)
      if should_log:
        avg_loss = float(avg_loss) / averaged_batches
        logger.info(
            'Ending global_step %d: Average loss %g' % (current_step, avg_loss))
        logger.info('Ending global_step %d: Average loss %g' %
                    (current_step, avg_loss))
        if all_losses is not None:
          all_losses.append(avg_loss)
        # Capture the last avg_loss in case of return since we're resetting to
@@ -460,14 +438,21 @@ class KerasModel(Model):
        c(self, current_step)
      if self.tensorboard and should_log:
        self._log_scalar_to_tensorboard('loss', batch_loss, current_step)
      if self.wandb and should_log:
        #wandb.log({'loss': batch_loss}, step=current_step)
      if self.wandb_logger is not None:
        # Calculate epoch number, sample count number, and log to wandb
        self.wandb_logger.calculate_epoch_and_sample_count(current_step)
        self.wandb_logger.log(self, {'train/loss': batch_loss},
                              step=current_step)

    # Close WandbLogger
    if self.wandb_logger is not None:
      self.wandb_logger.finish()

    # Report final results.
    if averaged_batches > 0:
      avg_loss = float(avg_loss) / averaged_batches
      logger.info(
          'Ending global_step %d: Average loss %g' % (current_step, avg_loss))
      logger.info('Ending global_step %d: Average loss %g' %
                  (current_step, avg_loss))
      if all_losses is not None:
        all_losses.append(avg_loss)
      last_avg_loss = avg_loss
@@ -546,19 +531,19 @@ class KerasModel(Model):
    """
    self._ensure_built()
    dataset = NumpyDataset(X, y, w)
    return self.fit(
        dataset,
    return self.fit(dataset,
                    nb_epoch=1,
                    max_checkpoints_to_keep=max_checkpoints_to_keep,
        checkpoint_interval=self._global_step.numpy() + 2 if checkpoint else 0,
                    checkpoint_interval=self._global_step.numpy() +
                    2 if checkpoint else 0,
                    variables=variables,
                    loss=loss,
                    callbacks=callbacks)

  def _predict(
      self, generator: Iterable[Tuple[Any, Any, Any]],
      transformers: List[Transformer], outputs: Optional[OneOrMany[tf.Tensor]],
      uncertainty: bool,
      self, generator: Iterable[Tuple[Any, Any,
                                      Any]], transformers: List[Transformer],
      outputs: Optional[OneOrMany[tf.Tensor]], uncertainty: bool,
      other_output_types: Optional[OneOrMany[str]]) -> OneOrMany[np.ndarray]:
    """
    Predict outputs for data provided by a generator.
@@ -753,8 +738,10 @@ class KerasModel(Model):
    dataset = NumpyDataset(X=X, y=None)
    return self.predict(dataset, transformers, outputs)

  def predict_uncertainty_on_batch(self, X: Sequence, masks: int = 50
                                  ) -> OneOrMany[Tuple[np.ndarray, np.ndarray]]:
  def predict_uncertainty_on_batch(
      self,
      X: Sequence,
      masks: int = 50) -> OneOrMany[Tuple[np.ndarray, np.ndarray]]:
    """
    Predict the model's outputs, along with the uncertainty in each one.

@@ -812,10 +799,11 @@ class KerasModel(Model):
    a NumPy array of the model produces a single output, or a list of arrays
    if it produces multiple outputs
    """
    generator = self.default_generator(
        dataset, mode='predict', deterministic=True, pad_batches=False)
    return self.predict_on_generator(
        generator,
    generator = self.default_generator(dataset,
                                       mode='predict',
                                       deterministic=True,
                                       pad_batches=False)
    return self.predict_on_generator(generator,
                                     transformers=transformers,
                                     outputs=outputs,
                                     output_types=output_types)
@@ -836,12 +824,15 @@ class KerasModel(Model):
    a NumPy array of the embeddings model produces, or a list
    of arrays if it produces multiple embeddings
    """
    generator = self.default_generator(
        dataset, mode='predict', pad_batches=False)
    generator = self.default_generator(dataset,
                                       mode='predict',
                                       pad_batches=False)
    return self._predict(generator, [], None, False, ['embedding'])

  def predict_uncertainty(self, dataset: Dataset, masks: int = 50
                         ) -> OneOrMany[Tuple[np.ndarray, np.ndarray]]:
  def predict_uncertainty(
      self,
      dataset: Dataset,
      masks: int = 50) -> OneOrMany[Tuple[np.ndarray, np.ndarray]]:
    """
    Predict the model's outputs, along with the uncertainty in each one.

@@ -869,8 +860,9 @@ class KerasModel(Model):
    sum_sq_pred: List[np.ndarray] = []
    sum_var: List[np.ndarray] = []
    for i in range(masks):
      generator = self.default_generator(
          dataset, mode='uncertainty', pad_batches=False)
      generator = self.default_generator(dataset,
                                         mode='uncertainty',
                                         pad_batches=False)
      results = self._predict(generator, [], None, True, None)
      if len(sum_pred) == 0:
        for p, v in results:
@@ -951,8 +943,8 @@ class KerasModel(Model):
    # Use a GradientTape to compute gradients.

    X = tf.constant(X[0])
    with tf.GradientTape(
        persistent=True, watch_accessed_variables=False) as tape:
    with tf.GradientTape(persistent=True,
                         watch_accessed_variables=False) as tape:
      tape.watch(X)
      outputs = self._compute_model(X)
      if tf.is_tensor(outputs):
@@ -970,8 +962,8 @@ class KerasModel(Model):
      return final_result[0]
    return final_result

  def _prepare_batch(self,
                     batch: Tuple[Any, Any, Any]) -> Tuple[List, List, List]:
  def _prepare_batch(self, batch: Tuple[Any, Any,
                                        Any]) -> Tuple[List, List, List]:
    inputs, labels, weights = batch
    inputs = [
        x if x.dtype == t else x.astype(t)
@@ -1031,8 +1023,8 @@ class KerasModel(Model):
    ([inputs], [outputs], [weights])
    """
    for epoch in range(epochs):
      for (X_b, y_b, w_b, ids_b) in dataset.iterbatches(
          batch_size=self.batch_size,
      for (X_b, y_b, w_b,
           ids_b) in dataset.iterbatches(batch_size=self.batch_size,
                                         deterministic=deterministic,
                                         pad_batches=pad_batches):
        yield ([X_b], [y_b], [w_b])
@@ -1222,8 +1214,8 @@ class KerasModel(Model):

    if assignment_map is None:
      logger.info("No assignment map provided. Creating custom assignment map.")
      assignment_map = self._create_assignment_map(
          source_model=source_model, include_top=include_top)
      assignment_map = self._create_assignment_map(source_model=source_model,
                                                   include_top=include_top)

    for source_var, dest_var in assignment_map.items():
      assert source_var.deref().shape == dest_var.shape
+200 −72
Original line number Diff line number Diff line
import torch
import tensorflow as tf
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union
from deepchem.data import Dataset, NumpyDataset
import math
import logging
import importlib.util
from typing import List, Optional, Union
from deepchem.data import Dataset
from deepchem.metrics import Metric
from deepchem.models.callbacks import ValidationCallback

logger = logging.getLogger(__name__)

def is_wandb_available():
  return importlib.util.find_spec("wandb") is not None


class WandbLogger(object):
    """
  Weights & Biases Logger
  """Weights & Biases Logger for KerasModel.

    This is a logger class that can be passed into the initialization
    of a KerasModel. It initializes and sets up a wandb logger which
    will log the specified metrics calculated on the specific datasets
    to the user's W&B dashboard.

    If a WandbLogger is provided to the wandb_logger flag in KerasModel,
    the metrics are logged to Weights & Biases, along with other information
    such as epoch number, losses, sample counts, and model configuration data.
    """

  def __init__(self,
                 datasets: List[Dataset],
                 metrics: List[Metric],
                 log_loss: bool = True,
               train_dataset: Dataset,
               eval_dataset: Optional[Dataset] = None,
               metrics: Optional[List[Metric]] = None,
               logging_strategy: Optional[str] = "step",
               name: Optional[str] = None,
               entity: Optional[str] = None,
               project: Optional[str] = None,
               save_dir: Optional[str] = None,
                 offline: Optional[bool] = False,
               mode: Optional[str] = "online",
               id: Optional[str] = None,
                 anonymous: Optional[bool] = None,
                 version: Optional[str] = None,
                 project: Optional[str] = None,
               resume: Optional[Union[bool, str]] = None,
               anonymous: Optional[str] = "never",
               log_model: Optional[bool] = False,
                 experiment=None,
                 prefix: Optional[str] = '',
               log_dataset: Optional[bool] = False,
               **kwargs):
        try:
    """Parameters
    ----------
    train_dataset: dc.data.Dataset
      the training set on which the model is run on
    eval_dataset: dc.data.Dataset
      the validation set on which to compute the metrics
    metrics: list of dc.metrics.Metric
      metrics to compute on eval_dataset
    logging_strategy: str
      the logging strategy used for logging (step or epoch)
    name: str
      a display name for the run in the W&B dashboard
    entity: str
      an entity is a username or team name where you're sending the W&B run
    project: str
      the name of the project where you're sending the new W&B run
    save_dir: str
      path where data is saved (wandb dir by default)
    mode: str
      W&B online or offline mode
    id: str
      a unique ID for this run, used for resuming
    resume: bool or str
      sets the resuming behavior
    anonymous: str
      controls anonymous data logging
    log_model: bool
      whether to log the model to W&B
    log_dataset: bool
      whether to log the dataset to W&B
    """

    assert is_wandb_available(
    ), "WandbLogger requires wandb to be installed. Please run `pip install wandb --upgrade`"
    import wandb
        except ImportError:
            raise ImportError(
                'You want to use `wandb` logger which is not installed yet,'
                ' install it with `pip install wandb`.'
            )
    self._wandb = wandb

        if offline and log_model:
            # TODO: Different exception type?
    if mode == "offline" and log_model:
      raise Exception(
                f'Providing log_model={log_model} and offline={offline} is an invalid configuration'
          f'Providing log_model={log_model} and mode={mode} is an invalid configuration'
          ' since model checkpoints cannot be uploaded in offline mode.\n'
                'Hint: Set `offline=False` to log your model.'
          'Hint: Set `mode="online"` to log your model.')

    # Check for metrics and logging strategy
    if ((metrics is None) or (not metrics)) and (eval_dataset is not None):
      logger.warning(
          "Warning: No metrics are provided. "
          "Please provide a list of metrics to be calculated on the datasets.")

    if logging_strategy != "step" and logging_strategy != "epoch":
      logger.warning(
          "Warning: `logging_strategy` needs to be either 'step' or 'epoch'. Defaulting to 'step'."
      )
        self.base_model = None # will be set in KerasModel init
        self.datasets = datasets
      logging_strategy = "step"

    self.datasets = {"train": train_dataset, "eval": eval_dataset}
    self.train_dataset_size = len(self.datasets["train"])
    self.metrics = metrics
        self.log_loss = log_loss
    self.logging_strategy = logging_strategy

        self.offline = offline
    self.log_model = log_model
        self.prefix = prefix
        self.experiment = experiment
    self.log_dataset = log_dataset
    self.save_dir = save_dir

    # set wandb init arguments
        anonymous_lut = {True: 'allow', False: None}
        self.wandb_init = dict(
            name=name,
    self.wandb_init_params = dict(name=name,
                                  project=project,
            id=version or id,
                                  entity=entity,
                                  mode=mode,
                                  id=id,
                                  dir=save_dir,
            resume='allow',
            anonymous=anonymous_lut.get(anonymous, anonymous)
                                  resume=resume,
                                  anonymous=anonymous)
    self.wandb_init_params.update(**kwargs)
    self.initialized = False

  def setup(self):
    """Initializes a W&B run and create a run object.
    """
    self.wandb_run = self._wandb.init(**self.wandb_init_params)
    self.initialized = True

  def check_other_loggers(self, callbacks):
    """Check for different callbacks and warn for redundant logging behaviour.
    Parameters
    ----------
    callbacks: function or list of functions
      one or more functions of the form f(model, step) that will be passed into fit().

    """
    for c in callbacks:
      if isinstance(c, ValidationCallback):
        logger.warning(
            "Note: You are using both WandbLogger and ValidationCallback. "
            "This will result in evaluation metrics being calculated twice and may increase runtime."
        )
        self.wandb_init.update(**kwargs)
        # extract parameters
        self.save_dir = self.wandb_init.get('dir')
        self.name = self.wandb_init.get('name')
        self.id = self.wandb_init.get('id')

        #Log the parameters of KerasModel
  def calculate_epoch_and_sample_count(self, current_step):
    """Calculates the steps per epoch, current epoch number,
    and the number of samples seen by the model.

    Parameters
    ----------
    current_step: int
      the training step of the model

        self.wandb = wandb.init(**self.wandb_init) if wandb.run is None else wandb.run
    """
    self.steps_per_epoch = math.ceil(self.train_dataset_size /
                                     self.wandb_run.config.batch_size)
    self.epoch_num = current_step / self.steps_per_epoch
    self.sample_count = current_step * self.wandb_run.config.batch_size

  def log(self, model, extra_data, step):
    """Logs the metrics and other extra data to W&B.

    Parameters
    ----------
    model: tf.keras.Model
     the Keras model implementing the calculation
    extra_data: dict
     extra data to be logged alongside calculated metrics
    step: int
     the step number
    """

    def save_model(self, model):
        #model is a tf.keras.Model
        return None
    all_data = dict({})
    all_data.update(extra_data)
    all_data.update({
        'train/epoch': self.epoch_num,
        'train/sample_count': self.sample_count
    })

    def log_data(self, model, step):
        #model is a Deepchem KerasModel
        for dataset in self.datasets:
            scores = model.evaluate(dataset, self.metrics)
        self.wandb.log()
    if self.metrics is not None and self.metrics:
      # Get Training Metrics (interval dependent)
      if self.logging_strategy == "step" and step % self.wandb_run.config.log_frequency == 0:
        scores = model.evaluate(self.datasets["train"], self.metrics)
        scores = {'train/' + k: v for k, v in scores.items()}
        all_data.update(scores)
      elif self.logging_strategy == "epoch" and step % self.steps_per_epoch == 0:
        scores = model.evaluate(self.datasets["train"], self.metrics)
        scores = {'train/' + k: v for k, v in scores.items()}
        all_data.update(scores)

      # Get Eval Metrics (interval dependent)
      if self.datasets["eval"] is not None:
        if self.logging_strategy == "step" and step % self.wandb_run.config.log_frequency == 0:
          scores = model.evaluate(self.datasets["eval"], self.metrics)
          scores = {'eval/' + k: v for k, v in scores.items()}
          all_data.update(scores)
        elif self.logging_strategy == "epoch" and step % self.steps_per_epoch == 0:
          scores = model.evaluate(self.datasets["eval"], self.metrics)
          scores = {'eval/' + k: v for k, v in scores.items()}
          all_data.update(scores)

    def update_config(self):
        return None
    self.wandb_run.log(all_data, step=step)

  def finish(self):
    """Finishes and closes the W&B run.
    """
    self.wandb_run.finish()

  def update_config(self, config_data):
    """Updates the W&B configuration.
    Parameters
    ----------
    config_data: dict
      additional configuration data to add
    """
    self.wandb_run.config.update(config_data)