Commit e44e811d authored by Bharath Ramsundar's avatar Bharath Ramsundar
Browse files

Cleanup

parent 1f34ec68
Loading
Loading
Loading
Loading
+29 −47
Original line number Diff line number Diff line
@@ -13,10 +13,7 @@ from functools import partial
from deepchem.utils.save import save_to_disk
from deepchem.utils.save import load_from_disk
from deepchem.utils.save import log
####################################################### DEBUG
import time
import sys
####################################################### DEBUG

__author__ = "Bharath Ramsundar"
__copyright__ = "Copyright 2016, Stanford University"
@@ -65,7 +62,8 @@ class Dataset(object):

  @staticmethod
  def write_dataframe(val, data_dir, featurizer=None, tasks=None,
                      raw_data=None, basename=None, mol_id_field="mol_id"):
                      raw_data=None, basename=None, mol_id_field="mol_id",
                      verbosity=None):
    """Writes data from dataframe to disk."""
    if featurizer is not None and tasks is not None:
      feature_type = featurizer.__class__.__name__
@@ -73,23 +71,15 @@ class Dataset(object):
      # TODO(rbharath): This is a hack. clean up.
      if not len(df):
        return None
      ############################################################## DEBUG
      #print("About to call convert_df_to_numpy")
      #print("mol_id_field")
      #print(mol_id_field)
      ############################################################## DEBUG
      ############################################################## DEBUG
      ############################################################## TIMING
      time1 = time.time()
      ############################################################## DEBUG
      ids, X, y, w = convert_df_to_numpy(df, feature_type, tasks, mol_id_field)
      ############################################################## DEBUG
      ############################################################## TIMING
      ids, X, y, w = convert_df_to_numpy(df, feature_type, tasks, mol_id_field,
                                         verbosity)
      ############################################################## TIMING
      time2 = time.time()
      print("CONVERT_DF_TO_NUMPY TOOK %0.3f s" % (time2-time1))
      ############################################################## DEBUG
      ############################################################## DEBUG
      #print("convert_df_to_numpy returned successfully")
      #sys.stdout.flush()
      ############################################################## DEBUG
      log("TIMING: convert_df_to_numpy took %0.3f s" % (time2-time1), verbosity)
      ############################################################## TIMING
    else:
      ids, X, y, w = raw_data
      basename = ""
@@ -609,71 +599,63 @@ def compute_sums_and_nb_sample(tensor, W=None):

# The following are all associated with Dataset, but are separate functions to
# make it easy to use multiprocessing.
def convert_df_to_numpy(df, feature_type, tasks, mol_id_field):
  """Transforms a featurized dataset df into standard set of numpy arrays"""
def convert_df_to_numpy(df, feature_type, tasks, mol_id_field, verbosity=None):
  """Transforms a dataframe containing deepchem input into numpy arrays"""
  if feature_type not in df.keys():
    raise ValueError(
        "Featurized data does not support requested feature_type %s." % feature_type)
  # perform common train/test split across all tasks
  n_samples = df.shape[0]
  n_tasks = len(tasks)
  ############################################################## DEBUG
  ############################################################## TIMING
  time1 = time.time()
  ############################################################## DEBUG
  ############################################################## TIMING
  y = np.hstack([
      np.reshape(np.array(df[task].values), (n_samples, 1)) for task in tasks])
  ############################################################## DEBUG
  ############################################################## TIMING
  time2 = time.time()
  print("CONVERT_DF_TO_NUMPY Y COMP TOOK %0.3f s" % (time2-time1))
  ############################################################## DEBUG
  log("TIMING: convert_df_to_numpy y computation took %0.3f s" % (time2-time1),
      verbosity)
  ############################################################## TIMING
  w = np.ones((n_samples, n_tasks))
  missing = np.zeros_like(y).astype(int)
  feature_shape = None
  ############################################################## DEBUG
  ############################################################## TIMING
  time1 = time.time()
  ############################################################## DEBUG
  ############################################################## TIMING
  for ind in range(n_samples):
    for task in range(n_tasks):
      if y[ind, task] == "":
        missing[ind, task] = 1
  x_list = list(df[feature_type].values)
  ############################################################## DEBUG
  print("x_list")
  print(x_list)
  print("[type(elt) for elt in x_list]")
  print([type(elt) for elt in x_list])
  ############################################################## DEBUG
  valid_inds = np.array([1 if elt.size > 0 else 0 for elt in x_list], dtype=bool)
  x_list = [elt for (is_valid, elt) in zip(valid_inds, x_list) if is_valid]
  x = np.squeeze(np.array(x_list))
  ############################################################## DEBUG
  ############################################################## TIMING
  time2 = time.time()
  print("CONVERT_DF_TO_NUMPY X COMP TOOK %0.3f s" % (time2-time1))
  ############################################################## DEBUG
  ############################################################## DEBUG
  log("TIMING: convert_df_to_numpy x computation took %0.3f s" % (time2-time1),
      verbosity)
  ############################################################## TIMING
  sorted_ids = df[mol_id_field].values
  ############################################################## DEBUG

  # Set missing data to have weight zero
  # TODO(rbharath): There's a better way to do this with numpy indexing
  ############################################################## DEBUG
  ############################################################## TIMING
  time1 = time.time()
  ############################################################## DEBUG
  ############################################################## TIMING
  for ind in range(n_samples):
    for task in range(n_tasks):
      if missing[ind, task]:
        y[ind, task] = 0.
        w[ind, task] = 0.
  ############################################################## DEBUG
  ############################################################## TIMING
  time2 = time.time()
  print("CONVERT_DF_TO_NUMPY MISSING COMP TOOK %0.3f s" % (time2-time1))
  ############################################################## DEBUG
  log("TIMING: convert_df_to_numpy missing elts computation took %0.3f s"
      % (time2-time1), verbosity)
  ############################################################## TIMING

  ############################################################## DEBUG
  sorted_ids = sorted_ids[valid_inds]
  y = y[valid_inds]
  w = w[valid_inds]
  ############################################################## DEBUG
  # Adding this assertion in to avoid ill-formed outputs.
  assert len(sorted_ids) == len(x) == len(y) == len(w)
  return sorted_ids, x.astype(float), y.astype(float), w.astype(float)
+0 −7
Original line number Diff line number Diff line
@@ -38,13 +38,6 @@ class TestBasicDatasetAPI(TestDatasetAPI):
  def test_get_data_shape(self):
    """Test that get_data_shape returns currect data shape"""
    solubility_dataset = self.load_solubility_data()
    ################################################################# DEBUG
    print("solubility_dataset.get_data_shape()")
    print(solubility_dataset.get_data_shape())
    X, y, w, ids = solubility_dataset.to_numpy()
    print("X.shape, y.shape, w.shape, ids.shape")
    print(X.shape, y.shape, w.shape, ids.shape)
    ################################################################# DEBUG
    assert solubility_dataset.get_data_shape() == (1024,) 
    
    multitask_dataset = self.load_multitask_data()
+57 −163
Original line number Diff line number Diff line
@@ -31,13 +31,11 @@ import time
import sys
############################################################## DEBUG

#def _process_helper(row, loader, fields, input_type):
#  return loader._process_raw_sample(input_type, row, fields)


# Shortcut to multiprocessing's logger
# The error() function and the LogExceptions, LoggingPool classes were adapted
# from
# http://stackoverflow.com/questions/6728236/exception-thrown-in-multiprocessing-pool-not-detected
def error(msg, *args):
  """Shortcut to multiprocessing's logger"""
  ############################################################# DEBUG
  import sys
  sys.stdout.flush()
@@ -45,6 +43,11 @@ def error(msg, *args):
  return mp.get_logger().error(msg, *args)

class LogExceptions(object):
  """Used to wrap thrown exceptions with a stack trace.

  Python's multiprocessing does a terrible job at error handling. This
  class wraps thrown exceptions with a stack trace to facilitate debugging.
  """
  def __init__(self, callable):
    self.__callable = callable

@@ -64,6 +67,7 @@ class LogExceptions(object):
    return result

class LoggingPool(Pool):
  """Wraps multiprocessing.Pool to enable logging."""
  def apply_async(self, func, args=(), kwds={}, callback=None):
    return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

@@ -71,10 +75,9 @@ class LoggingPool(Pool):
    return Pool.map_async(self, LogExceptions(func), iterable, chunksize, callback)

def featurize_map_function(args):
  #try:
  ############################################################## DEBUG
  ############################################################## TIMING
  time1 = time.time()
  ############################################################## DEBUG
  ############################################################## TIMING
  ((loader, shard_size, input_type, data_dir), (shard_num, raw_df_shard)) = args
  log("Loading shard %d of size %s from file." % (shard_num+1, str(shard_size)),
      loader.verbosity)
@@ -82,47 +85,24 @@ def featurize_map_function(args):
  write_fn = partial(
      Dataset.write_dataframe, data_dir=data_dir,
      featurizer=loader.featurizer, tasks=loader.tasks,
      mol_id_field=loader.id_field)
  #process_fn = partial(_process_helper, loader=loader,
  #                     fields=raw_df_shard.keys(),
  #                     input_type=input_type)
  ############################################################## DEBUG
      mol_id_field=loader.id_field, verbosity=loader.verbosity)
  ############################################################## TIMING
  shard_time1 = time.time()
  ############################################################## DEBUG
  ############################################################## TIMING
  metadata_row = loader._featurize_shard(
  #    raw_df_shard, process_fn, write_fn, shard_num, input_type)
  ############################################################## DEBUG
      raw_df_shard, write_fn, shard_num, input_type)
  ############################################################## DEBUG
  ############################################################## TIMING
  shard_time2 = time.time()
  print("SHARD FEATURIZATION TOOK %0.3f s" % (shard_time2-shard_time1))
  ############################################################## DEBUG
  log("Sucessfully featurized shard %d" % shard_num, loader.verbosity)
  ############################################################## DEBUG
  log("TIMING: shard featurization took %0.3f s" % (shard_time2-shard_time1),
      loader.verbosity)
  ############################################################## TIMING
  ############################################################## TIMING
  time2 = time.time()
  print("FEATURIZATION MAP FUNCTION TOOK %0.3f s" % (time2-time1))
  ############################################################## DEBUG
  log("TIMING: featurization map function took %0.3f s" % (time2-time1),
      loader.verbosity)
  ############################################################## TIMING
  return metadata_row
  #except:
  #  print("Shard %d featurization crashed!" % shard_num)
  #  return None


#def _process_field(val):
#  """Parse data in a field."""
#  if (isinstance(val, numbers.Number) or isinstance(val, np.ndarray)):
#    return val
#  elif isinstance(val, list):
#    return [_process_field(elt) for elt in val]
#  elif isinstance(val, str):
#    try:
#      return float(val)
#    except ValueError:
#      return val
#  elif isinstance(val, Chem.Mol):
#    return val
#  else:
#    raise ValueError("Field of unrecognized type: %s" % str(val))


class DataLoader(object):
  """
@@ -159,9 +139,9 @@ class DataLoader(object):
                num_shards_per_batch=24, worker_pool=None,
                logging=True, debug=False):
    """Featurize provided files and write to specified location."""
    ############################################################## DEBUG
    ############################################################## TIMING
    time1 = time.time()
    ############################################################## DEBUG
    ############################################################## TIMING
    log("Loading raw samples now.", self.verbosity)
    log("shard_size: %d" % shard_size, self.verbosity)
    log("num_shards_per_batch: %d" % num_shards_per_batch, self.verbosity)
@@ -181,31 +161,29 @@ class DataLoader(object):
    if logging:
      mp.log_to_stderr()
    if worker_pool is None:
      ############################################################## DEBUG
      if logging:
        worker_pool = LoggingPool(processes=1)
      else:
        worker_pool = mp.Pool(processes=1)
      ############################################################## DEBUG
    log("Spawning workers now.", self.verbosity)
    metadata_rows = []
    data_iterator = it.izip(
        it.repeat((self, shard_size, input_type, data_dir)),
        enumerate(load_data(input_files, shard_size, self.verbosity)))
    ###### Turns out python map is terrible and exhausts the
    ###### generator as given. Solution seems to be to to manually pull out N elements
    ###### from iterator, then to map on only those N elements. BLECH. Python
    ###### should do a better job here.
    # Turns out python map is terrible and exhausts the generator as given.
    # Solution seems to be to to manually pull out N elements from iterator,
    # then to map on only those N elements. BLECH. Python should do a better
    # job here.
    num_batches = 0
    ############################################################## DEBUG
    ############################################################## TIMING
    time2 = time.time()
    print("PRE MAP FEATURIZATION TOOK %0.3f s" % (time2-time1))
    ############################################################## DEBUG
    log("TIMING: pre-map featurization took %0.3f s" % (time2-time1))
    ############################################################## TIMING
    while True:
      log("About to start processing next batch of shards", self.verbosity)
      ############################################################## DEBUG
      ############################################################## TIMING
      time1 = time.time()
      ############################################################## DEBUG
      ############################################################## TIMING
      iterator = itertools.islice(data_iterator, num_shards_per_batch)
      if not debug:
        batch_metadata = worker_pool.map(
@@ -214,10 +192,11 @@ class DataLoader(object):
        batch_metadata = []
        for elt in iterator:
          batch_metadata.append(featurize_map_function(elt))
      ############################################################## DEBUG
      ############################################################## TIMING
      time2 = time.time()
      print("MAP CALL TOOK %0.3f s" % (time2-time1))
      ############################################################## DEBUG
      log("TIMING: map call on batch took %0.3f s" % (time2-time1),
           self.verbosity)
      ############################################################## TIMING
      if batch_metadata:
        metadata_rows.extend([elt for elt in batch_metadata if elt is not None])
        num_batches += 1
@@ -225,49 +204,24 @@ class DataLoader(object):
            % (shard_size * num_shards_per_batch * num_batches), self.verbosity)
      else:
        break
    ############################################################## DEBUG
    ############################################################## TIMING
    time1 = time.time()
    ############################################################## DEBUG
    ############################################################## TIMING

    # TODO(rbharath): This whole bit with metadata_rows is an awkward way of
    # creating a Dataset. Is there a more elegant solutions?
    dataset = Dataset(data_dir=data_dir,
                      metadata_rows=metadata_rows,
                      reload=reload, verbosity=self.verbosity)
    ############################################################## DEBUG
    ############################################################## TIMING
    time2 = time.time()
    print("POST MAP DATASET CONSTRUCTION TOOK %0.3f s" % (time2-time1))
    ############################################################## DEBUG
    print("TIMING: dataset construction took %0.3f s" % (time2-time1),
          self.verbosity)
    ############################################################## TIMING
    return dataset 

  #def _featurize_shard(self, raw_df_shard, process_fn, write_fn, shard_num,
  #                     input_type):
  ############################################################## DEBUG
  def _featurize_shard(self, raw_df_shard, write_fn, shard_num, input_type):
  ############################################################## DEBUG
  def _featurize_shard(self, df_shard, write_fn, shard_num, input_type):
    """Featurizes a shard of an input dataframe."""
    ############################################################## DEBUG
    time1 = time.time()
    ############################################################## DEBUG
    log("Applying processing transformation to shard.",
        self.verbosity)
    #raw_df_shard = raw_df_shard.apply(
    #    process_fn, axis=1, reduce=False)
    ############################################################## DEBUG
    time2 = time.time()
    print("PROCESSING TRANSFORMATION TOOK %0.3f s" % (time2-time1))
    ############################################################## DEBUG
    ############################################################## DEBUG
    time1 = time.time()
    ############################################################## DEBUG
    log("About to standardize dataframe.")
    #df_shard = self._standardize_df(raw_df_shard) 
    df_shard = raw_df_shard
    ############################################################## DEBUG
    time2 = time.time()
    print("STANDARDIZATION TOOK %0.3f s" % (time2-time1))
    ############################################################## DEBUG
  
    field = self.mol_field if input_type == "sdf" else self.smiles_field 
    log("Currently featurizing feature_type: %s"
        % self.featurizer.__class__.__name__, self.verbosity)
@@ -278,17 +232,15 @@ class DataLoader(object):
    elif isinstance(self.featurizer, ComplexFeaturizer):
      self._featurize_complexes(df_shard, self.featurizer)
    basename = "shard-%d" % shard_num 
    ############################################################## DEBUG
    ############################################################## TIMING
    time1 = time.time()
    ############################################################## DEBUG
    ############################################################## DEBUG
    print("About to invoke write_fn")
    ############################################################## DEBUG
    ############################################################## TIMING
    metadata_row = write_fn((basename, df_shard))
    ############################################################## DEBUG
    ############################################################## TIMING
    time2 = time.time()
    print("WRITING METADATA ROW TOOK %0.3f s" % (time2-time1))
    ############################################################## DEBUG
    log("TIMING: writing metadata row took %0.3f s" % (time2-time1),
        self.verbosity)
    ############################################################## TIMING
    return metadata_row

  def _shard_files_exist(self, feature_dir):
@@ -298,45 +250,6 @@ class DataLoader(object):
        return True
    return False

  #def _process_raw_sample(self, input_type, row, fields):
  #  """Extract information from row data."""
  #  data = {}
  #  if input_type == "csv":
  #    for ind, field in enumerate(fields):
  #      data[field] = _process_field(row[ind])
  #  elif input_type in ["pandas-pickle", "pandas-joblib", "sdf"]:
  #    for field in fields:
  #      data[field] = _process_field(row[field])
  #  else:
  #    raise ValueError("Unrecognized input_type")
  #  if self.threshold is not None:
  #    for task in self.tasks:
  #      raw = _process_field(data[task])
  #      if not isinstance(raw, float):
  #        raise ValueError("Cannot threshold non-float fields.")
  #      data[field] = 1 if raw > self.threshold else 0
  #  return data

  #def _standardize_df(self, ori_df):
  #  """Copy specified columns to new df with standard column names.

  #  TODO(rbharath): I think think function is now unnecessary (since the
  #                  dataframes are only temporary and not on disk). Should
  #                  be able to remove this function.
  #  """
  #  df = pd.DataFrame(ori_df[[self.id_field]])
  #  df.columns = ["mol_id"]
  #  if self.smiles_field is not None:
  #    df["smiles"] = ori_df[[self.smiles_field]]
  #  for task in self.tasks:
  #    df[task] = ori_df[[task]]
  #  if self.user_specified_features is not None:
  #    for feature in self.user_specified_features:
  #      df[feature] = ori_df[[feature]]
  #  if self.mol_field is not None:
  #    df["mol"] = ori_df[[self.mol_field]]
  #  return df

  def _featurize_complexes(self, df, featurizer, parallel=True,
                           worker_pool=None):
    """Generates circular fingerprints for dataset."""
@@ -358,7 +271,6 @@ class DataLoader(object):
    else:
      features = worker_pool.map_sync(featurize_wrapper, 
                                      zip(ligand_pdbs, protein_pdbs))
      #features = featurize_wrapper(zip(ligand_pdbs, protein_pdbs))
    df[featurizer.__class__.__name__] = list(features)

  def _featurize_mol(self, df, featurizer, parallel=True, field="mol",
@@ -378,7 +290,6 @@ class DataLoader(object):
      TODO(rbharath): Needs to be merged with _featurize_compounds
    """
    assert field in ["mol", "smiles"]
    #sample_mols = df["mol"].tolist()
    sample_elems = df[field].tolist()

    if worker_pool is None:
@@ -422,31 +333,14 @@ class DataLoader(object):
          -) PDB files for interacting molecules.
        3) User specified featurizations.
    """
    ############################################################## DEBUG
    ############################################################## TIMING
    time1 = time.time()
    ############################################################## DEBUG
    log("Aggregating User-Specified Features", self.verbosity)
    features_data = []
    ############################################################## DEBUG
    ############################################################## TIMING
    df[featurizer.feature_fields] = df[featurizer.feature_fields].apply(pd.to_numeric)
    X_shard = df.as_matrix(columns=featurizer.feature_fields)
    #df[featurizer.__class__.__name__] = X_shard.tolist()
    df[featurizer.__class__.__name__] = [np.array(elt) for elt in X_shard.tolist()]
    ############################################################## DEBUG
    print("X_shard")
    print(X_shard)
    print("type(X_shard)")
    print(type(X_shard))
    print("[type(elt) for elt in X_shard.tolist()]")
    print([type(elt) for elt in X_shard.tolist()])
    #for ind, row in df.iterrows():
    #  # pandas rows are tuples (row_num, row_data)
    #  feature_list = []
    #  for feature_name in featurizer.feature_fields:
    #    feature_list.append(row[feature_name])
    #  features_data.append(np.array(feature_list))
    #df[featurizer.__class__.__name__] = features_data
    ############################################################## DEBUG
    ############################################################## TIMING
    time2 = time.time()
    print("USER SPECIFIED PROCESSING TOOK %0.3f s" % (time2-time1))
    ############################################################## DEBUG
    log("TIMING: user specified processing took %0.3f s" % (time2-time1),
        self.verbosity)
    ############################################################## TIMING