Unverified Commit a244ab73 authored by Daiki Nishikawa's avatar Daiki Nishikawa Committed by GitHub
Browse files

Merge branch 'master' into fix-windows-ci

parents 71ccdb3a 4e9cfbe4
Loading
Loading
Loading
Loading
+340 −80
Original line number Diff line number Diff line
@@ -14,8 +14,9 @@ import time
import shutil
import warnings
import multiprocessing
from deepchem.utils.save import save_to_disk, save_metadata
from deepchem.utils.save import save_to_disk
from deepchem.utils.save import load_from_disk
from ast import literal_eval as make_tuple

from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, Union
from deepchem.utils.typing import OneOrMany, Shape
@@ -324,15 +325,16 @@ class Dataset(object):
    threshold = dc.utils.get_print_threshold()
    task_str = np.array2string(
        np.array(self.get_task_names()), threshold=threshold)
    X_shape, y_shape, w_shape, _ = self.get_shape()
    if self.__len__() < dc.utils.get_max_print_size():
      id_str = np.array2string(self.ids, threshold=threshold)
      return "<%s X.shape: %s, y.shape: %s, w.shape: %s, ids: %s, task_names: %s>" % (
          self.__class__.__name__, str(self.X.shape), str(self.y.shape),
          str(self.w.shape), id_str, task_str)
          self.__class__.__name__, str(X_shape), str(y_shape), str(w_shape),
          id_str, task_str)
    else:
      return "<%s X.shape: %s, y.shape: %s, w.shape: %s, task_names: %s>" % (
          self.__class__.__name__, str(self.X.shape), str(self.y.shape),
          str(self.w.shape), task_str)
          self.__class__.__name__, str(X_shape), str(y_shape), str(w_shape),
          task_str)

  def __str__(self) -> str:
    """Convert self to str representation."""
@@ -646,8 +648,10 @@ class NumpyDataset(Dataset):

  This subclass of `Dataset` stores arrays `X,y,w,ids` in memory as
  numpy arrays. This makes it very easy to construct `NumpyDataset`
  objects. For example
  objects.

  Examples
  --------
  >>> import numpy as np
  >>> dataset = NumpyDataset(X=np.random.rand(5, 3), y=np.random.rand(5,), ids=np.arange(5))
  """
@@ -952,16 +956,129 @@ class _Shard(object):
class DiskDataset(Dataset):
  """
  A Dataset that is stored as a set of files on disk.

  The DiskDataset is the workhorse class of DeepChem that facilitates analyses
  on large datasets. Use this class whenever you're working with a large
  dataset that can't be easily manipulated in RAM.

  On disk, a `DiskDataset` has a simple structure. All files for a given
  `DiskDataset` are stored in a `data_dir`. The contents of `data_dir` should
  be laid out as follows:

  data_dir/
    |
    ---> metadata.csv.gzip
    |
    ---> tasks.json
    |
    ---> shard-0-X.npy
    |
    ---> shard-0-y.npy
    |
    ---> shard-0-w.npy
    |
    ---> shard-0-ids.npy
    |
    ---> shard-1-X.npy
    .
    .
    .

  The metadata is constructed by static method
  `DiskDataset._construct_metadata` and saved to disk by
  `DiskDataset._save_metadata`. The metadata itself consists of a csv file
  which has columns `('ids', 'X', 'y', 'w', 'ids_shape', 'X_shape', 'y_shape',
  'w_shape')`. `tasks.json` consists of a list of task names for this dataset.

  The actual data is stored in `.npy` files (numpy array files) of the form
  'shard-0-X.npy', 'shard-0-y.npy', etc.

  The basic structure of `DiskDataset` is quite robust and will likely serve
  you well for datasets up to about 100 GB or larger. However note that
  `DiskDataset` has not been tested for very large datasets at the terabyte
  range and beyond. You may be better served by implementing a custom
  `Dataset` class for those use cases.

  Examples
  --------
  Let's walk through a simple example of constructing a new `DiskDataset`.

  >>> import deepchem as dc
  >>> import numpy as np
  >>> X = np.random.rand(10, 10)
  >>> dataset = dc.data.DiskDataset.from_numpy(X)

  If you have already saved a `DiskDataset` to `data_dir`, you can reinitialize it with

  >> data_dir = "/path/to/my/data"
  >> dataset = dc.data.DiskDataset(data_dir)

  Once you have a dataset you can access its attributes as follows

  >>> X = np.random.rand(10, 10)
  >>> y = np.random.rand(10,)
  >>> w = np.ones_like(y)
  >>> dataset = dc.data.DiskDataset.from_numpy(X, y, w)
  >>> X, y, w = dataset.X, dataset.y, dataset.w

  One thing to beware of is that `dataset.X`, `dataset.y`, `dataset.w` load
  data from disk! If you have a large dataset, these operations can be
  extremely slow. Try iterating through the dataset instead.

  >>> for (xi, yi, wi, idi) in dataset.itersamples():
  ...   pass

  Attributes
  ----------
  data_dir: str
    Location of directory where this `DiskDataset` is stored to disk
  metadata_df: pd.DataFrame
    Pandas Dataframe holding metadata for this `DiskDataset`
  legacy_metadata: bool
    Whether this `DiskDataset` uses legacy format.

  Note
  ----
  `DiskDataset` originally had a simpler metadata format without shape
  information. Older `DiskDataset` objects had metadata files with columns
  `('ids', 'X', 'y', 'w')` and no additional shape columns. `DiskDataset`
  maintains backwards compatibility with this older metadata format, but for
  performance reasons we recommend not using legacy metadata for new
  projects.
  """

  def __init__(self, data_dir: str) -> None:
    """
    Turns featurized dataframes into numpy files, writes them & metadata to disk.
    """Load a constructed DiskDataset from disk

    Note that this method cannot construct a new disk dataset. Instead use
    static methods `DiskDataset.create_dataset` or `DiskDataset.from_numpy`
    for that purpose. Use this constructor instead to load a `DiskDataset`
    that has already been created on disk.

    Parameters
    ----------
    data_dir: str
      Location on disk of an existing `DiskDataset`.
    """
    self.data_dir = data_dir

    logger.info("Loading dataset from disk.")
    self.tasks, self.metadata_df = self.load_metadata()
    if len(self.metadata_df.columns) == 4 and list(
        self.metadata_df.columns) == ['ids', 'X', 'y', 'w']:
      logger.info(
          "Detected legacy metatadata on disk. You can upgrade from legacy metadata to the more efficient current metadata by resharding this dataset by calling the reshard() method of this object.."
      )
      self.legacy_metadata = True
    elif len(self.metadata_df.columns) == 8 and list(
        self.metadata_df.columns) == [
            'ids', 'X', 'y', 'w', 'ids_shape', 'X_shape', 'y_shape', 'w_shape'
        ]:
      self.legacy_metadata = False
    else:
      raise ValueError(
          "Malformed metadata on disk. Metadata must have columns 'ids', 'X', 'y', 'w', 'ids_shape', 'X_shape', 'y_shape', 'w_shape' (or if in legacy metadata format, columns 'ids', 'X', 'y', 'w')"
      )
    self._cached_shards: Optional[List] = None
    self._memory_cache_size = 20 * (1 << 20)  # 20 MB
    self._cache_used = 0
@@ -979,7 +1096,7 @@ class DiskDataset(Dataset):
      (X, y, w, ids). Each tuple will be written to a separate shard on disk.
    data_dir: str
      Filename for data directory. Creates a temp directory if none specified.
    tasks: list
    tasks: Optional[sequence] 
      List of tasks for this dataset.

    Returns
@@ -999,12 +1116,13 @@ class DiskDataset(Dataset):
          DiskDataset.write_data_to_disk(data_dir, basename, tasks, X, y, w,
                                         ids))
    metadata_df = DiskDataset._construct_metadata(metadata_rows)
    save_metadata(tasks, metadata_df, data_dir)
    DiskDataset._save_metadata(metadata_df, data_dir, tasks)
    time2 = time.time()
    logger.info("TIMING: dataset construction took %0.3f s" % (time2 - time1))
    return DiskDataset(data_dir)

  def load_metadata(self):
  def load_metadata(self) -> Tuple[List[str], pd.DataFrame]:
    """Helper method that loads metadata from disk."""
    try:
      tasks_filename, metadata_filename = self._get_metadata_filename()
      with open(tasks_filename) as fin:
@@ -1021,18 +1139,47 @@ class DiskDataset(Dataset):
      tasks, metadata_df = load_from_disk(metadata_filename)
      del metadata_df['task_names']
      del metadata_df['basename']
      save_metadata(tasks, metadata_df, self.data_dir)
      DiskDataset._save_metadata(metadata_df, self.data_dir, tasks)
      return tasks, metadata_df
    raise ValueError("No Metadata Found On Disk")

  @staticmethod
  def _save_metadata(metadata_df: pd.DataFrame, data_dir: str,
                     tasks: Optional[Sequence]) -> None:
    """Saves the metadata for a DiskDataset

    Parameters
    ----------
    metadata_df: pd.DataFrame
      The dataframe which will be written to disk.
    data_dir: str
      Directory to store metadata
    tasks: Optional[Sequence]
      Tasks of DiskDataset. If `None`, an empty list of tasks is written to
      disk.
    """
    if tasks is None:
      tasks = []
    elif isinstance(tasks, np.ndarray):
      tasks = tasks.tolist()
    metadata_filename = os.path.join(data_dir, "metadata.csv.gzip")
    tasks_filename = os.path.join(data_dir, "tasks.json")
    with open(tasks_filename, 'w') as fout:
      json.dump(tasks, fout)
    metadata_df.to_csv(metadata_filename, index=False, compression='gzip')

  @staticmethod
  def _construct_metadata(metadata_entries: List) -> pd.DataFrame:
    """Construct a dataframe containing metadata.

    Parameters
    ----------
    metadata_entries: list
      metadata_entries should have elements returned by write_data_to_disk
      above.
    """
    columns = ('ids', 'X', 'y', 'w')
    columns = ('ids', 'X', 'y', 'w', 'ids_shape', 'X_shape', 'y_shape',
               'w_shape')
    metadata_df = pd.DataFrame(metadata_entries, columns=columns)
    return metadata_df

@@ -1068,46 +1215,99 @@ class DiskDataset(Dataset):

    Returns
    -------
    List with values `[out_ids, out_X, out_y, out_w]` with filenames of locations to disk which these respective arrays were written.
    List with values `[out_ids, out_X, out_y, out_w, out_ids_shape,
    out_X_shape, out_y_shape, out_w_shape]` with filenames of locations to
    disk which these respective arrays were written.
    """
    if X is not None:
      out_X: Optional[str] = "%s-X.npy" % basename
      save_to_disk(X, os.path.join(data_dir, out_X))  # type: ignore
      out_X_shape = X.shape
    else:
      out_X = None
      out_X_shape = None

    if y is not None:
      out_y: Optional[str] = "%s-y.npy" % basename
      save_to_disk(y, os.path.join(data_dir, out_y))  # type: ignore
      out_y_shape = y.shape
    else:
      out_y = None
      out_y_shape = None

    if w is not None:
      out_w: Optional[str] = "%s-w.npy" % basename
      save_to_disk(w, os.path.join(data_dir, out_w))  # type: ignore
      out_w_shape = w.shape
    else:
      out_w = None
      out_w_shape = None

    if ids is not None:
      out_ids: Optional[str] = "%s-ids.npy" % basename
      save_to_disk(ids, os.path.join(data_dir, out_ids))  # type: ignore
      out_ids_shape = ids.shape
    else:
      out_ids = None
      out_ids_shape = None

    # note that this corresponds to the _construct_metadata column order
    return [out_ids, out_X, out_y, out_w]
    return [
        out_ids, out_X, out_y, out_w, out_ids_shape, out_X_shape, out_y_shape,
        out_w_shape
    ]

  def save_to_disk(self) -> None:
    """Save dataset to disk."""
    save_metadata(self.tasks, self.metadata_df, self.data_dir)
    DiskDataset._save_metadata(self.metadata_df, self.data_dir, self.tasks)
    self._cached_shards = None

  def move(self, new_data_dir: str) -> None:
    """Moves dataset to new directory."""
    if os.path.isdir(new_data_dir):
  def move(self, new_data_dir: str,
           delete_if_exists: Optional[bool] = True) -> None:
    """Moves dataset to new directory.

    Note
    ----
    This is a stateful operation! `self.data_dir` will be moved into
    `new_data_dir`. If `delete_if_exists` is set to `True` (by default this is
    set `True`), then `new_data_dir` is deleted if it's a pre-existing
    directory.

    Parameters
    ----------
    new_data_dir: str
      The new directory name to move this to dataset to.
    delete_if_exists: Optional[bool] (default True)
      If this option is set, delete the destination directory if it exists
      before moving. This is set to True by default to be backwards compatible
      with behavior in earlier versions of DeepChem.
    """
    if delete_if_exists and os.path.isdir(new_data_dir):
      shutil.rmtree(new_data_dir)
    shutil.move(self.data_dir, new_data_dir)
    if delete_if_exists:
      self.data_dir = new_data_dir
    else:
      self.data_dir = os.path.join(new_data_dir,
                                   os.path.basename(self.data_dir))

  def copy(self, new_data_dir: str) -> "DiskDataset":
    """Copies dataset to new directory.

    Note
    ----
    This is a stateful operation! If `new_data_dir` is an existing directory
    it is removed first, and then the whole tree under `self.data_dir` is
    copied into `new_data_dir`.

    Parameters
    ----------
    new_data_dir: str
      The new directory name to copy this dataset to.

    Returns
    -------
    DiskDataset
      A `DiskDataset` loaded from `new_data_dir`.
    """
    destination_exists = os.path.isdir(new_data_dir)
    if destination_exists:
      # copytree needs a free destination, so clear out any old directory.
      shutil.rmtree(new_data_dir)
    shutil.copytree(self.data_dir, new_data_dir)
    copied_dataset = DiskDataset(new_data_dir)
    return copied_dataset

  def get_task_names(self) -> np.ndarray:
    """
@@ -1116,21 +1316,49 @@ class DiskDataset(Dataset):
    return self.tasks

  def reshard(self, shard_size: int) -> None:
    """Reshards data to have specified shard size."""
    """Reshards data to have specified shard size.

    Examples
    --------
    >>> import deepchem as dc
    >>> import numpy as np
    >>> X = np.random.rand(100, 10)
    >>> d = dc.data.DiskDataset.from_numpy(X)
    >>> d.reshard(shard_size=10)
    >>> d.get_number_shards()
    10

    Note
    ----
    If this `DiskDataset` is in `legacy_metadata` format, reshard will
    convert this dataset to have non-legacy metadata.
    """
    # Create temp directory to store resharded version
    reshard_dir = tempfile.mkdtemp()

    n_shards = self.get_number_shards()

    # Get correct shapes for y/w
    tasks = self.get_task_names()
    _, y_shape, w_shape, _ = self.get_shape()
    if len(y_shape) == 1:
      y_shape = (len(y_shape), len(tasks))
    if len(w_shape) == 1:
      w_shape = (len(w_shape), len(tasks))

    # Write data in new shards
    def generator():
      tasks = self.get_task_names()
      X_next = np.zeros((0,) + self.get_data_shape())
      y_next = np.zeros((0,) + (len(tasks),))
      w_next = np.zeros((0,) + (len(tasks),))
      y_next = np.zeros((0,) + y_shape[1:])
      w_next = np.zeros((0,) + w_shape[1:])
      ids_next = np.zeros((0,), dtype=object)
      for shard_num, (X, y, w, ids) in enumerate(self.itershards()):
        logger.info("Resharding shard %d/%d" % (shard_num, n_shards))
        # Handle shapes
        X = np.reshape(X, (len(X),) + self.get_data_shape())
        # Note that this means that DiskDataset resharding currently doesn't
        # work for datasets that aren't regression/classification.
        y = np.reshape(y, (len(y),) + y_shape[1:])
        w = np.reshape(w, (len(w),) + w_shape[1:])
        X_next = np.concatenate([X_next, X], axis=0)
        y_next = np.concatenate([y_next, y], axis=0)
        w_next = np.concatenate([w_next, w], axis=0)
@@ -1148,6 +1376,8 @@ class DiskDataset(Dataset):
        generator(), data_dir=reshard_dir, tasks=self.tasks)
    shutil.rmtree(self.data_dir)
    shutil.move(reshard_dir, self.data_dir)
    # Should have updated to non-legacy metadata
    self.legacy_metadata = False
    self.metadata_df = resharded_dataset.metadata_df
    # Note that this resets the cache internally
    self.save_to_disk()
@@ -1158,10 +1388,14 @@ class DiskDataset(Dataset):
    """
    if not len(self.metadata_df):
      raise ValueError("No data in dataset.")
    if self.legacy_metadata:
      sample_X = load_from_disk(
          os.path.join(self.data_dir,
                       next(self.metadata_df.iterrows())[1]['X']))
      return np.shape(sample_X)[1:]
    else:
      X_shape, _, _, _ = self.get_shape()
      return X_shape[1:]

  def get_shard_size(self) -> int:
    """Gets size of shards on disk."""
@@ -1420,7 +1654,7 @@ class DiskDataset(Dataset):
      pool.close()
      metadata_rows = [r.get() for r in results]
      metadata_df = DiskDataset._construct_metadata(metadata_rows)
      save_metadata(tasks, metadata_df, out_dir)
      DiskDataset._save_metadata(metadata_df, out_dir, tasks)
      dataset = DiskDataset(out_dir)
    else:

@@ -1503,44 +1737,23 @@ class DiskDataset(Dataset):
      Tasks in this dataset
    data_dir: Optional[str], optional (default None)
      The directory to write this dataset to. If none is specified, will use
      a temporary dataset instead.
      a temporary directory instead.

    Returns
    -------
    A `DiskDataset` constructed from the provided information.
    """
    n_samples = len(X)
    if ids is None:
      ids = np.arange(n_samples)

    if y is not None:
      if w is None:
        if len(y.shape) == 1:
          w = np.ones(y.shape[0], np.float32)
        else:
          w = np.ones((y.shape[0], 1), np.float32)

    # To unify shape handling so from_numpy behaves like NumpyDataset, we just
    # make a NumpyDataset under the hood
    dataset = NumpyDataset(X, y, w, ids)
    if tasks is None:
        if len(y.shape) > 1:
          n_tasks = y.shape[1]
        else:
          n_tasks = 1
        tasks = np.arange(n_tasks)

    else:
      if w is not None:
        warnings.warn('y is None but w is not None. Setting w to None',
                      UserWarning)
        w = None

      if tasks is not None:
        warnings.warn('y is None but tasks is not None. Setting tasks to None',
                      UserWarning)
        tasks = None
      tasks = dataset.get_task_names()

    # raw_data = (X, y, w, ids)
    return DiskDataset.create_dataset(
        [(X, y, w, ids)], data_dir=data_dir, tasks=tasks)
        [(dataset.X, dataset.y, dataset.w, dataset.ids)],
        data_dir=data_dir,
        tasks=tasks)

  @staticmethod
  def merge(datasets: Iterable["DiskDataset"],
@@ -1563,10 +1776,13 @@ class DiskDataset(Dataset):
      except AttributeError:
        pass
    if tasks:
      if len(tasks) < len(datasets) or len(set(map(tuple, tasks))) > 1:
      task_tuples = [tuple(task_list) for task_list in tasks]
      if len(tasks) < len(datasets) or len(set(task_tuples)) > 1:
        raise ValueError(
            'Cannot merge datasets with different task specifications')
      tasks = tasks[0]
      merge_tasks = tasks[0]
    else:
      merge_tasks = []

    def generator():
      for ind, dataset in enumerate(datasets):
@@ -1575,7 +1791,7 @@ class DiskDataset(Dataset):
        yield (X, y, w, ids)

    return DiskDataset.create_dataset(
        generator(), data_dir=merge_dir, tasks=tasks)
        generator(), data_dir=merge_dir, tasks=merge_tasks)

  def subset(self, shard_nums: Sequence[int],
             subset_dir: Optional[str] = None) -> "DiskDataset":
@@ -1657,6 +1873,10 @@ class DiskDataset(Dataset):
      A DiskDataset with a single shard.

    """
    # Create temp directory to store shuffled version
    shuffle_dir = tempfile.mkdtemp()
    n_shards = self.get_number_shards()

    all_X = []
    all_y = []
    all_w = []
@@ -1978,6 +2198,46 @@ class DiskDataset(Dataset):
  def get_shape(self) -> Tuple[Shape, Shape, Shape, Shape]:
    """Finds shape of dataset."""
    n_tasks = len(self.get_task_names())
    n_rows = len(self.metadata_df.index)
    # If shape metadata is available use it to directly compute shape from
    # metadata
    if not self.legacy_metadata:
      for shard_num in range(n_rows):
        row = self.metadata_df.iloc[shard_num]
        if row['X_shape'] is not None:
          shard_X_shape = make_tuple(str(row['X_shape']))
        else:
          shard_X_shape = tuple()
        if n_tasks > 0:
          if row['y_shape'] is not None:
            shard_y_shape = make_tuple(str(row['y_shape']))
          else:
            shard_y_shape = tuple()
          if row['w_shape'] is not None:
            shard_w_shape = make_tuple(str(row['w_shape']))
          else:
            shard_w_shape = tuple()
        else:
          shard_y_shape = tuple()
          shard_w_shape = tuple()
        if row['ids_shape'] is not None:
          shard_ids_shape = make_tuple(str(row['ids_shape']))
        else:
          shard_ids_shape = tuple()
        if shard_num == 0:
          X_shape, y_shape, w_shape, ids_shape = np.array(
              shard_X_shape), np.array(shard_y_shape), np.array(
                  shard_w_shape), np.array(shard_ids_shape)
        else:
          X_shape[0] += shard_X_shape[0]
          if n_tasks > 0:
            y_shape[0] += shard_y_shape[0]
            w_shape[0] += shard_w_shape[0]
          ids_shape[0] += shard_ids_shape[0]
      return tuple(X_shape), tuple(y_shape), tuple(w_shape), tuple(ids_shape)
    # In the absence of shape metadata, fall back to loading data from disk
    # to find the shape.
    else:
      for shard_num, (X, y, w, ids) in enumerate(self.itershards()):
        if shard_num == 0:
          X_shape = np.array(X.shape)
+81 B

File added.

No diff preview for this file type.

+7.94 KiB

File added.

No diff preview for this file type.

+1.15 KiB

File added.

No diff preview for this file type.

+7.94 KiB

File added.

No diff preview for this file type.

Loading