Commit 7c007cf5 authored by Peter Eastman

Continuing Dataset refactoring

parent f4c493d8
+134 −192
@@ -153,12 +153,51 @@ class Dataset(object):
     raise NotImplementedError()
 
   def iterbatches(self, batch_size=None, epoch=0, deterministic=False, pad_batches=False):
-    """Generator that iterates over minibatches from the dataset.
+    """Get an object that iterates over minibatches from the dataset.
 
     Each minibatch is returned as a tuple of four numpy arrays: (X, y, w, ids).
     """
     raise NotImplementedError()
 
+  def itersamples(self):
+    """Get an object that iterates over the samples in the dataset.
+
+    Example:
+    >>> for x, y, w, id in dataset.itersamples():
+    ...   print(x, y, w, id)
+    """
+    raise NotImplementedError()
+
+  def get_statistics(self, X_stats=True, y_stats=True):
+    """Compute and return statistics of this dataset."""
+    X_means = 0.0
+    X_m2 = 0.0
+    y_means = 0.0
+    y_m2 = 0.0
+    n = 0
+    for X, y, _, _ in self.itersamples():
+      n += 1
+      dx = X - X_means
+      dy = y - y_means
+      X_means += dx / n
+      y_means += dy / n
+      X_m2 += dx * (X - X_means)
+      y_m2 += dy * (y - y_means)
+    if n < 2:
+      X_stds = 0.0
+      y_stds = 0.0
+    else:
+      X_stds = np.sqrt(X_m2 / n)
+      y_stds = np.sqrt(y_m2 / n)
+    if X_stats and not y_stats:
+      return X_means, X_stds
+    elif y_stats and not X_stats:
+      return y_means, y_stds
+    elif X_stats and y_stats:
+      return X_means, X_stds, y_means, y_stds
+    else:
+      return None
 
 
 class NumpyDataset(Dataset):
   """A Dataset defined by in-memory numpy arrays."""
@@ -219,11 +258,12 @@ class NumpyDataset(Dataset):
     return self._w
 
   def iterbatches(self, batch_size=None, epoch=0, deterministic=False, pad_batches=False):
-    """Generator that iterates over minibatches from the dataset.
+    """Get an object that iterates over minibatches from the dataset.
 
     Each minibatch is returned as a tuple of four numpy arrays: (X, y, w, ids).
     """
-    n_samples = self._X.shape[0]
+    def iterate(dataset, batch_size, deterministic, pad_batches):
+      n_samples = dataset._X.shape[0]
       if not deterministic:
         sample_perm = np.random.permutation(n_samples)
       else:
@@ -235,14 +275,25 @@ class NumpyDataset(Dataset):
       for j in range(len(interval_points)-1):
         indices = range(interval_points[j], interval_points[j+1])
         perm_indices = sample_perm[indices]
-      X_batch = self._X[perm_indices, :]
-      y_batch = self._y[perm_indices]
-      w_batch = self._w[perm_indices]
-      ids_batch = self._ids[perm_indices]
+        X_batch = dataset._X[perm_indices, :]
+        y_batch = dataset._y[perm_indices]
+        w_batch = dataset._w[perm_indices]
+        ids_batch = dataset._ids[perm_indices]
         if pad_batches:
           (X_batch, y_batch, w_batch, ids_batch) = pad_batch(
             batch_size, X_batch, y_batch, w_batch, ids_batch)
         yield (X_batch, y_batch, w_batch, ids_batch)
+    return iterate(self, batch_size, deterministic, pad_batches)
 
+  def itersamples(self):
+    """Get an object that iterates over the samples in the dataset.
+
+    Example:
+    >>> for x, y, w, id in dataset.itersamples():
+    ...   print(x, y, w, id)
+    """
+    n_samples = self._X.shape[0]
+    return ((self._X[i], self._y[i], self._w[i], self._ids[i]) for i in range(n_samples))
 
 
 class DiskDataset(Dataset):
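
A note on the pattern used throughout this refactoring: each iterbatches/itersamples body is moved into a nested iterate function, and the method returns iterate(self). The reason is that a method whose own body contains yield is a generator function, so none of its code runs until the first next() call; wrapping the loop lets the method execute eagerly and hand back an iterator object at call time. A self-contained sketch (the class and names here are illustrative, not DeepChem API):

class Toy(object):

  def __init__(self, items):
    self.items = items

  def iter_lazy(self):
    # Generator method: this print is deferred until the first next().
    print("runs at first next()")
    for item in self.items:
      yield item

  def iter_eager(self):
    # Runs immediately when called, then returns a generator object.
    print("runs at call time")
    def iterate(dataset):
      for item in dataset.items:
        yield item
    return iterate(self)
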
@@ -435,33 +486,39 @@ class DiskDataset(Dataset):

   def itershards(self):
     """
-    Iterates over all shards in dataset.
+    Return an object that iterates over all shards in dataset.
 
     Datasets are stored in sharded fashion on disk. Each call to next() for the
     generator defined by this function returns the data from a particular shard.
     The order of shards returned is guaranteed to remain fixed.
     """
-    for _, row in self.metadata_df.iterrows():
+    def iterate(dataset):
+      for _, row in dataset.metadata_df.iterrows():
         X = np.array(load_from_disk(
-          os.path.join(self.data_dir, row['X-transformed'])))
+            os.path.join(dataset.data_dir, row['X-transformed'])))
         y = np.array(load_from_disk(
-          os.path.join(self.data_dir, row['y-transformed'])))
+            os.path.join(dataset.data_dir, row['y-transformed'])))
         w = np.array(load_from_disk(
-          os.path.join(self.data_dir, row['w-transformed'])))
+            os.path.join(dataset.data_dir, row['w-transformed'])))
         ids = np.array(load_from_disk(
-          os.path.join(self.data_dir, row['ids'])), dtype=object)
+            os.path.join(dataset.data_dir, row['ids'])), dtype=object)
         yield (X, y, w, ids)
+    return iterate(self)
 
   def iterbatches(self, batch_size=None, epoch=0, deterministic=False,
                   pad_batches=False):
-    """Returns minibatches from dataset randomly."""
-    num_shards = self.get_number_shards()
+    """Get an object that iterates over minibatches from the dataset.
+
+    Each minibatch is returned as a tuple of four numpy arrays: (X, y, w, ids).
+    """
+    def iterate(dataset):
+      num_shards = dataset.get_number_shards()
       if not deterministic:
         shard_perm = np.random.permutation(num_shards)
       else:
         shard_perm = np.arange(num_shards)
       for i in range(num_shards):
-      X, y, w, ids = self.get_shard(shard_perm[i])
+        X, y, w, ids = dataset.get_shard(shard_perm[i])
         n_samples = X.shape[0]
         if not deterministic:
           sample_perm = np.random.permutation(n_samples)
@@ -484,6 +541,21 @@ class DiskDataset(Dataset):
             (X_batch, y_batch, w_batch, ids_batch) = pad_batch(
               shard_batch_size, X_batch, y_batch, w_batch, ids_batch)
           yield (X_batch, y_batch, w_batch, ids_batch)
+    return iterate(self)
 
+  def itersamples(self):
+    """Get an object that iterates over the samples in the dataset.
+
+    Example:
+    >>> for x, y, w, id in dataset.itersamples():
+    ...   print(x, y, w, id)
+    """
+    def iterate(dataset):
+      for (X_shard, y_shard, w_shard, ids_shard) in dataset.itershards():
+        n_samples = X_shard.shape[0]
+        for i in range(n_samples):
+          yield (X_shard[i], y_shard[i], w_shard[i], ids_shard[i])
+    return iterate(self)
 
   def reshard(self, shard_size):
     """Reshards data to have specified shard size."""
@@ -787,40 +859,6 @@ class DiskDataset(Dataset):
                    metadata_rows=metadata_rows,
                    verbosity=self.verbosity)
 
-  def to_singletask(self, task_dirs):
-    """Transforms multitask dataset in collection of singletask datasets."""
-    tasks = self.get_task_names()
-    assert len(tasks) == len(task_dirs)
-    log("Splitting multitask dataset into singletask datasets", self.verbosity)
-    task_metadata_rows = {task: [] for task in tasks}
-    for shard_num, (X, y, w, ids) in enumerate(self.itershards()):
-      log("Processing shard %d" % shard_num, self.verbosity)
-      basename = "dataset-%d" % shard_num
-      for task_num, task in enumerate(tasks):
-        log("\tTask %s" % task, self.verbosity)
-        w_task = w[:, task_num]
-        y_task = y[:, task_num]
-
-        # Extract those datapoints which are present for this task
-        X_nonzero = X[w_task != 0]
-        num_datapoints = X_nonzero.shape[0]
-        y_nonzero = np.reshape(y_task[w_task != 0], (num_datapoints, 1))
-        w_nonzero = np.reshape(w_task[w_task != 0], (num_datapoints, 1))
-        ids_nonzero = ids[w_task != 0]
-
-        if X_nonzero.size > 0:
-          task_metadata_rows[task].append(
-            DiskDataset.write_data_to_disk(
-                task_dirs[task_num], basename, [task],
-                X_nonzero, y_nonzero, w_nonzero, ids_nonzero))
-
-    task_datasets = [
-        DiskDataset(data_dir=task_dirs[task_num],
-                metadata_rows=task_metadata_rows[task],
-                verbosity=self.verbosity)
-        for (task_num, task) in enumerate(tasks)]
-    return task_datasets
-
   @property
   def ids(self):
     """Get the ids vector for this dataset as a single numpy array."""
@@ -893,102 +931,6 @@ class DiskDataset(Dataset):
    """Return pandas series of label stds."""
    return self.metadata_df["y_stds"]

  def get_statistics(self, X_stats=True, y_stats=True):
    """Computes and returns statistics of this dataset"""
    if len(self) == 0:
      return None, None, None, None
    self.update_moments(X_stats, y_stats)
    df = self.metadata_df
    if X_stats and not y_stats:
      X_means, X_stds = self._compute_mean_and_std(df, X_stats, y_stats)
      return X_means, X_stds
    elif y_stats and not X_stats:
      y_means, y_stds = self._compute_mean_and_std(df, X_stats, y_stats)
      return y_means, y_stds
    elif X_stats and y_stats:
      X_means, X_stds = self._compute_mean_and_std(
          df, X_stats=True, y_stats=False)
      y_means, y_stds = self._compute_mean_and_std(
          df, X_stats=False, y_stats=True)
      return X_means, X_stds, y_means, y_stds
    else:
      return None

  def _compute_mean_and_std(self, df, X_stats, y_stats):
    """
    Compute means/stds of X/y from sums/sum_squares of tensors.
    """

    if X_stats:
      X_sums = []
      X_sum_squares = []
      X_n = []
      for _, row in df.iterrows():
        Xs = load_from_disk(os.path.join(self.data_dir, row['X_sums']))
        Xss = load_from_disk(os.path.join(self.data_dir, row['X_sum_squares']))
        Xn = load_from_disk(os.path.join(self.data_dir, row['X_n']))
        X_sums.append(np.array(Xs))
        X_sum_squares.append(np.array(Xss))
        X_n.append(np.array(Xn))

      # Note that X_n is a list of floats
      n = float(np.sum(X_n))
      X_sums = np.vstack(X_sums)
      X_sum_squares = np.vstack(X_sum_squares)
      overall_X_sums = np.sum(X_sums, axis=0)
      overall_X_means = overall_X_sums / n
      overall_X_sum_squares = np.sum(X_sum_squares, axis=0)

      X_vars = (overall_X_sum_squares - np.square(overall_X_sums)/n)/(n)
      return overall_X_means, np.sqrt(X_vars)

    if y_stats:
      y_sums = []
      y_sum_squares = []
      y_n = []
      for _, row in df.iterrows():
        ys = load_from_disk(os.path.join(self.data_dir, row['y_sums']))
        yss = load_from_disk(os.path.join(self.data_dir, row['y_sum_squares']))
        yn = load_from_disk(os.path.join(self.data_dir, row['y_n']))
        y_sums.append(np.array(ys))
        y_sum_squares.append(np.array(yss))
        y_n.append(np.array(yn))

      # Note y_n is a list of arrays of shape (n_tasks,)
      y_n = np.sum(y_n, axis=0)
      y_sums = np.vstack(y_sums)
      y_sum_squares = np.vstack(y_sum_squares)
      y_means = np.sum(y_sums, axis=0)/y_n
      y_vars = np.sum(y_sum_squares, axis=0)/y_n - np.square(y_means)
      return y_means, np.sqrt(y_vars)
  
  def update_moments(self, X_stats, y_stats):
    """Re-compute statistics of this dataset during transformation"""
    df = self.metadata_df
    self._update_mean_and_std(df, X_stats, y_stats)

  def _update_mean_and_std(self, df, X_stats, y_stats):
    """
    Compute means/stds of X/y from sums/sum_squares of tensors.
    """
    if X_stats:
      X_transform = []
      for _, row in df.iterrows():
        Xt = load_from_disk(os.path.join(self.data_dir, row['X-transformed']))
        Xs = np.sum(Xt,axis=0)
        Xss = np.sum(np.square(Xt),axis=0)
        save_to_disk(Xs, os.path.join(self.data_dir, row['X_sums']))
        save_to_disk(Xss, os.path.join(self.data_dir, row['X_sum_squares']))

    if y_stats:
      y_transform = []
      for _, row in df.iterrows():
        yt = load_from_disk(os.path.join(self.data_dir, row['y-transformed']))
        ys = np.sum(yt,axis=0)
        yss = np.sum(np.square(yt),axis=0)
        save_to_disk(ys, os.path.join(self.data_dir, row['y_sums']))
        save_to_disk(yss, os.path.join(self.data_dir, row['y_sum_squares']))

  def get_grad_statistics(self):
    """Computes and returns statistics of this dataset

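
The deleted _compute_mean_and_std derived the variance from per-shard sums and sums of squares, i.e. E[x^2] - E[x]^2, a formula that cancels catastrophically when the spread is small relative to the mean. The Welford pass in the new base-class get_statistics avoids this. A small demonstration of the failure mode, with illustrative numbers:

import numpy as np

# Large mean, tiny spread, stored in float32.
x = (1e4 + np.random.randn(10000) * 0.01).astype(np.float32)

# Sum-of-squares formula in float32: can come out wildly wrong, even
# negative, because E[x^2] and E[x]^2 agree to about seven digits.
naive_var = np.mean(x * x) - np.mean(x) ** 2

# Welford's single pass (as in the new Dataset.get_statistics):
mean, m2 = 0.0, 0.0
for i, xi in enumerate(x, 1):
  d = float(xi) - mean
  mean += d / i
  m2 += d * (float(xi) - mean)
welford_var = m2 / len(x)  # close to the true 1e-4
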
+30 −32
@@ -260,38 +260,6 @@ class TestBasicDatasetAPI(TestDatasetAPI):
     assert w_shape == w.shape
     assert ids_shape == ids.shape
 
-
-  def test_to_singletask(self):
-    """Test that to_singletask works."""
-    num_datapoints = 100
-    num_features = 10
-    num_tasks = 10
-    # Generate data
-    X = np.random.rand(num_datapoints, num_features)
-    y = np.random.randint(2, size=(num_datapoints, num_tasks))
-    w = np.random.randint(2, size=(num_datapoints, num_tasks))
-    ids = np.array(["id"] * num_datapoints)
-
-    dataset = NumpyDataset(X, y, w, ids)
-
-    task_dirs = []
-    try:
-      for task in range(num_tasks):
-        task_dirs.append(tempfile.mkdtemp())
-      singletask_datasets = dataset.to_singletask(task_dirs)
-      for task in range(num_tasks):
-        singletask_dataset = singletask_datasets[task]
-        X_task, y_task, w_task, ids_task = (singletask_dataset.X, singletask_dataset.y, singletask_dataset.w, singletask_dataset.ids)
-        w_nonzero = w[:, task] != 0
-        np.testing.assert_array_equal(X_task, X[w_nonzero != 0])
-        np.testing.assert_array_equal(y_task.flatten(), y[:, task][w_nonzero != 0])
-        np.testing.assert_array_equal(w_task.flatten(), w[:, task][w_nonzero != 0])
-        np.testing.assert_array_equal(ids_task, ids[w_nonzero != 0])
-    finally:
-      # Cleanup
-      for task_dir in task_dirs:
-        shutil.rmtree(task_dir)
-
   def test_iterbatches(self):
     """Test that iterating over batches of data works."""
     solubility_dataset = self.load_solubility_data()
@@ -304,6 +272,36 @@ class TestBasicDatasetAPI(TestDatasetAPI):
       assert w_b.shape == (batch_size,) + (len(tasks),)
       assert ids_b.shape == (batch_size,)
 
+  def test_itersamples_numpy(self):
+    """Test that iterating over samples in a NumpyDataset works."""
+    num_datapoints = 100
+    num_features = 10
+    num_tasks = 10
+    # Generate data
+    X = np.random.rand(num_datapoints, num_features)
+    y = np.random.randint(2, size=(num_datapoints, num_tasks))
+    w = np.random.randint(2, size=(num_datapoints, num_tasks))
+    ids = np.array(["id"] * num_datapoints)
+    dataset = NumpyDataset(X, y, w, ids)
+    for i, (sx, sy, sw, sid) in enumerate(dataset.itersamples()):
+      np.testing.assert_array_equal(sx, X[i])
+      np.testing.assert_array_equal(sy, y[i])
+      np.testing.assert_array_equal(sw, w[i])
+      np.testing.assert_array_equal(sid, ids[i])
+
+  def test_itersamples_disk(self):
+    """Test that iterating over samples in a DiskDataset works."""
+    solubility_dataset = self.load_solubility_data()
+    X = solubility_dataset.X
+    y = solubility_dataset.y
+    w = solubility_dataset.w
+    ids = solubility_dataset.ids
+    for i, (sx, sy, sw, sid) in enumerate(solubility_dataset.itersamples()):
+      np.testing.assert_array_equal(sx, X[i])
+      np.testing.assert_array_equal(sy, y[i])
+      np.testing.assert_array_equal(sw, w[i])
+      np.testing.assert_array_equal(sid, ids[i])
 
   def test_to_numpy(self):
     """Test that transformation to numpy arrays is sensible."""
     solubility_dataset = self.load_solubility_data()
+1 −1
@@ -12,7 +12,7 @@ import time
 from collections import deque
 import hashlib
 import sys
-#import openbabel as ob
+import openbabel as ob
 from functools import partial
 from deepchem.featurizers import ComplexFeaturizer
 from deepchem.utils.save import log
+1 −1
@@ -11,7 +11,7 @@ __license__ = "GNU General Public License"
 import math
 import os
 import subprocess
-#import openbabel
+import openbabel
 import numpy as np
 
 
+8 −3
@@ -24,7 +24,7 @@ from deepchem.metrics import Metric
 from deepchem.models.multitask import SingletaskToMultitask
 from sklearn.ensemble import RandomForestClassifier
 from sklearn.ensemble import RandomForestRegressor
-from deepchem.datasets import NumpyDataset
+from deepchem.datasets import DiskDataset, NumpyDataset
 from deepchem.hyperparameters import HyperparamOpt
 from deepchem.models.keras_models.fcnet import MultiTaskDNN
 from deepchem.models.keras_models import KerasModel
@@ -85,7 +85,10 @@ class TestHyperparamOptAPI(TestAPI):
     y_train = np.random.randint(2, size=(n_train, n_tasks))
     w_train = np.ones_like(y_train)
     ids_train = ["C"] * n_train
-    train_dataset = NumpyDataset(X_train, y_train, w_train, ids_train)
+
+    train_dataset = DiskDataset.from_numpy(self.train_dir,
+                                           X_train, y_train, w_train, ids_train,
+                                           tasks)
 
     # Define validation dataset
     n_valid = 10
@@ -93,7 +96,9 @@ class TestHyperparamOptAPI(TestAPI):
     y_valid = np.random.randint(2, size=(n_valid, n_tasks))
     w_valid = np.ones_like(y_valid)
     ids_valid = ["C"] * n_valid
-    valid_dataset = NumpyDataset(X_valid, y_valid, w_valid, ids_valid)
+    valid_dataset = DiskDataset.from_numpy(self.valid_dir,
+                                           X_valid, y_valid, w_valid, ids_valid,
+                                           tasks)
 
     transformers = []
     classification_metric = Metric(metrics.matthews_corrcoef, np.mean,
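
For reference, a hedged sketch of the DiskDataset.from_numpy call adopted in this test, with illustrative shapes and a temporary directory; the call is expected to shard the arrays to disk and return a DiskDataset over them:

import tempfile
import numpy as np
from deepchem.datasets import DiskDataset

data_dir = tempfile.mkdtemp()
tasks = ["task0", "task1"]
X = np.random.rand(10, 3)
y = np.random.randint(2, size=(10, len(tasks)))
w = np.ones_like(y)
ids = np.array(["C"] * 10)

dataset = DiskDataset.from_numpy(data_dir, X, y, w, ids, tasks)
assert dataset.X.shape == (10, 3)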