Commit 1b8db4d6 authored by peastman's avatar peastman
Browse files

Use multiple processes to transform datasets

parent 53366e7d
Loading
Loading
Loading
Loading
+76 −29
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@ import time
import shutil
import json
import warnings
from multiprocessing.dummy import Pool
import multiprocessing
from deepchem.utils.save import save_to_disk, save_metadata
from deepchem.utils.save import load_from_disk

@@ -380,8 +380,7 @@ class Dataset(object):
    """
    raise NotImplementedError()

  def transform(self, fn: Callable[[np.ndarray, np.ndarray, np.ndarray], Tuple[
      np.ndarray, np.ndarray, np.ndarray]], **args) -> "Dataset":
  def transform(self, transformer: "dc.trans.Transformer", **args) -> "Dataset":
    """Construct a new dataset by applying a transformation to every sample in this dataset.

    The argument is a function that can be called as follows:
@@ -394,8 +393,8 @@ class Dataset(object):

    Parameters
    ----------
    fn: function
      A function to apply to each sample in the dataset
    transformer: Transformer
      the transformation to apply to each sample in the dataset

    Returns
    -------
@@ -811,8 +810,8 @@ class NumpyDataset(Dataset):
    return ((self._X[i], self._y[i], self._w[i], self._ids[i])
            for i in range(n_samples))

  def transform(self, fn: Callable[[np.ndarray, np.ndarray, np.ndarray], Tuple[
      np.ndarray, np.ndarray, np.ndarray]], **args) -> "NumpyDataset":
  def transform(self, transformer: "dc.trans.Transformer",
                **args) -> "NumpyDataset":
    """Construct a new dataset by applying a transformation to every sample in this dataset.

    The argument is a function that can be called as follows:
@@ -825,14 +824,14 @@ class NumpyDataset(Dataset):

    Parameters
    ----------
    fn: function
      A function to apply to each sample in the dataset
    transformer: Transformer
      the transformation to apply to each sample in the dataset

    Returns
    -------
    a newly constructed Dataset object
    """
    newx, newy, neww = fn(self._X, self._y, self._w)
    newx, newy, neww = transformer.transform_array(self._X, self._y, self._w)
    return NumpyDataset(newx, newy, neww, self._ids[:])

  def select(self, indices: Sequence[int],
@@ -1218,7 +1217,8 @@ class DiskDataset(Dataset):
      # than process based pools, since process based pools need to pickle/serialize
      # objects as an extra overhead. Also, as hideously as un-thread safe this looks,
      # we're actually protected by the GIL.
      pool = Pool(1)  # mp.dummy aliases ThreadPool to Pool
      pool = multiprocessing.dummy.Pool(
          1)  # mp.dummy aliases ThreadPool to Pool

      if batch_size is None:
        num_global_batches = num_shards
@@ -1336,8 +1336,10 @@ class DiskDataset(Dataset):

    return iterate(self)

  def transform(self, fn: Callable[[np.ndarray, np.ndarray, np.ndarray], Tuple[
      np.ndarray, np.ndarray, np.ndarray]], **args) -> "DiskDataset":
  def transform(self,
                transformer: "dc.trans.Transformer",
                parallel=False,
                **args) -> "DiskDataset":
    """Construct a new dataset by applying a transformation to every sample in this dataset.

    The argument is a function that can be called as follows:
@@ -1350,11 +1352,13 @@ class DiskDataset(Dataset):

    Parameters
    ----------
    fn: function
      A function to apply to each sample in the dataset
    transformer: Transformer
      the transformation to apply to each sample in the dataset
    out_dir: string
      The directory to save the new dataset in.  If this is omitted, a
      temporary directory is created automatically
    parallel: bool
      if True, use multiple processes to transform the dataset in parallel

    Returns
    -------
@@ -1365,18 +1369,61 @@ class DiskDataset(Dataset):
    else:
      out_dir = tempfile.mkdtemp()
    tasks = self.get_task_names()

    n_shards = self.get_number_shards()

    time1 = time.time()
    if parallel:
      results = []
      pool = multiprocessing.Pool()
      for i in range(self.get_number_shards()):
        row = self.metadata_df.iloc[i]
        X_file = os.path.join(self.data_dir, row['X'])
        if row['y'] is not None:
          y_file: Optional[str] = os.path.join(self.data_dir, row['y'])
        else:
          y_file = None
        if row['w'] is not None:
          w_file: Optional[str] = os.path.join(self.data_dir, row['w'])
        else:
          w_file = None
        ids_file = os.path.join(self.data_dir, row['ids'])
        results.append(
            pool.apply_async(DiskDataset._transform_shard,
                             (transformer, i, X_file, y_file, w_file, ids_file,
                              out_dir, tasks)))
      pool.close()
      metadata_rows = [r.get() for r in results]
      metadata_df = DiskDataset._construct_metadata(metadata_rows)
      save_metadata(tasks, metadata_df, out_dir)
      dataset = DiskDataset(out_dir)
    else:

      def generator():
        for shard_num, row in self.metadata_df.iterrows():
          logger.info("Transforming shard %d/%d" % (shard_num, n_shards))
          X, y, w, ids = self.get_shard(shard_num)
        newx, newy, neww = fn(X, y, w)
          newx, newy, neww = transformer.transform_array(X, y, w)
          yield (newx, newy, neww, ids)

    return DiskDataset.create_dataset(
      dataset = DiskDataset.create_dataset(
          generator(), data_dir=out_dir, tasks=tasks)
    time2 = time.time()
    logger.info("TIMING: transforming took %0.3f s" % (time2 - time1))
    return dataset

  @staticmethod
  def _transform_shard(transformer: "dc.trans.Transformer", shard_num: int,
                       X_file: str, y_file: Optional[str],
                       w_file: Optional[str], ids_file: str, out_dir: str,
                       tasks: np.ndarray):
    """Transform a single shard and write the result to disk.

    This is called by transform(); with parallel=True it runs inside a
    multiprocessing worker, which is why the shard contents are passed
    as file paths rather than arrays — only small, picklable arguments
    cross the process boundary, and each worker loads its own shard.

    Parameters
    ----------
    transformer: Transformer
      the transformation to apply to the shard's arrays
    shard_num: int
      index of the shard, used to name the output files
    X_file: str
      path to the file holding the shard's X array
    y_file: Optional[str]
      path to the shard's y array, or None if the shard has no y
    w_file: Optional[str]
      path to the shard's w array, or None if the shard has no w
    ids_file: str
      path to the file holding the shard's ids array
    out_dir: str
      directory the transformed shard is written to
    tasks: np.ndarray
      the task names of the dataset

    Returns
    -------
    the metadata row returned by DiskDataset.write_data_to_disk for the
    transformed shard
    """
    # Missing arrays stay None; transform_array is expected to tolerate
    # None inputs for absent y/w.
    X = None if X_file is None else np.array(load_from_disk(X_file))
    y = None if y_file is None else np.array(load_from_disk(y_file))
    w = None if w_file is None else np.array(load_from_disk(w_file))
    ids = np.array(load_from_disk(ids_file))
    X, y, w = transformer.transform_array(X, y, w)
    basename = "shard-%d" % shard_num
    return DiskDataset.write_data_to_disk(out_dir, basename, tasks, X, y, w,
                                          ids)

  def make_pytorch_dataset(self, epochs: int = 1, deterministic: bool = False):
    """Create a torch.utils.data.IterableDataset that iterates over the data in this Dataset.
@@ -2082,8 +2129,8 @@ class ImageDataset(Dataset):
    return ((get_image(self._X, i), get_image(self._y, i), self._w[i],
             self._ids[i]) for i in range(n_samples))

  def transform(self, fn: Callable[[np.ndarray, np.ndarray, np.ndarray], Tuple[
      np.ndarray, np.ndarray, np.ndarray]], **args) -> NumpyDataset:
  def transform(self, transformer: "dc.trans.Transformer",
                **args) -> NumpyDataset:
    """Construct a new dataset by applying a transformation to every sample in this dataset.

    The argument is a function that can be called as follows:
@@ -2096,14 +2143,14 @@ class ImageDataset(Dataset):

    Parameters
    ----------
    fn: function
      A function to apply to each sample in the dataset
    transformer: Transformer
      the transformation to apply to each sample in the dataset

    Returns
    -------
    a newly constructed Dataset object
    """
    newx, newy, neww = fn(self.X, self.y, self.w)
    newx, newy, neww = transformer.transform_array(self.X, self.y, self.w)
    return NumpyDataset(newx, newy, neww, self.ids[:])

  def select(self, indices: Sequence[int],
+19 −15
Original line number Diff line number Diff line
@@ -53,6 +53,12 @@ def load_multitask_data():
  return loader.featurize(input_file)


class TestTransformer(dc.trans.Transformer):
  """Minimal Transformer used by the dataset tests.

  Doubles X, scales y by 1.5, and passes w through unchanged, so
  transformed datasets are trivial to check against the originals.
  """

  def transform_array(self, X, y, w):
    """Return the transformed (X, y, w) triple."""
    scaled_X = X * 2
    scaled_y = y * 1.5
    return (scaled_X, scaled_y, w)


class TestDatasets(test_util.TensorFlowTestCase):
  """
  Test basic top-level API for dataset objects.
@@ -386,10 +392,8 @@ class TestDatasets(test_util.TensorFlowTestCase):

    # Transform it

    def fn(x, y, w):
      return (2 * x, 1.5 * y, w)

    transformed = dataset.transform(fn)
    transformer = TestTransformer(transform_X=True, transform_y=True)
    transformed = dataset.transform(transformer)
    np.testing.assert_array_equal(X, dataset.X)
    np.testing.assert_array_equal(y, dataset.y)
    np.testing.assert_array_equal(w, dataset.w)
@@ -408,10 +412,10 @@ class TestDatasets(test_util.TensorFlowTestCase):
    ids = dataset.ids

    # Transform it
    def fn(x, y, w):
      return (2 * x, 1.5 * y, w)

    transformed = dataset.transform(fn)
    transformer = TestTransformer(transform_X=True, transform_y=True)
    for parallel in (True, False):
      transformed = dataset.transform(transformer, parallel=parallel)
      np.testing.assert_array_equal(X, dataset.X)
      np.testing.assert_array_equal(y, dataset.y)
      np.testing.assert_array_equal(w, dataset.w)
+1 −3
Original line number Diff line number Diff line
@@ -56,9 +56,7 @@ class TestReload(unittest.TestCase):
    # TODO(rbharath): Transformers don't play nice with reload! Namely,
    # reloading will cause the transform to be reapplied. This is undesirable in
    # almost all cases. Need to understand a method to fix this.
    transformers = [
        dc.trans.BalancingTransformer(transform_w=True, dataset=train_dataset)
    ]
    transformers = [dc.trans.BalancingTransformer(dataset=train_dataset)]
    logger.info("Transforming datasets")
    for dataset in [train_dataset, valid_dataset, test_dataset]:
      for transformer in transformers:
+5 −9
Original line number Diff line number Diff line
@@ -18,8 +18,7 @@ def test_binary_1d():
  w = np.ones((n_samples,))
  dataset = dc.data.NumpyDataset(X, y, w)

  balancing_transformer = dc.trans.BalancingTransformer(
      transform_w=True, dataset=dataset)
  balancing_transformer = dc.trans.BalancingTransformer(dataset=dataset)
  dataset = balancing_transformer.transform(dataset)
  X_t, y_t, w_t, ids_t = (dataset.X, dataset.y, dataset.w, dataset.ids)
  # Check ids are unchanged.
@@ -52,8 +51,7 @@ def test_binary_singletask():
  w = np.ones((n_samples, n_tasks))
  dataset = dc.data.NumpyDataset(X, y, w)

  balancing_transformer = dc.trans.BalancingTransformer(
      transform_w=True, dataset=dataset)
  balancing_transformer = dc.trans.BalancingTransformer(dataset=dataset)
  dataset = balancing_transformer.transform(dataset)
  X_t, y_t, w_t, ids_t = (dataset.X, dataset.y, dataset.w, dataset.ids)
  # Check ids are unchanged.
@@ -86,7 +84,7 @@ def test_binary_multitask():
  w = np.ones((n_samples, n_tasks))
  multitask_dataset = dc.data.NumpyDataset(X, y, w)
  balancing_transformer = dc.trans.BalancingTransformer(
      transform_w=True, dataset=multitask_dataset)
      dataset=multitask_dataset)
  #X, y, w, ids = (multitask_dataset.X, multitask_dataset.y,
  #                multitask_dataset.w, multitask_dataset.ids)
  multitask_dataset = balancing_transformer.transform(multitask_dataset)
@@ -122,8 +120,7 @@ def test_multiclass_singletask():
  w = np.ones((n_samples, n_tasks))
  dataset = dc.data.NumpyDataset(X, y, w)

  balancing_transformer = dc.trans.BalancingTransformer(
      transform_w=True, dataset=dataset)
  balancing_transformer = dc.trans.BalancingTransformer(dataset=dataset)
  dataset = balancing_transformer.transform(dataset)
  X_t, y_t, w_t, ids_t = (dataset.X, dataset.y, dataset.w, dataset.ids)
  # Check ids are unchanged.
@@ -157,8 +154,7 @@ def test_transform_to_directory():
  w = np.ones((n_samples,))
  dataset = dc.data.NumpyDataset(X, y, w)

  balancing_transformer = dc.trans.BalancingTransformer(
      transform_w=True, dataset=dataset)
  balancing_transformer = dc.trans.BalancingTransformer(dataset=dataset)
  with tempfile.TemporaryDirectory() as tmpdirname:
    dataset = balancing_transformer.transform(dataset, out_dir=tmpdirname)
    balanced_dataset = dc.data.DiskDataset(tmpdirname)
+24 −97
Original line number Diff line number Diff line
@@ -107,8 +107,6 @@ class Transformer(object):
    self.transform_w = transform_w
    # One, but not both, transform_X or tranform_y is true
    assert transform_X or transform_y or transform_w
    # Use fact that bools add as ints in python
    assert (transform_X + transform_y + transform_w) == 1

  def transform_array(self, X, y, w):
    """Transform the data in a set of (X, y, w) arrays.
@@ -166,7 +164,8 @@ class Transformer(object):
    dataset: dc.data.Dataset
      Dataset object to be transformed.
    parallel: bool, optional (default False)
      At present this argument is ignored.
      if True, use multiple processes to transform the dataset in parallel.
      For large datasets, this might be faster.
    out_dir: str, optional
      If `out_dir` is specified in `kwargs` and `dataset` is a `DiskDataset`,
      the output dataset will be written to the specified directory.
@@ -185,8 +184,7 @@ class Transformer(object):
      raise ValueError("Cannot transform y when y_values are not present")
    if w_shape == tuple() and self.transform_w:
      raise ValueError("Cannot transform w when w_values are not present")
    return dataset.transform(
        lambda X, y, w: self.transform_array(X, y, w), out_dir=out_dir)
    return dataset.transform(self, out_dir=out_dir, parallel=parallel)

  def transform_on_array(self, X, y, w):
    """Transforms numpy arrays X, y, and w
@@ -257,15 +255,10 @@ class MinMaxTransformer(Transformer):

  Raises
  ------
  `ValueError` if `transform_w` is set or `transform_X` and `transform_y` are
  both set.
  `ValueError` if `transform_X` and `transform_y` are both set.
  """

  def __init__(self,
               transform_X=False,
               transform_y=False,
               transform_w=False,
               dataset=None):
  def __init__(self, transform_X=False, transform_y=False, dataset=None):
    """Initialization of MinMax transformer.

    Parameters
@@ -274,15 +267,11 @@ class MinMaxTransformer(Transformer):
      Whether to transform X
    transform_y: bool, optional (default False)
      Whether to transform y
    transform_w: bool, optional (default False)
      Whether to transform w
    dataset: dc.data.Dataset object, optional (default None)
      Dataset to be transformed
    """
    if transform_X and transform_y:
      raise ValueError("Can only transform only one of X and y")
    if transform_w:
      raise ValueError("MinMaxTransformer doesn't support w transformation.")
    if transform_X:
      self.X_min = np.min(dataset.X, axis=0)
      self.X_max = np.max(dataset.X, axis=0)
@@ -295,10 +284,7 @@ class MinMaxTransformer(Transformer):
        assert len(self.y_min) == dataset.y.shape[1]

    super(MinMaxTransformer, self).__init__(
        transform_X=transform_X,
        transform_y=transform_y,
        transform_w=transform_w,
        dataset=dataset)
        transform_X=transform_X, transform_y=transform_y, dataset=dataset)

  def transform(self, dataset, parallel=False):
    """Transforms the dataset.
@@ -413,8 +399,7 @@ class NormalizationTransformer(Transformer):

  Raises
  ------
  `ValueError` if `transform_w` is set or `transform_X` and `transform_y` are
  both set.
  `ValueError` if `transform_X` and `transform_y` are both set.
  """

  def __init__(self,
@@ -554,7 +539,6 @@ class ClippingTransformer(Transformer):
  def __init__(self,
               transform_X=False,
               transform_y=False,
               transform_w=False,
               dataset=None,
               x_max=5.,
               y_max=500.):
@@ -566,8 +550,6 @@ class ClippingTransformer(Transformer):
      Whether to transform X
    transform_y: bool, optional (default False)
      Whether to transform y
    transform_w: bool, optional (default False)
      Whether to transform w
    dataset: dc.data.Dataset object, optional
      Dataset to be transformed
    x_max: float, optional
@@ -585,12 +567,7 @@ class ClippingTransformer(Transformer):
    `ValueError` if `transform_w` is set.
    """
    super(ClippingTransformer, self).__init__(
        transform_X=transform_X,
        transform_y=transform_y,
        transform_w=transform_w,
        dataset=dataset)
    if transform_w:
      raise ValueError("ClippingTransformer doesn't support w transformation.")
        transform_X=transform_X, transform_y=transform_y, dataset=dataset)

    self.x_max = x_max
    self.y_max = y_max
@@ -668,7 +645,6 @@ class LogTransformer(Transformer):
  def __init__(self,
               transform_X=False,
               transform_y=False,
               transform_w=False,
               features=None,
               tasks=None,
               dataset=None):
@@ -680,8 +656,6 @@ class LogTransformer(Transformer):
      Whether to transform X
    transform_y: bool, optional (default False)
      Whether to transform y
    transform_w: bool, optional (default False)
      Whether to transform w
    dataset: dc.data.Dataset object, optional (default None)
      Dataset to be transformed
    features: list[Int]
@@ -691,8 +665,6 @@ class LogTransformer(Transformer):
    """
    if transform_X and transform_y:
      raise ValueError("Can only transform only one of X and y")
    if transform_w:
      raise ValueError("MinMaxTransformer doesn't support w transformation.")
    self.features = features
    self.tasks = tasks
    super(LogTransformer, self).__init__(
@@ -796,7 +768,7 @@ class BalancingTransformer(Transformer):
  >>> y = np.random.randint(n_classes, size=(n_samples, n_tasks))
  >>> w = np.ones((n_samples, n_tasks))
  >>> dataset = dc.data.NumpyDataset(X, y, w, ids)
  >>> transformer = dc.trans.BalancingTransformer(transform_w=True, dataset=dataset)
  >>> transformer = dc.trans.BalancingTransformer(dataset=dataset)
  >>> dataset = transformer.transform(dataset)

  And here's a multiclass dataset example.
@@ -810,7 +782,7 @@ class BalancingTransformer(Transformer):
  >>> y = np.random.randint(n_classes, size=(n_samples, n_tasks))
  >>> w = np.ones((n_samples, n_tasks))
  >>> dataset = dc.data.NumpyDataset(X, y, w, ids)
  >>> transformer = dc.trans.BalancingTransformer(transform_w=True, dataset=dataset)
  >>> transformer = dc.trans.BalancingTransformer(dataset=dataset)
  >>> dataset = transformer.transform(dataset)

  Note
@@ -825,21 +797,10 @@ class BalancingTransformer(Transformer):
  `ValueError` if `y` or `w` aren't of shape `(N,)` or `(N, n_tasks)`.
  """

  def __init__(self,
               transform_X=False,
               transform_y=False,
               transform_w=False,
               dataset=None):
  def __init__(self, dataset=None):
    # BalancingTransformer can only transform weights.
    if transform_X or transform_y:
      raise ValueError("Cannot transform X or y")
    if not transform_w:
      raise ValueError("BalancingTransformer must have transform_w=True.")
    super(BalancingTransformer, self).__init__(
        transform_X=transform_X,
        transform_y=transform_y,
        transform_w=transform_w,
        dataset=dataset)
        transform_w=True, dataset=dataset)

    # Compute weighting factors from dataset.
    y = dataset.y
@@ -932,11 +893,7 @@ class CDFTransformer(Transformer):
  TODO: Add an example of this. The current documentation is confusing.
  """

  def __init__(self,
               transform_X=False,
               transform_y=False,
               transform_w=False,
               dataset=None,
  def __init__(self, transform_X=False, transform_y=False, dataset=None,
               bins=2):
    """Initialize this transformer.

@@ -946,15 +903,13 @@ class CDFTransformer(Transformer):
      Whether to transform X
    transform_y: bool, optional (default False)
      Whether to transform y
    transform_w: bool, optional (default False)
      Whether to transform w
    dataset: dc.data.Dataset object, optional (default None)
      Dataset to be transformed
    bins: int, optional (default 2)

    """
    self.transform_X = transform_X
    self.transform_y = transform_y
    super(CDFTransformer, self).__init__(
        transform_X=transform_X, transform_y=transform_y)
    self.bins = bins
    self.y = dataset.y
    # self.w = dataset.w
@@ -1049,7 +1004,6 @@ class PowerTransformer(Transformer):
  def __init__(self,
               transform_X=False,
               transform_y=False,
               transform_w=False,
               dataset=None,
               powers=[1]):
    """Initialize this transformer
@@ -1060,18 +1014,14 @@ class PowerTransformer(Transformer):
      Whether to transform X
    transform_y: bool, optional (default False)
      Whether to transform y
    transform_w: bool, optional (default False)
      Whether to transform w
    dataset: dc.data.Dataset object, optional (default None)
      Dataset to be transformed. Note that this argument is ignored since
      `PowerTransformer` doesn't require it to be specified.
    powers: list[int], optional (default `[1]`)
      The list of powers of features/labels to compute.
    """
    if transform_w:
      raise ValueError("PowerTransformer doesn't support w transformation.")
    self.transform_X = transform_X
    self.transform_y = transform_y
    super(PowerTransformer, self).__init__(
        transform_X=transform_X, transform_y=transform_y)
    self.powers = powers

  def transform_array(self, X, y, w):
@@ -1417,21 +1367,12 @@ class DAGTransformer(Transformer):
  DAG calculation orders
  """

  def __init__(self,
               max_atoms=50,
               transform_X=True,
               transform_y=False,
               transform_w=False):
  def __init__(self, max_atoms=50):
    """Initializes DAGTransformer.
    Only X can be transformed
    """
    self.max_atoms = max_atoms
    self.transform_X = transform_X
    self.transform_y = transform_y
    self.transform_w = transform_w
    assert self.transform_X
    assert not self.transform_y
    assert not self.transform_w
    super(DAGTransformer, self).__init__(transform_X=True)

  def transform_array(self, X, y, w):
    """Add calculation orders to ConvMol objects"""
@@ -1542,16 +1483,10 @@ class ImageTransformer(Transformer):
  Convert an image into width, height, channel
  """

  def __init__(self,
               size,
               transform_X=True,
               transform_y=False,
               transform_w=False):
  def __init__(self, size):
    """Initializes transformation based on dataset statistics."""
    self.size = size
    self.transform_X = True
    self.transform_y = False
    self.transform_w = False
    super(ImageTransformer, self).__init__(transform_X=True)

  def transform_array(self, X, y, w):
    """Transform the data in a set of (X, y, w) arrays."""
@@ -1573,10 +1508,7 @@ class ANITransformer(Transformer):
               angular_length=8,
               atom_cases=[1, 6, 7, 8, 16],
               atomic_number_differentiated=True,
               coordinates_in_bohr=True,
               transform_X=True,
               transform_y=False,
               transform_w=False):
               coordinates_in_bohr=True):
    """
    Only X can be transformed
    """
@@ -1588,15 +1520,10 @@ class ANITransformer(Transformer):
    self.atom_cases = atom_cases
    self.atomic_number_differentiated = atomic_number_differentiated
    self.coordinates_in_bohr = coordinates_in_bohr
    self.transform_X = transform_X
    self.transform_y = transform_y
    self.transform_w = transform_w
    self.compute_graph = self.build()
    self.sess = tf.Session(graph=self.compute_graph)
    self.transform_batch_size = 32
    assert self.transform_X
    assert not self.transform_y
    assert not self.transform_w
    super(ANITransformer, self).__init__(transform_X=True)

  def transform_array(self, X, y, w):
    if self.transform_X:
@@ -1824,7 +1751,7 @@ class FeaturizationTransformer(Transformer):
    return X, y, w


class DataTransforms(Transformer):
class DataTransforms(object):
  """Applies different data transforms to images."""

  def __init__(self, Image):