Commit 098095d4 authored by Peter Eastman

Beginning of Dataset refactoring

parent 6a840d00
+176 −34
@@ -107,9 +107,154 @@ def pad_batch(batch_size, X_b, y_b, w_b, ids_b):
      start += increment
    return (X_out, y_out, w_out, ids_out)


class Dataset(object):
  """Abstract base class for datasets defined by X, y, w elements."""

  def __init__(self):
    raise NotImplementedError()

  def __len__(self):
    """
    Get the number of elements in the dataset.
    """
    raise NotImplementedError()

  def get_shape(self):
    """Get the shape of the dataset.
    
    Returns four tuples, giving the shape of the X, y, w, and ids arrays.
    """
    raise NotImplementedError()

  def get_task_names(self):
    """Get the names of the tasks associated with this dataset."""
    raise NotImplementedError()

  @property
  def y(self):
    """Get the y vector for this dataset as a single numpy array."""
    raise NotImplementedError()

  @property
  def ids(self):
    """Get the ids vector for this dataset as a single numpy array."""

    raise NotImplementedError()

  @property
  def w(self):
    """Get the weight vector for this dataset as a single numpy array."""
    raise NotImplementedError()

  def to_numpy(self):
    """
    Transforms internal data into arrays X, y, w, ids

    Creates four arrays containing all data in this object. This operation is
    dangerous (!) for large datasets which don't fit into memory.
    """
    raise NotImplementedError()

  def iterbatches(self, batch_size=None, epoch=0, deterministic=False, pad_batches=False):
    """Generator that iterates over minibatches from the dataset.
    
    Each minibatch is returned as a tuple of four numpy arrays: (X, y, w, ids).
    """
    raise NotImplementedError()


class NumpyDataset(Dataset):
  """A Dataset defined by in-memory numpy arrays."""

  def __init__(self, X, y, w=None, ids=None):
    n_samples = len(X)
    # Reshape y and w to shape (n_samples, n_tasks); the -1 lets numpy
    # infer the number of tasks from the data.
    if n_samples > 0:
      y = np.reshape(y, (n_samples, -1))
      if w is not None:
        w = np.reshape(w, (n_samples, -1))
    if ids is None:
      ids = np.arange(n_samples)
    if w is None:
      w = np.ones_like(y)
    self._X = X
    self._y = y
    self._w = w
    self._ids = np.array(ids, dtype=object)

  def __len__(self):
    """
  Wrapper class for dataset transformed into X, y, w numpy ndarrays.
    Get the number of elements in the dataset.
    """
    return len(self._y)

  def get_shape(self):
    """Get the shape of the dataset.
    
    Returns four tuples, giving the shape of the X, y, w, and ids arrays.
    """
    return self._X.shape, self._y.shape, self._w.shape, self._ids.shape

  def get_task_names(self):
    """Get the names of the tasks associated with this dataset."""
    return np.arange(self._y.shape[1])

  @property
  def y(self):
    """Get the y vector for this dataset as a single numpy array."""
    return self._y

  @property
  def ids(self):
    """Get the ids vector for this dataset as a single numpy array."""
    return self._ids

  @property
  def w(self):
    """Get the weight vector for this dataset as a single numpy array."""
    return self._w

  def to_numpy(self):
    """
    Transforms internal data into arrays X, y, w, ids

    Creates four arrays containing all data in this object. This operation is
    dangerous (!) for large datasets which don't fit into memory.
    """
    return self._X, self._y, self._w, self._ids

  def iterbatches(self, batch_size=None, epoch=0, deterministic=False, pad_batches=False):
    """Generator that iterates over minibatches from the dataset.
    
    Each minibatch is returned as a tuple of four numpy arrays: (X, y, w, ids).
    """
    n_samples = self._X.shape[0]
    if not deterministic:
      sample_perm = np.random.permutation(n_samples)
    else:
      sample_perm = np.arange(n_samples)
    if batch_size is None:
      batch_size = n_samples
    interval_points = np.linspace(
        0, n_samples, int(np.ceil(float(n_samples)/batch_size))+1, dtype=int)
    for j in range(len(interval_points)-1):
      indices = range(interval_points[j], interval_points[j+1])
      perm_indices = sample_perm[indices]
      X_batch = self._X[perm_indices, :]
      y_batch = self._y[perm_indices]
      w_batch = self._w[perm_indices]
      ids_batch = self._ids[perm_indices]
      if pad_batches:
        (X_batch, y_batch, w_batch, ids_batch) = pad_batch(
          batch_size, X_batch, y_batch, w_batch, ids_batch)
      yield (X_batch, y_batch, w_batch, ids_batch)
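
A quick usage sketch of the new in-memory class may help orient the review; the toy arrays below are hypothetical, and only the NumpyDataset API shown above is from this commit:

import numpy as np

# Hypothetical toy data: 10 samples, 5 features, 2 tasks.
X = np.random.rand(10, 5)
y = np.random.randint(2, size=(10, 2))
dataset = NumpyDataset(X, y)  # w defaults to all-ones, ids to arange(10)

assert len(dataset) == 10
assert dataset.get_shape()[0] == (10, 5)

# Note that the np.linspace logic above yields near-equal batches
# ((3, 3, 4) for 10 samples with batch_size=4), not strict groups of 4.
for X_b, y_b, w_b, ids_b in dataset.iterbatches(batch_size=4, deterministic=True):
  print(X_b.shape, y_b.shape, w_b.shape, ids_b.shape)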


class DiskDataset(Dataset):
  """
  A Dataset that is stored as a set of files on disk.
  """
  def __init__(self, data_dir=None, tasks=[], metadata_rows=None, #featurizers=None, 
               raw_data=None, verbosity=None, reload=False,
@@ -125,23 +270,23 @@ class Dataset(object):

    if not reload or not os.path.exists(self._get_metadata_filename()):
      if metadata_rows is not None:
-        self.metadata_df = Dataset.construct_metadata(metadata_rows)
+        self.metadata_df = DiskDataset.construct_metadata(metadata_rows)
        self.save_to_disk()
      elif raw_data is not None:
        metadata_rows = []
        ids, X, y, w = raw_data
        metadata_rows.append(
-            Dataset.write_data_to_disk(
+            DiskDataset.write_data_to_disk(
                self.data_dir, "data", tasks, X, y, w, ids,
                compute_feature_statistics=compute_feature_statistics))
-        self.metadata_df = Dataset.construct_metadata(metadata_rows)
+        self.metadata_df = DiskDataset.construct_metadata(metadata_rows)
        self.save_to_disk()
      else:
        # Create an empty metadata dataframe to be filled at a later time
        basename = "metadata"
-        metadata_rows = [Dataset.write_data_to_disk(
+        metadata_rows = [DiskDataset.write_data_to_disk(
            self.data_dir, basename, tasks)]
-        self.metadata_df = Dataset.construct_metadata(metadata_rows)
+        self.metadata_df = DiskDataset.construct_metadata(metadata_rows)
        self.save_to_disk()

    else:
@@ -184,7 +329,7 @@ class Dataset(object):
      assert X.shape[0] == y.shape[0]
      assert y.shape == w.shape
      assert len(ids) == X.shape[0]
-    return Dataset.write_data_to_disk(
+    return DiskDataset.write_data_to_disk(
        data_dir, basename, tasks, X, y, w, ids,
        compute_feature_statistics=compute_feature_statistics)

@@ -370,12 +515,12 @@ class Dataset(object):
        w_batch, w_next = w_next[:shard_size], w_next[shard_size:]
        ids_batch, ids_next = ids_next[:shard_size], ids_next[shard_size:]
        new_basename = "reshard-%d" % ind
-        new_metadata.append(Dataset.write_data_to_disk(
+        new_metadata.append(DiskDataset.write_data_to_disk(
            reshard_dir, new_basename, tasks, X_batch, y_batch, w_batch, ids_batch))
        ind += 1
    # Handle spillover from last shard
    new_basename = "reshard-%d" % ind
-    new_metadata.append(Dataset.write_data_to_disk(
+    new_metadata.append(DiskDataset.write_data_to_disk(
        reshard_dir, new_basename, tasks, X_next, y_next, w_next, ids_next))
    ind += 1
    # Get new metadata rows
@@ -404,7 +549,7 @@ class Dataset(object):
    if tasks is None:
      tasks = np.arange(n_tasks)
    raw_data = (ids, X, y, w)
-    return Dataset(data_dir=data_dir, tasks=tasks, raw_data=raw_data,
+    return DiskDataset(data_dir=data_dir, tasks=tasks, raw_data=raw_data,
                   verbosity=verbosity,
                   compute_feature_statistics=compute_feature_statistics)

@@ -420,7 +565,7 @@ class Dataset(object):
      basename = "dataset-%d" % ind
      tasks = dataset.get_task_names()
      metadata_rows.append(
-          Dataset.write_data_to_disk(merge_dir, basename, tasks, X, y, w, ids))
+          DiskDataset.write_data_to_disk(merge_dir, basename, tasks, X, y, w, ids))
    return DiskDataset(data_dir=merge_dir,
                   metadata_rows=metadata_rows,
                   verbosity=dataset.verbosity)
@@ -436,7 +581,7 @@ class Dataset(object):
        continue
      X, y, w, ids = self.get_shard(shard_num)
      basename = "dataset-%d" % shard_num
-      metadata_rows.append(Dataset.write_data_to_disk(
+      metadata_rows.append(DiskDataset.write_data_to_disk(
          subset_dir, basename, tasks, X, y, w, ids))
    return DiskDataset(data_dir=subset_dir,
                   metadata_rows=metadata_rows,
@@ -540,14 +685,14 @@ class Dataset(object):
        X_i, y_i, w_i, ids_i = X[:n_i], y[:n_i], w[:n_i], ids[:n_i]
        X_j, y_j, w_j, ids_j = X[n_i:], y[n_i:], w[n_i:], ids[n_i:]

-        Dataset.write_data_to_disk(
+        DiskDataset.write_data_to_disk(
            self.data_dir, basename_i, tasks, X_i, y_i, w_i, ids_i)
-        Dataset.write_data_to_disk(
+        DiskDataset.write_data_to_disk(
            self.data_dir, basename_j, tasks, X_j, y_j, w_j, ids_j)
        assert len(self) == len_data
      # Now shuffle order of rows in metadata_df
      random.shuffle(metadata_rows)
-      self.metadata_df = Dataset.construct_metadata(metadata_rows)
+      self.metadata_df = DiskDataset.construct_metadata(metadata_rows)
      self.save_to_disk()

  def shuffle_each_shard(self):
@@ -564,14 +709,14 @@ class Dataset(object):
      permutation = np.random.permutation(n)
      X, y, w, ids = (X[permutation], y[permutation],
                      w[permutation], ids[permutation])
-      Dataset.write_data_to_disk(
+      DiskDataset.write_data_to_disk(
          self.data_dir, basename, tasks, X, y, w, ids)

  def shuffle_shards(self):
    """Shuffles the order of the shards for this dataset."""
    metadata_rows = self.metadata_df.values.tolist()
    random.shuffle(metadata_rows)
-    self.metadata_df = Dataset.construct_metadata(metadata_rows)
+    self.metadata_df = DiskDataset.construct_metadata(metadata_rows)
    self.save_to_disk()

  def get_shard(self, i):
@@ -591,7 +736,7 @@ class Dataset(object):
    """Writes data shard to disk"""
    basename = "shard-%d" % shard_num 
    tasks = self.get_task_names()
-    Dataset.write_data_to_disk(self.data_dir, basename, tasks, X, y, w, ids)
+    DiskDataset.write_data_to_disk(self.data_dir, basename, tasks, X, y, w, ids)

  def set_verbosity(self, new_verbosity):
    """Sets verbosity."""
@@ -638,7 +783,7 @@ class Dataset(object):
      ids_sel = ids[shard_inds]
      basename = "dataset-%d" % shard_num
      metadata_rows.append(
-          Dataset.write_data_to_disk(
+          DiskDataset.write_data_to_disk(
              select_dir, basename, tasks,
              X_sel, y_sel, w_sel, ids_sel,
              compute_feature_statistics=compute_feature_statistics))
@@ -672,12 +817,12 @@ class Dataset(object):

        if X_nonzero.size > 0: 
          task_metadata_rows[task].append(
-            Dataset.write_data_to_disk(
+            DiskDataset.write_data_to_disk(
                task_dirs[task_num], basename, [task],
                X_nonzero, y_nonzero, w_nonzero, ids_nonzero))
    
    task_datasets = [
-        Dataset(data_dir=task_dirs[task_num],
+        DiskDataset(data_dir=task_dirs[task_num],
                metadata_rows=task_metadata_rows[task],
                verbosity=self.verbosity)
        for (task_num, task) in enumerate(tasks)]
@@ -685,7 +830,7 @@ class Dataset(object):
    
  def to_numpy(self):
    """
-    Transforms internal data into arrays X, y, w
+    Transforms internal data into arrays X, y, w, ids

    Creates four arrays containing all data in this object. This operation is
    dangerous (!) for large datasets which don't fit into memory.
@@ -700,10 +845,9 @@ class Dataset(object):
    return (np.vstack(Xs), np.vstack(ys), np.vstack(ws),
            np.concatenate(ids))

-  def get_ids(self):
-    """
-    Returns all molecule-ids for this dataset.
-    """
+  @property
+  def ids(self):
+    """Get the ids vector for this dataset as a single numpy array."""
    if len(self) == 0:
      return np.array([])
    ids = []
@@ -711,19 +855,17 @@ class Dataset(object):
      ids.append(np.atleast_1d(np.squeeze(ids_b)))
    return np.concatenate(ids)

-  def get_labels(self):
-    """
-    Returns all labels for this dataset.
-    """
+  @property
+  def y(self):
+    """Get the y vector for this dataset as a single numpy array."""
    ys = []
    for (_, y_b, _, _) in self.itershards():
      ys.append(y_b)
    return np.vstack(ys)

-  def get_weights(self):
-    """
-    Returns all weights for this dataset.
-    """
+  @property
+  def w(self):
+    """Get the weight vector for this dataset as a single numpy array."""
    ws = []
    for (_, _, w_b, _) in self.itershards():
      ws.append(np.array(w_b))
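
The test and loader updates below follow from this getter-to-property migration; roughly, for any dataset built as before:

# Before this commit:
y, w, ids = dataset.get_labels(), dataset.get_weights(), dataset.get_ids()

# After it: plain attribute access, uniform across NumpyDataset and DiskDataset.
y, w, ids = dataset.y, dataset.w, dataset.ids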
+2 −2
@@ -322,8 +322,8 @@ class TestBasicDatasetAPI(TestDatasetAPI):
    """Test that ordering of labels is consistent over time."""
    solubility_dataset = self.load_solubility_data()

-    ids1 = solubility_dataset.get_ids()
-    ids2 = solubility_dataset.get_ids()
+    ids1 = solubility_dataset.ids
+    ids2 = solubility_dataset.ids

    assert np.array_equal(ids1, ids2)

+3 −3
@@ -17,7 +17,7 @@ from deepchem.models.tests import TestAPI
from deepchem.utils.save import load_from_disk
from deepchem.featurizers.fingerprints import CircularFingerprint
from deepchem.featurizers.featurize import DataLoader
-from deepchem.datasets import Dataset
+from deepchem.datasets import DiskDataset

## task0: 1,1,0,-,0,-,1,-,-,1

@@ -46,7 +46,7 @@ class TestLoad(TestAPI):
    X, y, w, ids = dataset.to_numpy()
    shutil.move(data_dir, moved_data_dir)

-    moved_dataset = Dataset(
+    moved_dataset = DiskDataset(
        moved_data_dir, reload=True)

    X_moved, y_moved, w_moved, ids_moved = moved_dataset.to_numpy()
@@ -107,7 +107,7 @@ class TestLoad(TestAPI):
    y_tasks, w_tasks, = [], []
    for ind, task in enumerate(all_tasks):
      print("Processing task %s" % task)
-      dataset = Dataset(data_dir, verbosity=verbosity, reload=reload)
+      dataset = DiskDataset(data_dir, verbosity=verbosity, reload=reload)

      X_task, y_task, w_task, ids_task = dataset.to_numpy()
      y_tasks.append(y_task[:, ind])
+3 −3
@@ -71,13 +71,13 @@ class TestMerge(TestAPI):

    shard_nums = [1, 2]

-    orig_ids = dataset.get_ids()
+    orig_ids = dataset.ids
    _, _, _, ids_1 = dataset.get_shard(1)
    _, _, _, ids_2 = dataset.get_shard(2)

    subset = dataset.subset(subset_dir, shard_nums)
-    after_ids = dataset.get_ids()
+    after_ids = dataset.ids

    assert len(subset) == 4
-    assert sorted(subset.get_ids()) == sorted(np.concatenate([ids_1, ids_2]))
+    assert sorted(subset.ids) == sorted(np.concatenate([ids_1, ids_2]))
    assert list(orig_ids) == list(after_ids)
+3 −3
@@ -23,7 +23,7 @@ from deepchem.utils.save import save_to_disk
from deepchem.utils.save import load_pickle_from_disk
from deepchem.featurizers import Featurizer, ComplexFeaturizer
from deepchem.featurizers import UserDefinedFeaturizer
-from deepchem.datasets import Dataset
+from deepchem.datasets import DiskDataset
from deepchem.utils.save import load_data
from deepchem.utils.save import get_input_type
############################################################## DEBUG
@@ -83,7 +83,7 @@ def featurize_map_function(args):
      loader.verbosity)
  log("About to featurize shard.", loader.verbosity)
  write_fn = partial(
-      Dataset.write_dataframe, data_dir=data_dir,
+      DiskDataset.write_dataframe, data_dir=data_dir,
      featurizer=loader.featurizer, tasks=loader.tasks,
      mol_id_field=loader.id_field, verbosity=loader.verbosity)
  ############################################################## TIMING
@@ -212,7 +212,7 @@ class DataLoader(object):

    # TODO(rbharath): This whole bit with metadata_rows is an awkward way of
    # creating a Dataset. Is there a more elegant solution?
-    dataset = Dataset(data_dir=data_dir,
+    dataset = DiskDataset(data_dir=data_dir,
                      metadata_rows=metadata_rows,
                      reload=True, verbosity=self.verbosity)
    ############################################################## TIMING