Commit e831142b authored by Bharath Ramsundar's avatar Bharath Ramsundar
Browse files

First commit of shape metadata

parent c110c8fb
Loading
Loading
Loading
Loading
+253 −58
Original line number Diff line number Diff line
@@ -16,8 +16,9 @@ import shutil
import json
import warnings
import multiprocessing
from deepchem.utils.save import save_to_disk, save_metadata
from deepchem.utils.save import save_to_disk
from deepchem.utils.save import load_from_disk
from ast import literal_eval as make_tuple

from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, Union
from deepchem.utils.typing import OneOrMany, Shape
@@ -326,15 +327,16 @@ class Dataset(object):
    threshold = dc.utils.get_print_threshold()
    task_str = np.array2string(
        np.array(self.get_task_names()), threshold=threshold)
    X_shape, y_shape, w_shape, _ = self.get_shape()
    if self.__len__() < dc.utils.get_max_print_size():
      id_str = np.array2string(self.ids, threshold=threshold)
      return "<%s X.shape: %s, y.shape: %s, w.shape: %s, ids: %s, task_names: %s>" % (
          self.__class__.__name__, str(self.X.shape), str(self.y.shape),
          str(self.w.shape), id_str, task_str)
          self.__class__.__name__, str(X_shape), str(y_shape), str(w_shape),
          id_str, task_str)
    else:
      return "<%s X.shape: %s, y.shape: %s, w.shape: %s, task_names: %s>" % (
          self.__class__.__name__, str(self.X.shape), str(self.y.shape),
          str(self.w.shape), task_str)
          self.__class__.__name__, str(X_shape), str(y_shape), str(w_shape),
          task_str)

  def __str__(self) -> str:
    """Convert self to str representation."""
@@ -647,8 +649,10 @@ class NumpyDataset(Dataset):

  This subclass of `Dataset` stores arrays `X,y,w,ids` in memory as
  numpy arrays. This makes it very easy to construct `NumpyDataset`
  objects. For example
  objects.

  Examples
  --------
  >>> import numpy as np
  >>> dataset = NumpyDataset(X=np.random.rand(5, 3), y=np.random.rand(5,), ids=np.arange(5))
  """
@@ -957,16 +961,106 @@ class NumpyDataset(Dataset):
class DiskDataset(Dataset):
  """
  A Dataset that is stored as a set of files on disk.
  """

  def __init__(self, data_dir: str) -> None:
  The DiskDataset is the workhorse class of DeepChem that facilitates analyses
  on large datasets. Use this class whenever you're working with a large
  dataset that can't be easily manipulated in RAM.

  On disk, a `DiskDataset` has a simple structure. All files for a given
  `DiskDataset` are stored in a `data_dir`. The contents of `data_dir` should
  be laid out as follows:

  data_dir/
    |
    ---> metadata.csv.gzip
    |
    ---> tasks.json
    |
    ---> shard-0-X.npy
    |
    ---> shard-0-y.npy
    |
    ---> shard-0-w.npy
    |
    ---> shard-0-ids.npy
    |
    ---> shard-1-X.npy
    .
    .
    .

  The metadata is constructed by static method
  `DiskDataset._construct_metadata` and saved to disk by
  `DiskDataset._save_metadata`. The metadata itself consists of a csv file
  which has columns `('ids', 'X', 'y', 'w', 'ids_shape', 'X_shape', 'y_shape',
  'w_shape')`. `tasks.json` consists of a list of task names for this dataset.

  The actual data is stored in `.npy` files (numpy array files) of the form
  'shard-0-X.npy', 'shard-0-y.npy', etc.

  The basic structure of `DiskDataset` is quite robust and will likely serve
  you will for datasets up to about 100 GB or larger. However note that
  `DiskDataset` has note been tested for very large datasets at the terabyte
  range and beyond. You may be better served by implementing a custom
  `Dataset` class for those use cases.

  Examples
  --------
  Let's walk through a simple example of constructing a new `DiskDataset`.

  >>> import deepchem as dc
  >>> import numpy as np
  >>> X = np.random.rand(10, 10)
  >>> dataset = dc.data.DiskDataset.from_numpy(X)

  If you have already saved a `DiskDataset` to `data_dir`, you can reinitialize it with

  >> data_dir = "/path/to/my/data"
  >> dataset = dc.data.DiskDataset(data_dir)

  Attributes
  ----------
  data_dir: str
    Location of directory where this `DiskDataset` is stored to disk
  metadata_df: pd.DataFrame
    Pandas Dataframe holding metadata for this `DiskDataset`
  legacy_metadata: bool
    Whether this `DiskDataset` uses legacy format.

  Note
  ----
  `DiskDataset` originally had a simpler metadata format without shape
  information. Older `DiskDataset` objects had metadata files with columns
  `('ids', 'X', 'y', 'w') and not additional shape columns. `DiskDataset`
  maintains backwards compatibility with this older metadata format, but we
  recommend for performance reasons not using legacy metadata for new
  projects.
  """
    Turns featurized dataframes into numpy files, writes them & metadata to disk.

  def __init__(self, data_dir: str, legacy_metadata: bool = False) -> None:
    """Load a constructed DiskDataset from disk

    Note that this method cannot construct a new disk dataset. Instead use
    static methods `DiskDataset.create_dataset` or `DiskDataset.from_numpy`
    for that purpose. Use this constructor instead to load a `DiskDataset`
    that has already been created on disk.

    Parameters
    ----------
    data_dir: str
      Location on disk of an existing `DiskDataset`.
    legacy_metadata: bool, optional (default False)
      If `True` use the legacy format for metadata without shape information
      in metadata.
    """
    self.data_dir = data_dir
    self.legacy_metadata = legacy_metadata

    logger.info("Loading dataset from disk.")
    self.tasks, self.metadata_df = self.load_metadata()
    if len(self.metadata_df.columns) == 4:
      logger.info("Detected legacy metatadata on disk.")
      self.legacy_metadata = True
    self._cached_shards: Optional[List] = None
    self._memory_cache_size = 20 * (1 << 20)  # 20 MB
    self._cache_used = 0
@@ -974,7 +1068,8 @@ class DiskDataset(Dataset):
  @staticmethod
  def create_dataset(shard_generator: Iterable[Batch],
                     data_dir: Optional[str] = None,
                     tasks: Optional[Sequence] = []) -> "DiskDataset":
                     tasks: Optional[Sequence] = [],
                     legacy_metadata: bool = False) -> "DiskDataset":
    """Creates a new DiskDataset

    Parameters
@@ -986,6 +1081,9 @@ class DiskDataset(Dataset):
      Filename for data directory. Creates a temp directory if none specified.
    tasks: list
      List of tasks for this dataset.
    legacy_metadata: bool, optional (default False)
      If `True` use the legacy format for metadata without shape information
      in metadata.

    Returns
    -------
@@ -1002,14 +1100,16 @@ class DiskDataset(Dataset):
      basename = "shard-%d" % shard_num
      metadata_rows.append(
          DiskDataset.write_data_to_disk(data_dir, basename, tasks, X, y, w,
                                         ids))
    metadata_df = DiskDataset._construct_metadata(metadata_rows)
    save_metadata(tasks, metadata_df, data_dir)
                                         ids, legacy_metadata))
    metadata_df = DiskDataset._construct_metadata(metadata_rows,
                                                  legacy_metadata)
    DiskDataset._save_metadata(tasks, metadata_df, data_dir)
    time2 = time.time()
    logger.info("TIMING: dataset construction took %0.3f s" % (time2 - time1))
    return DiskDataset(data_dir)
    return DiskDataset(data_dir, legacy_metadata)

  def load_metadata(self):
    """Helper method that loads metadata from disk."""
    try:
      tasks_filename, metadata_filename = self._get_metadata_filename()
      with open(tasks_filename) as fin:
@@ -1026,30 +1126,62 @@ class DiskDataset(Dataset):
      tasks, metadata_df = load_from_disk(metadata_filename)
      del metadata_df['task_names']
      del metadata_df['basename']
      save_metadata(tasks, metadata_df, self.data_dir)
      DiskDataset._save_metadata(tasks, metadata_df, self.data_dir)
      return tasks, metadata_df
    raise ValueError("No Metadata Found On Disk")

  @staticmethod
  def _construct_metadata(metadata_entries: List) -> pd.DataFrame:
  def _save_metadata(tasks, metadata_df, data_dir):
    """Saves the metadata for a DiskDataset

    Parameters
    ----------
    tasks: list of str
      Tasks of DiskDataset
    metadata_df: pd.DataFrame
      The dataframe which will be written to disk.
    data_dir: str
      Directory to store metadata
    """
    if isinstance(tasks, np.ndarray):
      tasks = tasks.tolist()
    metadata_filename = os.path.join(data_dir, "metadata.csv.gzip")
    tasks_filename = os.path.join(data_dir, "tasks.json")
    with open(tasks_filename, 'w') as fout:
      json.dump(tasks, fout)
    metadata_df.to_csv(metadata_filename, index=False, compression='gzip')

  @staticmethod
  def _construct_metadata(metadata_entries: List,
                          legacy_metadata: bool = False) -> pd.DataFrame:
    """Construct a dataframe containing metadata.

    Parameters
    ----------
    metadata_entries: list
      metadata_entries should have elements returned by write_data_to_disk
      above.
    legacy_metadata: bool, optional (default False)
      If `True` use the legacy format for metadata without shape information
      in metadata.
    """
    if not legacy_metadata:
      columns = ('ids', 'X', 'y', 'w', 'ids_shape', 'X_shape', 'y_shape',
                 'w_shape')
    else:
      columns = ('ids', 'X', 'y', 'w')
    metadata_df = pd.DataFrame(metadata_entries, columns=columns)
    return metadata_df

  @staticmethod
  def write_data_to_disk(
      data_dir: str,
  def write_data_to_disk(data_dir: str,
                         basename: str,
                         tasks: np.ndarray,
                         X: Optional[np.ndarray] = None,
                         y: Optional[np.ndarray] = None,
                         w: Optional[np.ndarray] = None,
      ids: Optional[np.ndarray] = None) -> List[Optional[str]]:
                         ids: Optional[np.ndarray] = None,
                         legacy_metadata: bool = False) -> List[Optional[str]]:
    """Static helper method to write data to disk.

    This helper method is used to write a shard of data to disk.
@@ -1070,10 +1202,17 @@ class DiskDataset(Dataset):
      The weights array 
    ids: Optional[np.ndarray]
      The identifiers array 
    legacy_metadata: bool, optional (default False)
      If `True` use the legacy format for metadata without shape information
      in metadata.

    Returns
    -------
    List with values `[out_ids, out_X, out_y, out_w, out_ids_shape, out_X_shape, out_y_shape, out_w_shape]` with filenames of locations to disk which these respective arrays were written.
    List with values `[out_ids, out_X, out_y, out_w, out_ids_shape,
    out_X_shape, out_y_shape, out_w_shape]` with filenames of locations to
    disk which these respective arrays were written. If `legacy_metadata` is
    set will return a list with values `[out_ids, out_X, out_y, out_w]`
    without shape information.
    """
    if X is not None:
      out_X: Optional[str] = "%s-X.npy" % basename
@@ -1108,14 +1247,17 @@ class DiskDataset(Dataset):
      out_ids_shape = None

    # note that this corresponds to the _construct_metadata column order
    if not legacy_metadata:
      return [
          out_ids, out_X, out_y, out_w, out_ids_shape, out_X_shape, out_y_shape,
          out_w_shape
      ]
    else:
      return [out_ids, out_X, out_y, out_w]

  def save_to_disk(self) -> None:
    """Save dataset to disk."""
    save_metadata(self.tasks, self.metadata_df, self.data_dir)
    DiskDataset._save_metadata(self.tasks, self.metadata_df, self.data_dir)
    self._cached_shards = None

  def move(self, new_data_dir: str) -> None:
@@ -1132,7 +1274,13 @@ class DiskDataset(Dataset):
    return self.tasks

  def reshard(self, shard_size: int) -> None:
    """Reshards data to have specified shard size."""
    """Reshards data to have specified shard size.

    Note
    ----
    If this `DiskDataset` is in `legacy_metadata` format, reshard will
    maintain legacy metadata format.
    """
    # Create temp directory to store resharded version
    reshard_dir = tempfile.mkdtemp()

@@ -1161,7 +1309,10 @@ class DiskDataset(Dataset):
      yield (X_next, y_next, w_next, ids_next)

    resharded_dataset = DiskDataset.create_dataset(
        generator(), data_dir=reshard_dir, tasks=self.tasks)
        generator(),
        data_dir=reshard_dir,
        tasks=self.tasks,
        legacy_metadata=self.legacy_metadata)
    shutil.rmtree(self.data_dir)
    shutil.move(reshard_dir, self.data_dir)
    self.metadata_df = resharded_dataset.metadata_df
@@ -1436,7 +1587,7 @@ class DiskDataset(Dataset):
      pool.close()
      metadata_rows = [r.get() for r in results]
      metadata_df = DiskDataset._construct_metadata(metadata_rows)
      save_metadata(tasks, metadata_df, out_dir)
      DiskDataset._save_metadata(tasks, metadata_df, out_dir)
      dataset = DiskDataset(out_dir)
    else:

@@ -1514,7 +1665,8 @@ class DiskDataset(Dataset):
                 w: Optional[np.ndarray] = None,
                 ids: Optional[np.ndarray] = None,
                 tasks: Optional[Sequence] = None,
                 data_dir: Optional[str] = None) -> "DiskDataset":
                 data_dir: Optional[str] = None,
                 legacy_metadata: bool = False) -> "DiskDataset":
    """Creates a DiskDataset object from specified Numpy arrays.

    Parameters
@@ -1532,6 +1684,9 @@ class DiskDataset(Dataset):
    data_dir: Optional[str], optional (default None)
      The directory to write this dataset to. If none is specified, will use
      a temporary dataset instead.
    legacy_metadata: bool, optional (default False)
      If `True` use the legacy format for metadata without shape information
      in metadata.

    Returns
    -------
@@ -1568,7 +1723,10 @@ class DiskDataset(Dataset):

    # raw_data = (X, y, w, ids)
    return DiskDataset.create_dataset(
        [(X, y, w, ids)], data_dir=data_dir, tasks=tasks)
        [(X, y, w, ids)],
        data_dir=data_dir,
        tasks=tasks,
        legacy_metadata=legacy_metadata)

  @staticmethod
  def merge(datasets: Iterable["DiskDataset"],
@@ -2019,25 +2177,62 @@ class DiskDataset(Dataset):
    """Finds shape of dataset."""
    n_tasks = len(self.get_task_names())
    n_rows = len(self.metadata_df.index)
    for i in range(n_rows):
      row = self.metadata_df.iloc[i]
    #for shard_num, (X, y, w, ids) in enumerate(self.itershards()):
    #  if shard_num == 0:
    #    X_shape = np.array(X.shape)
    #    if n_tasks > 0:
    #      y_shape = np.array(y.shape)
    #      w_shape = np.array(w.shape)
    #    else:
    #      y_shape = tuple()
    #      w_shape = tuple()
    #    ids_shape = np.array(ids.shape)
    #  else:
    #    X_shape[0] += np.array(X.shape)[0]
    #    if n_tasks > 0:
    #      y_shape[0] += np.array(y.shape)[0]
    #      w_shape[0] += np.array(w.shape)[0]
    #    ids_shape[0] += np.array(ids.shape)[0]
    #return tuple(X_shape), tuple(y_shape), tuple(w_shape), tuple(ids_shape)
    # If shape metadata is available use it to directly compute shape from
    # metadata
    if not self.legacy_metadata:
      for shard_num in range(n_rows):
        row = self.metadata_df.iloc[shard_num]
        if row['X_shape'] is not None:
          shard_X_shape = make_tuple(row['X_shape'])
        else:
          shard_X_shape = tuple()
        if n_tasks > 0:
          if row['y_shape'] is not None:
            shard_y_shape = make_tuple(row['y_shape'])
          else:
            shard_y_shape = tuple()
          if row['w_shape'] is not None:
            shard_w_shape = make_tuple(row['w_shape'])
          else:
            shard_w_shape = tuple()
        else:
          shard_y_shape = tuple()
          shard_w_shape = tuple()
        if row['ids_shape'] is not None:
          shard_ids_shape = make_tuple(row['ids_shape'])
        else:
          shard_ids_shape = tuple()
        if shard_num == 0:
          X_shape, y_shape, w_shape, ids_shape = np.array(
              shard_X_shape), np.array(shard_y_shape), np.array(
                  shard_w_shape), np.array(shard_ids_shape)
        else:
          X_shape[0] += shard_X_shape[0]
          if n_tasks > 0:
            y_shape[0] += shard_y_shape[0]
            w_shape[0] += shard_w_shape[0]
          ids_shape[0] += shard_ids_shape[0]
      return tuple(X_shape), tuple(y_shape), tuple(w_shape), tuple(ids_shape)
    # In absense of shape metadata, fall back to loading data from disk to
    # find shape.
    else:
      for shard_num, (X, y, w, ids) in enumerate(self.itershards()):
        if shard_num == 0:
          X_shape = np.array(X.shape)
          if n_tasks > 0:
            y_shape = np.array(y.shape)
            w_shape = np.array(w.shape)
          else:
            y_shape = tuple()
            w_shape = tuple()
          ids_shape = np.array(ids.shape)
        else:
          X_shape[0] += np.array(X.shape)[0]
          if n_tasks > 0:
            y_shape[0] += np.array(y.shape)[0]
            w_shape[0] += np.array(w.shape)[0]
          ids_shape[0] += np.array(ids.shape)[0]
      return tuple(X_shape), tuple(y_shape), tuple(w_shape), tuple(ids_shape)

  def get_label_means(self) -> pd.DataFrame:
    """Return pandas series of label means."""
+0 −20
Original line number Diff line number Diff line
@@ -343,26 +343,6 @@ def test_complete_shuffle():
  np.testing.assert_array_equal(np.sort(dataset.ids), np.sort(res.ids))


def test_get_shape():
  """Test that get_shape works."""
  num_datapoints = 100
  num_features = 10
  num_tasks = 10
  # Generate data
  X = np.random.rand(num_datapoints, num_features)
  y = np.random.randint(2, size=(num_datapoints, num_tasks))
  w = np.random.randint(2, size=(num_datapoints, num_tasks))
  ids = np.array(["id"] * num_datapoints)

  dataset = dc.data.NumpyDataset(X, y, w, ids)

  X_shape, y_shape, w_shape, ids_shape = dataset.get_shape()
  assert X_shape == X.shape
  assert y_shape == y.shape
  assert w_shape == w.shape
  assert ids_shape == ids.shape


def test_iterbatches():
  """Test that iterating over batches of data works."""
  solubility_dataset = load_solubility_data()
+25 −0
Original line number Diff line number Diff line
import deepchem as dc
import numpy as np


def test_make_legacy_dataset_from_numpy():
  """Test that legacy DiskDataset objects can be constructed."""
  num_datapoints = 100
  num_features = 10
  num_tasks = 10
  # Generate data
  X = np.random.rand(num_datapoints, num_features)
  y = np.random.randint(2, size=(num_datapoints, num_tasks))
  w = np.random.randint(2, size=(num_datapoints, num_tasks))
  ids = np.array(["id"] * num_datapoints)

  dataset = dc.data.DiskDataset.from_numpy(X, y, w, ids, legacy_metadata=True)
  assert dataset.legacy_metadata
  assert len(dataset.metadata_df.columns) == 4
  assert list(dataset.metadata_df.columns) == ['ids', 'X', 'y', 'w']

  # Test constructor reload works for legacy format
  dataset2 = dc.data.DiskDataset(dataset.data_dir)
  assert dataset2.legacy_metadata
  assert len(dataset2.metadata_df.columns) == 4
  assert list(dataset2.metadata_df.columns) == ['ids', 'X', 'y', 'w']
+106 −0
Original line number Diff line number Diff line
import deepchem as dc
import numpy as np


def test_numpy_dataset_get_shape():
  """Test that get_shape works for numpy datasets."""
  num_datapoints = 100
  num_features = 10
  num_tasks = 10
  # Generate data
  X = np.random.rand(num_datapoints, num_features)
  y = np.random.randint(2, size=(num_datapoints, num_tasks))
  w = np.random.randint(2, size=(num_datapoints, num_tasks))
  ids = np.array(["id"] * num_datapoints)

  dataset = dc.data.NumpyDataset(X, y, w, ids)

  X_shape, y_shape, w_shape, ids_shape = dataset.get_shape()
  assert X_shape == X.shape
  assert y_shape == y.shape
  assert w_shape == w.shape
  assert ids_shape == ids.shape


def test_disk_dataset_get_shape_single_shard():
  """Test that get_shape works for disk dataset."""
  num_datapoints = 100
  num_features = 10
  num_tasks = 10
  # Generate data
  X = np.random.rand(num_datapoints, num_features)
  y = np.random.randint(2, size=(num_datapoints, num_tasks))
  w = np.random.randint(2, size=(num_datapoints, num_tasks))
  ids = np.array(["id"] * num_datapoints)

  dataset = dc.data.DiskDataset.from_numpy(X, y, w, ids)

  X_shape, y_shape, w_shape, ids_shape = dataset.get_shape()
  assert X_shape == X.shape
  assert y_shape == y.shape
  assert w_shape == w.shape
  assert ids_shape == ids.shape


def test_disk_dataset_get_shape_multishard():
  """Test that get_shape works for multisharded disk dataset."""
  num_datapoints = 100
  num_features = 10
  num_tasks = 10
  # Generate data
  X = np.random.rand(num_datapoints, num_features)
  y = np.random.randint(2, size=(num_datapoints, num_tasks))
  w = np.random.randint(2, size=(num_datapoints, num_tasks))
  ids = np.array(["id"] * num_datapoints)

  dataset = dc.data.DiskDataset.from_numpy(X, y, w, ids)
  # Should now have 10 shards
  dataset.reshard(shard_size=10)

  X_shape, y_shape, w_shape, ids_shape = dataset.get_shape()
  assert X_shape == X.shape
  assert y_shape == y.shape
  assert w_shape == w.shape
  assert ids_shape == ids.shape


def test_disk_dataset_get_legacy_shape_single_shard():
  """Test that get_shape works for legacy disk dataset."""
  num_datapoints = 100
  num_features = 10
  num_tasks = 10
  # Generate data
  X = np.random.rand(num_datapoints, num_features)
  y = np.random.randint(2, size=(num_datapoints, num_tasks))
  w = np.random.randint(2, size=(num_datapoints, num_tasks))
  ids = np.array(["id"] * num_datapoints)

  dataset = dc.data.DiskDataset.from_numpy(X, y, w, ids, legacy_metadata=True)

  X_shape, y_shape, w_shape, ids_shape = dataset.get_shape()
  assert X_shape == X.shape
  assert y_shape == y.shape
  assert w_shape == w.shape
  assert ids_shape == ids.shape


def test_disk_dataset_get_legacy_shape_multishard():
  """Test that get_shape works for multisharded legacy disk dataset."""
  num_datapoints = 100
  num_features = 10
  num_tasks = 10
  # Generate data
  X = np.random.rand(num_datapoints, num_features)
  y = np.random.randint(2, size=(num_datapoints, num_tasks))
  w = np.random.randint(2, size=(num_datapoints, num_tasks))
  ids = np.array(["id"] * num_datapoints)

  dataset = dc.data.DiskDataset.from_numpy(X, y, w, ids, legacy_metadata=True)
  # Should now have 10 shards
  dataset.reshard(shard_size=10)

  X_shape, y_shape, w_shape, ids_shape = dataset.get_shape()
  assert X_shape == X.shape
  assert y_shape == y.shape
  assert w_shape == w.shape
  assert ids_shape == ids.shape
+0 −20
Original line number Diff line number Diff line
@@ -291,26 +291,6 @@ def encode_bio_sequence(fname, file_type="fasta", letters="ATCGN"):
  return encode_sequence(fname, file_type=file_type, letters=letters)


def save_metadata(tasks, metadata_df, data_dir):
  """Saves the metadata for a DiskDataset

  Parameters
  ----------
  tasks: list of str
    Tasks of DiskDataset
  metadata_df: pd.DataFrame
  data_dir: str
    Directory to store metadata
  """
  if isinstance(tasks, np.ndarray):
    tasks = tasks.tolist()
  metadata_filename = os.path.join(data_dir, "metadata.csv.gzip")
  tasks_filename = os.path.join(data_dir, "tasks.json")
  with open(tasks_filename, 'w') as fout:
    json.dump(tasks, fout)
  metadata_df.to_csv(metadata_filename, index=False, compression='gzip')


def load_from_disk(filename):
  """Load a dataset from file."""
  name = filename