Commit d65c8b2a authored by peastman

More type annotations

parent 7ecee995
+15 −15
@@ -381,7 +381,7 @@ class Dataset(object):
    raise NotImplementedError()

  def transform(self, fn: Callable[[np.ndarray, np.ndarray, np.ndarray], Tuple[
-      np.ndarray, np.ndarray, np.ndarray]], **args) -> Dataset:
+      np.ndarray, np.ndarray, np.ndarray]], **args) -> "Dataset":
    """Construct a new dataset by applying a transformation to every sample in this dataset.

    The argument is a function that can be called as follows:
@@ -812,7 +812,7 @@ class NumpyDataset(Dataset):
            for i in range(n_samples))

  def transform(self, fn: Callable[[np.ndarray, np.ndarray, np.ndarray], Tuple[
-      np.ndarray, np.ndarray, np.ndarray]], **args) -> NumpyDataset:
+      np.ndarray, np.ndarray, np.ndarray]], **args) -> "NumpyDataset":
    """Construct a new dataset by applying a transformation to every sample in this dataset.

    The argument is a function that can be called as follows:
@@ -836,7 +836,7 @@ class NumpyDataset(Dataset):
    return NumpyDataset(newx, newy, neww, self._ids[:])

  def select(self, indices: Sequence[int],
-             select_dir: str = None) -> NumpyDataset:
+             select_dir: str = None) -> "NumpyDataset":
    """Creates a new dataset from a selection of indices from self.

    Parameters
@@ -896,7 +896,7 @@ class NumpyDataset(Dataset):
    return TorchDataset()

  @staticmethod
-  def from_DiskDataset(ds: DiskDataset) -> NumpyDataset:
+  def from_DiskDataset(ds: "DiskDataset") -> "NumpyDataset":
    """

    Parameters
@@ -924,13 +924,13 @@ class NumpyDataset(Dataset):
      json.dump(d, fout)

  @staticmethod
-  def from_json(fname: str) -> NumpyDataset:
+  def from_json(fname: str) -> "NumpyDataset":
    with open(fname) as fin:
      d = json.load(fin)
      return NumpyDataset(d['X'], d['y'], d['w'], d['ids'])

  @staticmethod
-  def merge(datasets: Sequence[Dataset]) -> NumpyDataset:
+  def merge(datasets: Sequence[Dataset]) -> "NumpyDataset":
    """
    Parameters
    ----------
@@ -1337,7 +1337,7 @@ class DiskDataset(Dataset):
    return iterate(self)

  def transform(self, fn: Callable[[np.ndarray, np.ndarray, np.ndarray], Tuple[
-      np.ndarray, np.ndarray, np.ndarray]], **args) -> DiskDataset:
+      np.ndarray, np.ndarray, np.ndarray]], **args) -> "DiskDataset":
    """Construct a new dataset by applying a transformation to every sample in this dataset.

    The argument is a function that can be called as follows:
@@ -1422,7 +1422,7 @@ class DiskDataset(Dataset):
                 w: Optional[np.ndarray] = None,
                 ids: Optional[np.ndarray] = None,
                 tasks: Optional[Sequence] = None,
-                 data_dir: Optional[str] = None):
+                 data_dir: Optional[str] = None) -> "DiskDataset":
    """Creates a DiskDataset object from specified Numpy arrays."""
    n_samples = len(X)
    if ids is None:
@@ -1458,8 +1458,8 @@ class DiskDataset(Dataset):
        [(X, y, w, ids)], data_dir=data_dir, tasks=tasks)

  @staticmethod
-  def merge(datasets: Iterable[DiskDataset],
-            merge_dir: Optional[str] = None) -> DiskDataset:
+  def merge(datasets: Iterable["DiskDataset"],
+            merge_dir: Optional[str] = None) -> "DiskDataset":
    """Merges provided datasets into a merged dataset."""
    if merge_dir is not None:
      if not os.path.exists(merge_dir):
@@ -1492,7 +1492,7 @@ class DiskDataset(Dataset):
        generator(), data_dir=merge_dir, tasks=tasks)

  def subset(self, shard_nums: Sequence[int],
-             subset_dir: Optional[str] = None) -> DiskDataset:
+             subset_dir: Optional[str] = None) -> "DiskDataset":
    """Creates a subset of the original dataset on disk."""
    if subset_dir is not None:
      if not os.path.exists(subset_dir):
@@ -1550,7 +1550,7 @@ class DiskDataset(Dataset):
    time2 = time.time()
    logger.info("TIMING: sparse_shuffle took %0.3f s" % (time2 - time1))

-  def complete_shuffle(self, data_dir: Optional[str] = None) -> DiskDataset:
+  def complete_shuffle(self, data_dir: Optional[str] = None) -> "DiskDataset":
    """
    Completely shuffle across all data, across all shards.

@@ -1702,7 +1702,7 @@ class DiskDataset(Dataset):
    return np.array(
        load_from_disk(os.path.join(self.data_dir, row['y'])), dtype=object)

-  def get_shard_w(self, i: int) -> no.ndarray:
+  def get_shard_w(self, i: int) -> np.ndarray:
    """Retrieves the weights for the i-th shard from disk.

    Parameters
@@ -1739,7 +1739,7 @@ class DiskDataset(Dataset):
    self._cached_shards = None

  def select(self, indices: Sequence[int],
-             select_dir: str = None) -> DiskDataset:
+             select_dir: str = None) -> "DiskDataset":
    """Creates a new dataset from a selection of indices from self.

    Parameters
@@ -2095,7 +2095,7 @@ class ImageDataset(Dataset):
    return NumpyDataset(newx, newy, neww, self.ids[:])

  def select(self, indices: Sequence[int],
-             select_dir: str = None) -> ImageDataset:
+             select_dir: str = None) -> "ImageDataset":
    """Creates a new dataset from a selection of indices from self.

    Parameters
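Note: every change in this file wraps an annotation in quotes. That is the standard PEP 484 forward-reference pattern: inside a class body, the class's own name (or a class defined later in the module, like DiskDataset) is not yet bound when the method signature is evaluated, so a bare `-> NumpyDataset` would raise NameError at import time, while the string form is resolved lazily by type checkers. A minimal sketch of the pattern, using a hypothetical class:

from typing import Sequence

class Box:

  # "Box" is quoted because the name is not bound yet while the class
  # body is still executing; a type checker resolves the string later.
  def merge_with(self, others: Sequence["Box"]) -> "Box":
    return self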
+64 −54
@@ -15,6 +15,9 @@ from deepchem.utils.save import log
from deepchem.metrics import to_one_hot
from tensorflow.keras.layers import Input, Dense, Reshape, Softmax, Dropout, Activation, Lambda

+from typing import Any, Callable, Iterable, List, Optional, Sequence, Tuple, Union
+from deepchem.utils.typing import ActivationFn, LossFunction, OneOrMany

logger = logging.getLogger(__name__)


@@ -34,18 +37,18 @@ class MultitaskClassifier(KerasModel):
  """

  def __init__(self,
-               n_tasks,
-               n_features,
-               layer_sizes=[1000],
-               weight_init_stddevs=0.02,
-               bias_init_consts=1.0,
-               weight_decay_penalty=0.0,
-               weight_decay_penalty_type="l2",
-               dropouts=0.5,
-               activation_fns=tf.nn.relu,
-               n_classes=2,
-               residual=False,
-               **kwargs):
+               n_tasks: int,
+               n_features: int,
+               layer_sizes: Sequence[int] = [1000],
+               weight_init_stddevs: OneOrMany[float] = 0.02,
+               bias_init_consts: OneOrMany[float] = 1.0,
+               weight_decay_penalty: float = 0.0,
+               weight_decay_penalty_type: str = "l2",
+               dropouts: OneOrMany[float] = 0.5,
+               activation_fns: OneOrMany[ActivationFn] = tf.nn.relu,
+               n_classes: int = 2,
+               residual: bool = False,
+               **kwargs) -> None:
    """Create a MultitaskClassifier.

    In addition to the following arguments, this class also accepts
@@ -66,7 +69,7 @@ class MultitaskClassifier(KerasModel):
      equal len(layer_sizes).  Alternatively this may be a single
      value instead of a list, in which case the same value is used
      for every layer.
-    bias_init_consts: list or loat
+    bias_init_consts: list or float
      the value to initialize the biases in each layer to.  The
      length of this list should equal len(layer_sizes).
      Alternatively this may be a single value instead of a list, in
@@ -150,12 +153,13 @@ class MultitaskClassifier(KerasModel):
        output_types=['prediction', 'loss'],
        **kwargs)

-  def default_generator(self,
-                        dataset,
-                        epochs=1,
-                        mode='fit',
-                        deterministic=True,
-                        pad_batches=True):
+  def default_generator(
+      self,
+      dataset: dc.data.Dataset,
+      epochs: int = 1,
+      mode: str = 'fit',
+      deterministic: bool = True,
+      pad_batches: bool = True) -> Iterable[Tuple[List, List, List]]:
    for epoch in range(epochs):
      for (X_b, y_b, w_b, ids_b) in dataset.iterbatches(
          batch_size=self.batch_size,
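Note: the new return annotation `Iterable[Tuple[List, List, List]]` spells out what these generators yield: one ([inputs], [labels], [weights]) triple per batch. A standalone toy generator with the same shape (illustrative, not DeepChem code):

from typing import Iterable, List, Tuple

import numpy as np

def toy_generator(n_batches: int) -> Iterable[Tuple[List, List, List]]:
  # Yield ([inputs], [labels], [weights]) triples, mirroring the
  # annotated shape of default_generator.
  for _ in range(n_batches):
    X_b = np.zeros((2, 3))
    y_b = np.zeros((2, 1))
    w_b = np.ones((2, 1))
    yield ([X_b], [y_b], [w_b])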
@@ -183,18 +187,18 @@ class MultitaskRegressor(KerasModel):
  """

  def __init__(self,
-               n_tasks,
-               n_features,
-               layer_sizes=[1000],
-               weight_init_stddevs=0.02,
-               bias_init_consts=1.0,
-               weight_decay_penalty=0.0,
-               weight_decay_penalty_type="l2",
-               dropouts=0.5,
-               activation_fns=tf.nn.relu,
-               uncertainty=False,
-               residual=False,
-               **kwargs):
+               n_tasks: int,
+               n_features: int,
+               layer_sizes: Sequence[int] = [1000],
+               weight_init_stddevs: OneOrMany[float] = 0.02,
+               bias_init_consts: OneOrMany[float] = 1.0,
+               weight_decay_penalty: float = 0.0,
+               weight_decay_penalty_type: str = "l2",
+               dropouts: OneOrMany[float] = 0.5,
+               activation_fns: OneOrMany[ActivationFn] = tf.nn.relu,
+               uncertainty: bool = False,
+               residual: bool = False,
+               **kwargs) -> None:
    """Create a MultitaskRegressor.

In addition to the following arguments, this class also accepts all the keyword arguments
@@ -296,6 +300,7 @@ class MultitaskRegressor(KerasModel):
            stddev=weight_init_stddevs[-1]),
        bias_initializer=tf.constant_initializer(
            value=bias_init_consts[-1]))(prev_layer))
+    loss: Union[dc.models.losses.Loss, LossFunction]
    if uncertainty:
      log_var = Reshape((n_tasks, 1))(Dense(
          n_tasks,
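Note: the added `loss: Union[dc.models.losses.Loss, LossFunction]` line is a bare variable annotation. Declaring the union up front lets mypy accept the different types assigned in the `if uncertainty:` and `else` branches that follow. The same pattern in miniature (toy types):

from typing import Union

def pick(flag: bool) -> Union[int, str]:
  # Declare the variable's type before the conditional assignment so a
  # type checker accepts both branches.
  value: Union[int, str]
  if flag:
    value = 1
  else:
    value = "one"
  return value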
@@ -318,12 +323,13 @@ class MultitaskRegressor(KerasModel):
    super(MultitaskRegressor, self).__init__(
        model, loss, output_types=output_types, **kwargs)

-  def default_generator(self,
-                        dataset,
-                        epochs=1,
-                        mode='fit',
-                        deterministic=True,
-                        pad_batches=True):
+  def default_generator(
+      self,
+      dataset: dc.data.Dataset,
+      epochs: int = 1,
+      mode: str = 'fit',
+      deterministic: bool = True,
+      pad_batches: bool = True) -> Iterable[Tuple[List, List, List]]:
    for epoch in range(epochs):
      for (X_b, y_b, w_b, ids_b) in dataset.iterbatches(
          batch_size=self.batch_size,
@@ -358,10 +364,10 @@ class MultitaskFitTransformRegressor(MultitaskRegressor):
  """

  def __init__(self,
-               n_tasks,
-               n_features,
-               fit_transformers=[],
-               batch_size=50,
+               n_tasks: int,
+               n_features: int,
+               fit_transformers: Sequence[dc.trans.Transformer] = [],
+               batch_size: int = 50,
               **kwargs):
    """Create a MultitaskFitTransformRegressor.

@@ -388,18 +394,21 @@ class MultitaskFitTransformRegressor(MultitaskRegressor):
    else:
      raise ValueError("n_features should be list or int")
    for transformer in fit_transformers:
-      X_b = transformer.X_transform(X_b)
+      assert transformer.transform_X and not (transformer.transform_y or
+                                              transformer.transform_w)
+      X_b, _, _ = transformer.transform_array(X_b, None, None)
    n_features = X_b.shape[1]
    logger.info("n_features after fit_transform: %d", int(n_features))
    super(MultitaskFitTransformRegressor, self).__init__(
        n_tasks, n_features, batch_size=batch_size, **kwargs)
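Note: alongside the annotations, this hunk swaps `transformer.X_transform(X_b)` for `transform_array`, which takes and returns the full (X, y, w) triple; passing None for y and w and discarding those slots preserves the X-only behavior, and the new assert documents that assumption. A toy stand-in showing just the calling convention (not DeepChem's actual Transformer class):

import numpy as np

class ToyTransformer:
  # Stand-in exposing the same flags and transform_array signature.
  transform_X, transform_y, transform_w = True, False, False

  def transform_array(self, X, y, w):
    # Operates on the full (X, y, w) triple; here only X changes.
    return X * 2.0, y, w

X_b = np.ones((4, 3))
transformer = ToyTransformer()
assert transformer.transform_X and not (transformer.transform_y or
                                        transformer.transform_w)
X_b, _, _ = transformer.transform_array(X_b, None, None)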

-  def default_generator(self,
-                        dataset,
-                        epochs=1,
-                        mode='fit',
-                        deterministic=True,
-                        pad_batches=True):
+  def default_generator(
+      self,
+      dataset: dc.data.Dataset,
+      epochs: int = 1,
+      mode: str = 'fit',
+      deterministic: bool = True,
+      pad_batches: bool = True) -> Iterable[Tuple[List, List, List]]:
    for epoch in range(epochs):
      for (X_b, y_b, w_b, ids_b) in dataset.iterbatches(
          batch_size=self.batch_size,
@@ -410,18 +419,19 @@ class MultitaskFitTransformRegressor(MultitaskRegressor):
        if X_b is not None:
          if mode == 'fit':
            for transformer in self.fit_transformers:
-              X_b = transformer.X_transform(X_b)
+              X_b, _, _ = transformer.transform_array(X_b, None, None)
        if mode == 'predict':
          dropout = np.array(0.0)
        else:
          dropout = np.array(1.0)
        yield ([X_b, dropout], [y_b], [w_b])

-  def predict_on_generator(self,
-                           generator,
-                           transformers=[],
-                           outputs=None,
-                           output_types=None):
+  def predict_on_generator(
+      self,
+      generator: Iterable[Tuple[Any, Any, Any]],
+      transformers: List[dc.trans.Transformer] = [],
+      outputs: Optional[OneOrMany[tf.Tensor]] = None,
+      output_types: Optional[OneOrMany[str]] = None) -> OneOrMany[np.ndarray]:

    def transform_generator():
      for inputs, labels, weights in generator:
+1 −3
@@ -19,9 +19,7 @@ from deepchem.trans import Transformer, undo_transforms
from deepchem.utils.evaluate import GeneratorEvaluator

from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union
-from deepchem.utils.typing import OneOrMany
-
-LossFunction = Callable[[List, List, List], float]
+from deepchem.utils.typing import LossFunction, OneOrMany

try:
  import wandb
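Note: the `LossFunction` alias that used to be defined locally in this module now lives in `deepchem.utils.typing`, so every module that needs it shares one definition. What the alias accepts, in miniature:

from typing import Callable, List

# Same shape as deepchem.utils.typing.LossFunction.
LossFunction = Callable[[List, List, List], float]

def zero_loss(outputs: List, labels: List, weights: List) -> float:
  # A toy callable that only needs to match the alias's
  # (outputs, labels, weights) -> float shape.
  return 0.0

loss: LossFunction = zero_loss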
+3 −1
"""Type annotations that are widely used in DeepChem"""

-from typing import Sequence, Tuple, TypeVar, Union
+from typing import Callable, List, Sequence, Tuple, TypeVar, Union

T = TypeVar("T")
ActivationFn = Union[Callable, str]
+LossFunction = Callable[[List, List, List], float]
OneOrMany = Union[T, Sequence[T]]
Shape = Tuple[int, ...]
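Note: `OneOrMany` is the generic alias doing most of the work in the new model signatures: `OneOrMany[float]` means Union[float, Sequence[float]], i.e. a single value broadcast to every layer or an explicit per-layer list. A sketch of how such a parameter is typically consumed (hypothetical helper, not DeepChem code):

from typing import List, Sequence, TypeVar, Union

T = TypeVar("T")
OneOrMany = Union[T, Sequence[T]]

def broadcast(value: OneOrMany[float], n_layers: int) -> List[float]:
  # A single number is repeated for every layer; a sequence is passed
  # through unchanged.
  if isinstance(value, (int, float)):
    return [float(value)] * n_layers
  return list(value)

assert broadcast(0.5, 3) == [0.5, 0.5, 0.5]
assert broadcast([0.1, 0.2], 2) == [0.1, 0.2]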
+5 −5

File changed. Contains only whitespace changes.