Commit 34925328 authored by evanfeinberg's avatar evanfeinberg

improved normalization and added parallelization

parent 0339c2d4
+6 −6
@@ -258,7 +258,7 @@ def create_model(args):
  paths = [feature_dir]
  if not args.skip_train_test_split:
    train_test_split(
        paths, args.output_transforms, args.input_transforms, args.feature_types,
        paths, args.input_transforms, args.output_transforms, args.feature_types,
        args.splittype, args.mode, data_dir)

  print("+++++++++++++++++++++++++++++++++")
@@ -277,12 +277,12 @@ def create_model(args):
  stats_out_test = os.path.join(data_dir, "test-stats.txt")
  eval_trained_model(
      model_name, model_dir, data_dir, csv_out_train,
      stats_out_train, split="train")
      stats_out_train, args.output_transforms, split="train")
  print("Eval Model on Test")
  print("------------------")
  eval_trained_model(
      model_name, model_dir, data_dir, csv_out_test,
      stats_out_test, split="test")
      stats_out_test, args.output_transforms, split="test")

def parse_args(input_args=None):
  """Parse command-line arguments."""
@@ -309,8 +309,8 @@ def featurize_inputs_wrapper(args):

def train_test_split_wrapper(args):
  """Wrapper function that calls _train_test_split_wrapper after unwrapping args."""
  train_test_split(args.paths, args.output_transforms,
                   args.input_transforms, args.feature_types,
  train_test_split(args.paths, args.input_transforms, 
                   args.output_transforms, args.feature_types,
                   args.splittype, args.mode, args.data_dir)

def fit_model_wrapper(args):
@@ -323,7 +323,7 @@ def eval_trained_model_wrapper(args):
  """Wrapper function that calls _eval_trained_model with unwrapped args."""
  eval_trained_model(
      args.model, args.model_dir, args.data_dir,
      args.csv_out, args.stats_out, split="test")
      args.csv_out, args.stats_out, args.output_transforms, split="test")

def main():
  """Invokes argument parser."""
+19 −6
@@ -11,6 +11,7 @@ import warnings
from deep_chem.utils.preprocess import get_metadata_filename
from deep_chem.utils.preprocess import get_sorted_task_names
from deep_chem.utils.preprocess import get_task_type
from deep_chem.utils.preprocess import undo_transform
from deep_chem.utils.save import load_model
from deep_chem.utils.save import load_sharded_dataset
from sklearn.metrics import mean_squared_error
@@ -20,18 +21,20 @@ from sklearn.metrics import matthews_corrcoef
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score
import pandas as pd
import sys

__author__ = "Bharath Ramsundar"
__copyright__ = "Copyright 2015, Stanford University"
__license__ = "LGPL"

def eval_trained_model(model_name, model_dir, data_dir,
                       csv_out, stats_out, split="test"):
                       csv_out, stats_out, output_transforms, split="test"):
  """Evaluates a trained model on specified data."""
  model = load_model(model_name, model_dir)
  task_type = get_task_type(model_name)
  task_names, pred_y_df = compute_y_pred(model, data_dir, csv_out, split)
  compute_model_performance(pred_y_df, task_names, task_type, stats_out)
  compute_model_performance(pred_y_df, task_names, 
                            task_type, stats_out, output_transforms)

def compute_y_pred(model, data_dir, csv_out, split):
  """
@@ -42,7 +45,8 @@ def compute_y_pred(model, data_dir, csv_out, split):
  task_names = metadata_df.iterrows().next()[1]['task_names']
  pred_task_names = ["%s_pred" % task_name for task_name in task_names]
  w_task_names = ["%s_weight" % task_name for task_name in task_names]
  column_names = ['ids'] + task_names + pred_task_names + w_task_names
  column_names = (['ids'] + task_names + pred_task_names + w_task_names
                         + ["y_means", "y_stds"])
  pred_y_df = pd.DataFrame(columns=column_names)

  split_df = metadata_df.loc[metadata_df['split'] == split]
@@ -51,8 +55,8 @@ def compute_y_pred(model, data_dir, csv_out, split):

  for i, row in split_df.iterrows():
    print("Evaluating on %s batch %d out of %d" % (split, i+1, nb_batch))
    X = load_sharded_dataset(row['X'])
    y = load_sharded_dataset(row['y'])
    X = load_sharded_dataset(row['X-transformed'])
    y = load_sharded_dataset(row['y-transformed'])
    w = load_sharded_dataset(row['w'])
    ids = load_sharded_dataset(row['ids'])

@@ -60,6 +64,7 @@ def compute_y_pred(model, data_dir, csv_out, split):
      nb_block = float(sys.getsizeof(X))/MAX_GPU_RAM
      nb_sample = np.shape(X)[0]
      interval_points = np.linspace(0,nb_sample,nb_block+1).astype(int)
      y_preds = []
      for j in range(0,len(interval_points)-1):
        indices = range(interval_points[j],interval_points[j+1])
        X_batch = X[indices,:]
@@ -77,6 +82,8 @@ def compute_y_pred(model, data_dir, csv_out, split):
    mini_df[task_names] = y
    mini_df[pred_task_names] = y_pred
    mini_df[w_task_names] = w
    mini_df["y_means"] = split_df["y_means"]
    mini_df["y_stds"] = split_df["y_stds"]
    pred_y_df = pd.concat([pred_y_df, mini_df])

  print("Saving predictions to %s" % csv_out)
@@ -85,7 +92,7 @@ def compute_y_pred(model, data_dir, csv_out, split):

  return task_names, pred_y_df

def compute_model_performance(pred_y_df, task_names, task_type, stats_file):
def compute_model_performance(pred_y_df, task_names, task_type, stats_file, output_transforms):
  """
  Computes statistics of model on test data and saves results to csv.
  """
@@ -98,11 +105,17 @@ def compute_model_performance(pred_y_df, task_names, task_type, stats_file):

  performance_df = pd.DataFrame(columns=colnames)

  y_means = pred_y_df.iterrows().next()[1]["y_means"]
  y_stds = pred_y_df.iterrows().next()[1]["y_stds"]

  for i, task_name in enumerate(task_names):
    y = pred_y_df[task_name]
    y_pred = pred_y_df["%s_pred" % task_name]
    w = pred_y_df["%s_weight" % task_name]

    y = undo_transform(y, y_means, y_stds, output_transforms)
    y_pred = undo_transform(y_pred, y_means, y_stds, output_transforms)

    if task_type == "classification":
      y, y_pred = y[w.nonzero()], y_pred[w.nonzero()][:, 1]
      auc = compute_roc_auc_scores(y, y_pred, w)
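
Evaluation now stores y_means/y_stds next to the predictions and calls
undo_transform before scoring, so regression metrics are reported in the
original units of y rather than in normalized units. A self-contained round
trip, assuming (as transform_row below makes explicit) that "normalize"
means z-scoring with stored per-task statistics:

import numpy as np

def undo_normalization(y, y_means, y_stds):
  """Invert y_norm = (y - y_means) / y_stds."""
  return y * y_stds + y_means

y = np.array([1.0, 2.0, 3.0, 4.0])
y_means, y_stds = y.mean(), y.std()
y_norm = (y - y_means) / y_stds  # what the transformed shards store
assert np.allclose(undo_normalization(y_norm, y_means, y_stds), y)
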
+1 −1
@@ -229,6 +229,6 @@ def featurize_inputs(feature_dir, input_files, input_type, fields, field_types,

  #for input_file in input_files:
  #  featurize_input_partial(input_file)
  pool = mp.Pool(mp.cpu_count())
  pool = mp.Pool(int(mp.cpu_count()/2))
  pool.map(featurize_input_partial, input_files)
  pool.terminate()
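
Featurization keeps the process pool but halves the worker count. The
likely rationale (an assumption, not stated in the commit) is that each
worker holds a full featurized dataframe in memory, so fewer workers cap
peak RAM at the cost of throughput. The pattern in isolation:

import multiprocessing as mp
from functools import partial

def featurize_one(input_file, feature_dir):
  """Hypothetical stand-in for featurize_input_partial."""
  return "%s -> %s" % (input_file, feature_dir)

if __name__ == "__main__":
  featurize_partial = partial(featurize_one, feature_dir="/tmp/features")
  pool = mp.Pool(max(1, int(mp.cpu_count() / 2)))  # never fewer than one worker
  print(pool.map(featurize_partial, ["a.sdf", "b.sdf"]))
  pool.terminate()
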
+2 −4
@@ -42,13 +42,11 @@ def fit_model(model_name, model_params, model_dir, data_dir):
  MAX_GPU_RAM = float(691007488/50)
  for i, row in train_metadata.iterrows():
    print("Training on batch %d out of %d" % (i+1, nb_batch))
    X = load_sharded_dataset(row['X'])
    y = load_sharded_dataset(row['y'])
    X = load_sharded_dataset(row['X-transformed'])
    y = load_sharded_dataset(row['y-transformed'])
    w = load_sharded_dataset(row['w'])

    print("sys.getsizeof(X): %s" % str(sys.getsizeof(X)))
    if sys.getsizeof(X) > MAX_GPU_RAM:
      print("X exceeds available GPU memory size. Sharding.")
      nb_block = float(sys.getsizeof(X))/MAX_GPU_RAM
      nb_sample = np.shape(X)[0]
      interval_points = np.linspace(0,nb_sample,nb_block+1).astype(int)
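
fit_model now trains on the transformed shards and, when a shard exceeds
the GPU-RAM budget, slices it into roughly equal row blocks with
np.linspace. A standalone sketch of the blocking arithmetic (using nbytes
where the commit uses sys.getsizeof; note that modern NumPy requires
linspace's num to be an int, which the explicit ceil/int cast guarantees):

import numpy as np

X = np.random.rand(10, 3)
MAX_GPU_RAM = X.nbytes / 4.0  # hypothetical budget that forces four blocks
nb_block = int(np.ceil(X.nbytes / MAX_GPU_RAM))
interval_points = np.linspace(0, X.shape[0], nb_block + 1).astype(int)
for j in range(len(interval_points) - 1):
  X_batch = X[interval_points[j]:interval_points[j+1], :]
  print(X_batch.shape)  # four blocks of 2-3 rows each
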
+214 −31
@@ -35,13 +35,27 @@ def get_train_test_files(paths, train_proportion=0.8):
  all_files = []
  for path in paths:
    all_files += glob(os.path.join(path, "*.joblib"))
  train_indices = list(np.random.choice(len(all_files), int(len(all_files)*train_proportion)))
  train_indices = list(np.random.choice(len(all_files), int(len(all_files)*train_proportion), replace=False))
  test_indices = list(set(range(len(all_files)))-set(train_indices))

  train_files = [all_files[i] for i in train_indices]
  test_files = [all_files[i] for i in test_indices]
  test_files = [f for f in all_files if f not in train_files]
  return train_files, test_files

def reshuffle_train_test_split(data_dir, train_proportion=0.8):
  """Randomly reassigns the train/test split recorded in the metadata file."""
  metadata_df = load_sharded_dataset(get_metadata_filename(data_dir))
  num_indices = metadata_df.shape[0]
  train_indices = list(np.random.choice(num_indices, int(num_indices*train_proportion), replace=False))
  test_indices = [i for i in range(0, num_indices) if i not in train_indices]
  print("Train indices:")
  print(train_indices)
  print("Test indices:")
  print(test_indices)
  metadata_df.iloc[train_indices]["split"] = "train"
  metadata_df.iloc[test_indices]["split"] = "test"
  save_sharded_dataset(metadata_df, get_metadata_filename(data_dir))
  return metadata_df
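
Both the original split and the new reshuffle helper draw train indices
without replacement. Why replace=False matters, shown in isolation:
sampling with replacement draws duplicates, silently shrinking the train
set and enlarging the test set.

import numpy as np

np.random.seed(0)
n, train_proportion = 100, 0.8
with_rep = np.random.choice(n, int(n * train_proportion))  # old behavior
without_rep = np.random.choice(n, int(n * train_proportion), replace=False)
print(len(set(with_rep)), "unique of", len(with_rep))        # typically < 80
print(len(set(without_rep)), "unique of", len(without_rep))  # exactly 80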

def get_metadata_filename(data_dir):
  """
  Get standard location for metadata file.
@@ -49,7 +63,7 @@ def get_metadata_filename(data_dir):
  metadata_filename = os.path.join(data_dir, "metadata.joblib")
  return metadata_filename

def train_test_split(paths, output_transforms, input_transforms,
def train_test_split(paths, input_transforms, output_transforms,
                     feature_types, splittype, mode, data_dir):
  """Saves transformed model."""

@@ -61,22 +75,17 @@ def train_test_split(paths, output_transforms, input_transforms,

  print("About to train/test split dataset")
  train_files, test_files = get_train_test_files(paths)
  print("train_files")
  print(train_files)
  print("test_files")
  print(test_files)
  print("About to write numpy arrays for train & test")
  train_metadata = write_dataset(train_files, data_dir, mode)
  train_metadata["split"] = "train"
  test_metadata = write_dataset(test_files, data_dir, mode)
  test_metadata["split"] = "test"

  metadata = pd.concat([train_metadata, test_metadata])
  print("metadata[:3]")
  print(metadata[:3])
  metadata['input_transforms'] = ",".join(input_transforms)
  metadata['output_transforms'] = ",".join(output_transforms)

  metadata = transform_data(metadata, input_transforms, output_transforms)

  metadata_filename = get_metadata_filename(data_dir)
  print("Saving metadata file to %s" % metadata_filename)
  save_sharded_dataset(metadata, metadata_filename)
@@ -101,14 +110,44 @@ def train_test_split(paths, output_transforms, input_transforms,
  save_sharded_dataset(stored_train, train_out)
  save_sharded_dataset(stored_test, test_out)
  '''
def write_dataset(df_files, out_dir, mode, transforms=[]):
  """
  Turns featurized dataframes into numpy files, writes them & metadata to disk.
  """
  if not os.path.exists(out_dir):
    os.makedirs(out_dir)

  write_dataset_single_partial = partial(write_dataset_single, out_dir=out_dir, mode=mode)

  #pool = mp.Pool(mp.cpu_count())
  #metadata_rows = pool.map(write_dataset_single_partial, df_files)
  #pool.terminate()
  metadata_rows = []
  for df_file in df_files:
    metadata_rows.append(write_dataset_single_partial(df_file))

  metadata_df = pd.DataFrame(metadata_rows, 
                             columns=('df_file', 'task_names', 'ids', 
                                      'X', 'X-transformed', 'y', 'y-transformed', 
                                      'w',
                                      'X_sums', 'X_sum_squares', 'X_n',
                                      'y_sums', 'y_sum_squares', 'y_n')) 

  return metadata_df

def write_dataset_single(df_file, out_dir, mode):
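  """Converts one featurized dataframe to numpy shards, saves them, and returns a metadata row."""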
  print("Examining %s" % df_file)
  df = load_sharded_dataset(df_file)
  task_names = get_sorted_task_names(df)
  ids, X, y, w = df_to_numpy(df, mode)
  X_sums, X_sum_squares, X_n = compute_sums_and_nb_sample(X)
  y_sums, y_sum_squares, y_n = compute_sums_and_nb_sample(y, w)

  basename = os.path.splitext(os.path.basename(df_file))[0]
  out_X = os.path.join(out_dir, "%s-X.joblib" % basename)
  out_X_transformed = os.path.join(out_dir, "%s-X-transformed.joblib" % basename)
  out_y = os.path.join(out_dir, "%s-y.joblib" % basename)
  out_y_transformed = os.path.join(out_dir, "%s-y-transformed.joblib" % basename)
  out_w = os.path.join(out_dir, "%s-w.joblib" % basename)
  out_ids = os.path.join(out_dir, "%s-ids.joblib" % basename)

@@ -116,26 +155,168 @@ def write_dataset_single(df_file, out_dir, mode):
  save_sharded_dataset(y, out_y)
  save_sharded_dataset(w, out_w)
  save_sharded_dataset(ids, out_ids)
  return([df_file, task_names, out_ids, out_X, out_y, out_w])

def write_dataset(df_files, out_dir, mode):
  """
  Turns featurized dataframes into numpy files, writes them & metadata to disk.
  """
  if not os.path.exists(out_dir):
    os.makedirs(out_dir)

  write_dataset_single_partial = partial(write_dataset_single, out_dir=out_dir, mode=mode)

  pool = mp.Pool(mp.cpu_count())
  metadata_rows = pool.map(write_dataset_single_partial, df_files)
  return([df_file, task_names, out_ids, out_X, out_X_transformed, out_y, 
          out_y_transformed, out_w,
          X_sums, X_sum_squares, X_n, 
          y_sums, y_sum_squares, y_n])

def compute_sums_and_nb_sample(tensor, W=None):
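  """Computes per-column sums, sums of squares, and sample counts (per-task if weights W are given)."""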
  if W is None:
    sums = np.sum(tensor, axis=0)
    sum_squares = np.sum(np.square(tensor), axis=0)
    nb_sample = np.shape(tensor)[0]
  else:
    nb_task = np.shape(tensor)[1]
    sums = np.zeros((nb_task))
    sum_squares = np.zeros((nb_task))
    nb_sample = np.zeros((nb_task))
    for task in range(0, nb_task):
      y_task = tensor[:,task]
      W_task = W[:,task]
      nonzero_indices = np.nonzero(W_task)
      y_task_nonzero = y_task[nonzero_indices]
      sums[task] = np.sum(y_task_nonzero)
      sum_squares[task] = np.dot(y_task_nonzero, y_task_nonzero)
      nb_sample[task] = np.shape(y_task_nonzero)[0]
  return (sums, sum_squares, nb_sample)
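
These per-shard sums and sums of squares are enough to recover global means
and standard deviations without ever concatenating the shards, since
E[x] = sum/n and Var[x] = sum_sq/n - E[x]^2. A quick check of that identity
against NumPy on synthetic shards:

import numpy as np

shards = [np.random.rand(50, 3) for _ in range(4)]
sums = sum(s.sum(axis=0) for s in shards)
sum_squares = sum(np.square(s).sum(axis=0) for s in shards)
n = sum(s.shape[0] for s in shards)
means = sums / n
stds = np.sqrt(sum_squares / n - np.square(means))
full = np.vstack(shards)
assert np.allclose(means, full.mean(axis=0))  # matches the concatenated data
assert np.allclose(stds, full.std(axis=0))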

def compute_mean_and_std(df):
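  """Assembles global X/y means and stds from the per-shard sums stored in the metadata."""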
  X_sums, X_sum_squares, X_n = (df['X_sums'], 
                                df['X_sum_squares'],
                                df['X_n'])
  #X_sums = np.concatenate(X_sums, axis=0)
  #X_sum_squares = np.concatenate(X_sum_squares, axis=0)
  n = np.sum(X_n)
  overall_X_sums = np.sum(X_sums, axis=0)
  overall_X_means = overall_X_sums / n
  overall_X_sum_squares = np.sum(X_sum_squares, axis=0)

  X_vars = (overall_X_sum_squares - np.square(overall_X_sums)/n)/(n)

  y_sums, y_sum_squares, y_n = (df['y_sums'].values, 
                                df['y_sum_squares'].values,
                                df['y_n'].values)
  y_sums = np.vstack(y_sums)
  y_sum_squares = np.vstack(y_sum_squares)
  n = np.sum(y_n)
  y_means = np.sum(y_sums, axis=0)/n
  y_vars = np.sum(y_sum_squares,axis=0)/n - np.square(y_means)

  return overall_X_means, np.sqrt(X_vars), y_means, np.sqrt(y_vars)

def transform_row(i, df, normalize_X, normalize_y, truncate_X, truncate_y,
                      log_X, log_y, X_means, X_stds, y_means, y_stds, trunc):
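  """Applies the configured X/y transforms to row i of df and saves the transformed shards."""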
  total = df.shape[0]
  row = df.iloc[i]
  X = load_sharded_dataset(row['X'])
  if normalize_X or log_X:
    if normalize_X:
      print("Normalizing X sample %d out of %d" % (i+1,total))
      X = np.nan_to_num((X - X_means) / X_stds)
      if truncate_X:
         print("Truncating X sample %d out of %d" % (i+1,total))
         X[X > trunc] = trunc
         X[X < (-1.0*trunc)] = -1.0 * trunc
    if log_X:
      X = np.log(X)
  save_sharded_dataset(X, row['X-transformed'])

  y = load_sharded_dataset(row['y'])
  if normalize_y or log_y:    
    if normalize_y:
      print("Normalizing y sample %d out of %d" % (i+1,total))
      y = np.nan_to_num((y - y_means) / y_stds)
      if truncate_y:
        y[y > trunc] = trunc
        y[y < (-1.0*trunc)] = -1.0 * trunc
    if log_y:
      y = np.log(y)
  save_sharded_dataset(y, row['y-transformed'])  
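
The normalize-then-truncate step above z-scores each value with the
precomputed statistics and clamps anything beyond trunc standard
deviations. The same effect in miniature (np.clip is the idiomatic
equivalent of the two masked writes):

import numpy as np

trunc = 5.0
X = np.array([0.0, 1.0, 2.0, 100.0])
X = np.nan_to_num((X - X.mean()) / X.std())  # z-score, mapping NaNs to zero
X = np.clip(X, -trunc, trunc)                # clamp outliers to +/- trunc stds
print(X)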

def transform(df, normalize_X=True, normalize_y=True, 
              truncate_X=True, truncate_y=True,
              log_X=False, log_y=False, parallel=False):
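  """Transforms every row of df, optionally in parallel; returns the X/y means and stds used."""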
  trunc = 5.0
  X_means, X_stds, y_means, y_stds = compute_mean_and_std(df)
  total = df.shape[0]
  indices = range(0, df.shape[0])
  transform_row_partial = partial(transform_row, df=df, normalize_X=normalize_X,
                                  normalize_y=normalize_y, truncate_X=truncate_X,
                                  truncate_y=truncate_y, log_X=log_X,
                                  log_y=log_y, X_means=X_means, X_stds=X_stds,
                                  y_means=y_means, y_stds=y_stds, trunc=trunc)
  if parallel:
    pool = mp.Pool(int(mp.cpu_count()/4))
    pool.map(transform_row_partial, indices)
    pool.terminate()
  else:
    for index in indices:
      transform_row_partial(index)

  return X_means, X_stds, y_means, y_stds

def transform_data(metadata_df, input_transforms, output_transforms):
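  """Parses the transform specs, transforms the train and test splits, and records the statistics."""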
  train_df = metadata_df.loc[metadata_df["split"] == "train"]
  test_df = metadata_df.loc[metadata_df["split"] == "test"]
  (normalize_X, truncate_x, normalize_y, 
      truncate_y, log_X, log_y) = False, False, False, False, False, False

  if "normalize-and-truncate" in input_transforms:
    normalize_X=True 
    truncate_x=True
  elif "normalize" in input_transforms:
    normalize_X=True

  if "normalize" in output_transforms:
    normalize_y=True

  if "log" in input_transforms:
    log_X = True 
  if "log" in output_transforms:
    log_y = True

  print("Transforming training data.")
  X_means, X_stds, y_means, y_stds = transform(train_df, normalize_X, 
                                               normalize_y, truncate_x,
                                               truncate_y, log_X, log_y)
  nrow = train_df.shape[0]
  train_df['X_means'] = [X_means for i in range(0,nrow)]
  train_df['X_stds'] = [X_stds for i in range(0,nrow)]
  train_df['y_means'] = [y_means for i in range(0,nrow)]
  train_df['y_stds'] = [y_stds for i in range(0,nrow)]

  print("Transforming test data.")
  X_means, X_stds, y_means, y_stds = transform(test_df, normalize_X, 
                                               normalize_y, truncate_x,
                                               truncate_y, log_X, log_y)
  nrow = test_df.shape[0]
  test_df['X_means'] = [X_means for i in range(0,nrow)]
  test_df['X_stds'] = [X_stds for i in range(0,nrow)]
  test_df['y_means'] = [y_means for i in range(0,nrow)]
  test_df['y_stds'] = [y_stds for i in range(0,nrow)]

  return(pd.concat([train_df, test_df]))
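
transform_data stamps the per-split statistics onto every metadata row. One
caveat with the slice-then-assign pattern above: train_df and test_df come
from .loc filtering, so column assignment can trip pandas'
SettingWithCopyWarning; an explicit .copy() (a sketch, not the commit's
code) makes the intent unambiguous:

import numpy as np
import pandas as pd

metadata_df = pd.DataFrame({"split": ["train", "train", "test"]})
train_df = metadata_df.loc[metadata_df["split"] == "train"].copy()
y_means = np.array([0.5, 1.5])
train_df["y_means"] = [y_means] * train_df.shape[0]  # one stats vector per row
print(train_df)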

def undo_normalization(y, y_means, y_stds):
  """Undo the applied normalization transform."""
  return y * y_stds + y_means

  metadata_df = pd.DataFrame(metadata_rows, 
                             columns=('df_file', 'task_names', 'ids', 'X', 'y', 'w'))
  print("metadata_df[:3]")
  print(metadata_df[:3])
  return metadata_df
def undo_transform(y, y_means, y_stds, output_transforms):
  """Undo transforms on y_pred, W_pred."""
  output_transforms = [output_transforms]
  print(output_transforms)
  if output_transforms in ([""], []):
    return y
  elif output_transforms == ["log"]:
    return np.exp(y)
  elif output_transforms == ["normalize"]:
    return undo_normalization(y, y_means, y_stds)
  elif output_transforms == ["log", "normalize"]:
    return np.exp(undo_normalization(y, y_means, y_stds))
  else:
    raise ValueError("Unsupported output transforms.")

def get_sorted_task_names(df):
  """
@@ -179,7 +360,7 @@ def transform_inputs(X, input_transforms):
    raise ValueError("Only know how to transform vectorial data.")
  Z = np.zeros(np.shape(X))
  # Meant to be done after normalize
  trunc = 5
  trunc = 5.0
  for feature in range(n_features):
    feature_data = X[:, feature]
    for input_transform in input_transforms:
@@ -194,10 +375,11 @@ def transform_inputs(X, input_transforms):
          if np.amax(feature_data) > trunc or np.amin(feature_data) < -trunc:
            raise ValueError("Truncation failed on feature %d" % feature)
      else:
        raise ValueError("Unsupported Input Transform")
        raise ValueError("untilnsupported Input Transform")
    Z[:, feature] = feature_data
  return Z

'''
def undo_normalization(y_orig, y_pred):
  """Undo the applied normalization transform."""
  old_mean = np.mean(y_orig)
@@ -216,6 +398,7 @@ def undo_transform_outputs(y_raw, y_pred, output_transforms):
    return np.exp(undo_normalization(np.log(y_raw), y_pred))
  else:
    raise ValueError("Unsupported output transforms.")
'''
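
The retired helpers above re-estimated the mean and std from the eval batch
itself; the replacement pipeline reuses the exact y_means/y_stds stored at
transform time, so the inverse is exact even on a small or skewed eval
split. A contrast on a toy batch:

import numpy as np

y_train = np.array([1.0, 2.0, 3.0, 4.0])
mu, sigma = y_train.mean(), y_train.std()    # stats stored at transform time
y_eval = np.array([3.0, 4.0])                # small eval batch
y_norm = (y_eval - mu) / sigma
print(y_norm * sigma + mu)                   # stored stats recover [3. 4.]
print(y_norm * y_eval.std() + y_eval.mean()) # re-estimated stats are biased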

def transform_outputs(y, W, output_transforms):
  """Tranform the provided outputs