Commit 25f19f49 authored by Bharath Ramsundar

Halfway through refactoring data-load/featurization code.

parent 6963843d
+14 −14
@@ -9,9 +9,9 @@ import numpy as np
import pandas as pd
import joblib
import os
from deepchem.utils.dataset import NumpyDataset
from deepchem.utils.dataset import load_sharded_dataset
from deepchem.utils.dataset import save_sharded_dataset
from deepchem.utils.dataset import ShardedDataset
from deepchem.utils.dataset import load_from_disk
from deepchem.utils.dataset import save_to_disk

class Model(object):
  """
@@ -85,7 +85,7 @@ class Model(object):

  def load(self, model_dir):
    """Dispatcher function for loading."""
    params = load_sharded_dataset(self.get_model_filename(model_dir))
    params = load_from_disk(self.get_model_filename(model_dir))
    self.model_params = params["model_params"]
    self.task_types = params["task_types"]
    self.model_type = params["model_type"]
@@ -95,16 +95,16 @@ class Model(object):
    params = {"model_params" : self.model_params,
              "task_types" : self.task_types,
              "model_type": self.model_type}
    save_sharded_dataset(params, self.get_params_filename(out_dir))
    save_to_disk(params, self.get_params_filename(out_dir))

  def fit(self, numpy_dataset):
  def fit(self, sharded_dataset):
    """
    Fits a model on data in a NumpyDataset object.
    Fits a model on data in a ShardedDataset object.
    """
    # TODO(rbharath/enf): This GPU_RAM is black magic. Needs to be removed/made
    # more general.
    MAX_GPU_RAM = float(691007488/50)
    for (X, y, w, _) in numpy_dataset.itershards():
    for (X, y, w, _) in sharded_dataset.itershards():
      if sys.getsizeof(X) > MAX_GPU_RAM:
        nb_block = float(sys.getsizeof(X))/MAX_GPU_RAM
        nb_sample = np.shape(X)[0]
@@ -123,11 +123,11 @@ class Model(object):

  # TODO(rbharath): The structure of the produced df might be
  # complicated. Better way to model?
  def predict(self, numpy_dataset):
  def predict(self, sharded_dataset):
    """
    Uses self to make predictions on provided NumpyDataset object.
    Uses self to make predictions on provided ShardedDataset object.
    """
    task_names = numpy_dataset.get_task_names()
    task_names = sharded_dataset.get_task_names()
    pred_task_names = ["%s_pred" % task_name for task_name in task_names]
    w_task_names = ["%s_weight" % task_name for task_name in task_names]
    column_names = (['ids'] + task_names + pred_task_names + w_task_names
@@ -137,7 +137,7 @@ class Model(object):
    # TODO(rbharath/enf): This is only for GPU models, and is currently depends
    # on magic numbers.
    MAX_GPU_RAM = float(691007488/50)
    for (X, y, w, ids) in numpy_dataset.itershards():
    for (X, y, w, ids) in sharded_dataset.itershards():
      if sys.getsizeof(X) > MAX_GPU_RAM:
        nb_block = float(sys.getsizeof(X))/MAX_GPU_RAM
        nb_sample = np.shape(X)[0]
@@ -156,8 +156,8 @@ class Model(object):
      shard_df[task_names] = y
      shard_df[pred_task_names] = y_pred
      shard_df[w_task_names] = w
      shard_df["y_means"] = numpy_dataset.get_label_means() 
      shard_df["y_stds"] = numpy_dataset.get_label_stds() 
      shard_df["y_means"] = sharded_dataset.get_label_means() 
      shard_df["y_stds"] = sharded_dataset.get_label_stds() 
      pred_y_df = pd.concat([pred_y_df, shard_df])

    return pred_y_df 
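The fit and predict hunks above both cut off inside the oversized-shard branch. A minimal sketch of how that branch presumably continues, assuming a per-model fit_on_batch method; the interval arithmetic is illustrative, not copied from this commit:

import sys
import numpy as np

MAX_GPU_RAM = float(691007488/50)  # magic number carried over from the diff

def fit(self, sharded_dataset):
  """Sketch: fit shard by shard, splitting shards too large for GPU RAM."""
  for (X, y, w, _) in sharded_dataset.itershards():
    if sys.getsizeof(X) > MAX_GPU_RAM:
      nb_block = float(sys.getsizeof(X)) / MAX_GPU_RAM
      nb_sample = np.shape(X)[0]
      # Assumption: carve the shard into ceil(nb_block) roughly equal
      # sub-batches so each stays under the RAM budget.
      interval_points = np.linspace(
          0, nb_sample, int(np.ceil(nb_block)) + 1).astype(int)
      for j in range(len(interval_points) - 1):
        sl = slice(interval_points[j], interval_points[j + 1])
        self.fit_on_batch(X[sl], y[sl], w[sl])
    else:
      self.fit_on_batch(X, y, w)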
+9 −59
@@ -42,15 +42,20 @@ class SklearnModel(Model):
      else:
        raise ValueError("Invalid model type provided.")

  # TODO(rbharath): This is a partial implementation! Does not work for a
  # datasets with more than one shard. 
  # TODO(rbharath): This does not work with very large datasets! sklearn does
  # support partial_fit, but only for some models. Might make sense to make
  # PartialSklearnModel subclass at some point to support large data models.
  def fit(self, numpy_dataset):
    """
    Fits SKLearn model to data.
    """
    Xs, ys = [], []
    for (X, y, _, _) in numpy_dataset.itershards():
      Xs.append(X)
      ys.append(y)
    X = np.concatenate(Xs)
    y = np.concatenate(ys)
    self.raw_model.fit(X, y)
      return

  def predict_on_batch(self, X):
    """
@@ -76,58 +81,3 @@ Model.register_model_type("ridge", SklearnModel)
Model.register_model_type("lasso", SklearnModel)
Model.register_model_type("lasso_lars", SklearnModel)
Model.register_model_type("elastic_net", SklearnModel)


# TODO(rbharath): Need to fix singletask dataset support.
'''
def fit_singletask_models(train_data, modeltype):
  """Fits singletask linear regression models to potency.

  Parameters
  ----------
  paths: list
    List of paths to datasets.
  modeltype: String
    A string describing the model to be trained. Options are RandomForest,
  splittype: string
    Type of split for train/test. Either random or scaffold.
  seed: int (optional)
    Seed to initialize np.random.
  output_transforms: dict
    dict mapping task names to label transform. Each output type must be either
    None or "log". Only for regression outputs.
  """
  models = {}
  import numpy as np
  X_train = train_data["features"]
  sorted_tasks = train_data["sorted_tasks"]
  for task in sorted_tasks:
    print "Building model for task %s" % task
    (y_train, W_train) = train_data[task]
    W_train = W_train.ravel()
    task_X_train = X_train[W_train.nonzero()]
    task_y_train = y_train[W_train.nonzero()]
    if modeltype == "rf_regressor":
      model = RandomForestRegressor(
          n_estimators=500, n_jobs=-1, warm_start=True, max_features="sqrt")
    elif modeltype == "rf_classifier":
      model = RandomForestClassifier(
          n_estimators=500, n_jobs=-1, warm_start=True, max_features="sqrt")
    elif modeltype == "logistic":
      model = LogisticRegression(class_weight="auto")
    elif modeltype == "linear":
      model = LinearRegression(normalize=True)
    elif modeltype == "ridge":
      model = RidgeCV(alphas=[0.01, 0.1, 1.0, 10.0], normalize=True)
    elif modeltype == "lasso":
      model = LassoCV(max_iter=2000, n_jobs=-1)
    elif modeltype == "lasso_lars":
      model = LassoLarsCV(max_iter=2000, n_jobs=-1)
    elif modeltype == "elastic_net":
      model = ElasticNetCV(max_iter=2000, n_jobs=-1)
    else:
      raise ValueError("Invalid model type provided.")
    model.fit(task_X_train, task_y_train.ravel())
    models[task] = model
  return models
'''
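The TODO above suggests a PartialSklearnModel for datasets that do not fit in memory. A sketch of what that subclass might look like, assuming the wrapped estimator supports sklearn's partial_fit (for classifiers the first call would also need the full class list):

class PartialSklearnModel(SklearnModel):
  """Sketch only: incremental fitting for large sharded datasets."""

  def fit(self, sharded_dataset):
    for (X, y, _, _) in sharded_dataset.itershards():
      # partial_fit sees one shard at a time, so nothing is concatenated
      # in memory; y.ravel() assumes a single task per model.
      self.raw_model.partial_fit(X, y.ravel())

Model.register_model_type("sgd_regressor", PartialSklearnModel)  # hypothetical name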
+26 −29
@@ -10,11 +10,11 @@ from rdkit import Chem
import joblib
from vs_utils.utils import ScaffoldGenerator

def save_sharded_dataset(dataset, filename):
def save_to_disk(dataset, filename):
  """Save a dataset to file."""
  joblib.dump(dataset, filename, compress=0)

def load_sharded_dataset(filename):
def load_from_disk(filename):
  """Load a dataset from file."""
  dataset = joblib.load(filename)
  return dataset
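Both renamed helpers are thin joblib wrappers, so any picklable object round-trips through them. A quick usage sketch (the filename is made up):

import numpy as np

shard = {"X": np.zeros((8, 1024)), "ids": list(range(8))}
save_to_disk(shard, "shard-0.joblib")
assert load_from_disk("shard-0.joblib")["X"].shape == (8, 1024)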
@@ -73,10 +73,10 @@ def get_sorted_task_names(df):
  """
  column_names = df.keys()
  task_names = (set(column_names) - 
                set(FeaturizedDataset.colnames))
                set(FeaturizedSamples.colnames))
  return sorted(list(task_names))

class FeaturizedDataset(object):
class FeaturizedSamples(object):
  """
  Wrapper class for featurized data on disk.
  """
@@ -87,9 +87,6 @@ class FeaturizedDataset(object):

  def __init__(self, paths=None, dataset_files=[], compound_df=None):
    if paths is not None:
      print("FeaturizedDataset()")
      print("paths")
      print(paths)
      for path in paths:
        dataset_files += glob.glob(os.path.join(path, "*.joblib"))
    self.dataset_files = dataset_files
@@ -103,7 +100,7 @@ class FeaturizedDataset(object):
    """
    compound_rows = []
    for dataset_file in self.dataset_files:
      df = load_sharded_dataset(dataset_file)
      df = load_from_disk(dataset_file)
      compound_ids = list(df["mol_id"])
      smiles = list(df["smiles"])
      splits = list(df["split"])
@@ -127,12 +124,12 @@ class FeaturizedDataset(object):

  def _train_test_from_indices(self, train_inds, test_inds):
    """
    Helper to generate train/test FeaturizedDatasets.
    Helper to generate train/test FeaturizedSampless.
    """
    train_dataset = FeaturizedDataset(
    train_dataset = FeaturizedSamples(
        dataset_files=self.dataset_files,
        compound_df=self.compound_df.iloc[train_inds])
    test_dataset = FeaturizedDataset(
    test_dataset = FeaturizedSamples(
        dataset_files=self.dataset_files,
        compound_df=self.compound_df.iloc[test_inds])
    return train_dataset, test_dataset
@@ -208,11 +205,11 @@ class FeaturizedDataset(object):
                                        'w',
                                        'X_sums', 'X_sum_squares', 'X_n',
                                        'y_sums', 'y_sum_squares', 'y_n')) 
    return NumpyDataset(out_dir, metadata_df)
    return ShardedDataset(out_dir, metadata_df)

def write_dataset_single(df_file, out_dir, mode, feature_types):
  print("Examining %s" % df_file)
  df = load_sharded_dataset(df_file)
  df = load_from_disk(df_file)
  task_names = get_sorted_task_names(df)
  ids, X, y, w = df_to_numpy(df, mode, feature_types)
  X_sums, X_sum_squares, X_n = compute_sums_and_nb_sample(X)
@@ -226,17 +223,17 @@ def write_dataset_single(df_file, out_dir, mode, feature_types):
  out_w = os.path.join(out_dir, "%s-w.joblib" % basename)
  out_ids = os.path.join(out_dir, "%s-ids.joblib" % basename)

  save_sharded_dataset(X, out_X)
  save_sharded_dataset(y, out_y)
  save_sharded_dataset(w, out_w)
  save_sharded_dataset(ids, out_ids)
  save_to_disk(X, out_X)
  save_to_disk(y, out_y)
  save_to_disk(w, out_w)
  save_to_disk(ids, out_ids)
  return([df_file, task_names, out_ids, out_X, out_X_transformed, out_y, 
          out_y_transformed, out_w,
          X_sums, X_sum_squares, X_n, 
          y_sums, y_sum_squares, y_n])


class NumpyDataset(object):
class ShardedDataset(object):
  """
  Wrapper class for dataset transformed into X, y, w numpy ndarrays.
  """
@@ -247,7 +244,7 @@ class NumpyDataset(object):
    """
    self.data_dir = data_dir
    if metadata_df is None:
      metadata_df = load_sharded_dataset(self.get_metadata_filename())
      metadata_df = load_from_disk(self.get_metadata_filename())
    self.metadata_df = metadata_df
    self.save_metadata()

@@ -265,7 +262,7 @@ class NumpyDataset(object):
    """
    if not len(self.metadata_df):
      raise ValueError("No data in dataset.")
    sample_X = load_sharded_dataset(self.metadata_df.iterrows().next()[1]['X'])[0]
    sample_X = load_from_disk(self.metadata_df.iterrows().next()[1]['X'])[0]
    return np.shape(sample_X)

  def get_metadata_filename(self):
@@ -279,7 +276,7 @@ class NumpyDataset(object):
    """
    Save metadata file to disk.
    """
    save_sharded_dataset(
    save_to_disk(
      self.metadata_df, self.get_metadata_filename())

  def get_number_shards(self):
@@ -295,10 +292,10 @@ class NumpyDataset(object):
    nb_shards = self.get_number_shards()
    for i, row in self.metadata_df.iterrows():
      print("Loading shard %d out of %d" % (i+1, nb_shards))
      X = load_sharded_dataset(row['X-transformed'])
      y = load_sharded_dataset(row['y-transformed'])
      w = load_sharded_dataset(row['w'])
      ids = load_sharded_dataset(row['ids'])
      X = load_from_disk(row['X-transformed'])
      y = load_from_disk(row['y-transformed'])
      w = load_from_disk(row['w'])
      ids = load_from_disk(row['ids'])
      yield (X, y, w, ids)
      

@@ -364,7 +361,7 @@ def _transform_row(i, df, normalize_X, normalize_y, truncate_X, truncate_y,
                      log_X, log_y, X_means, X_stds, y_means, y_stds, trunc):
  total = df.shape[0]
  row = df.iloc[i]
  X = load_sharded_dataset(row['X'])
  X = load_from_disk(row['X'])
  if normalize_X or log_X:
    if normalize_X:
      print("Normalizing X sample %d out of %d" % (i+1,total))
@@ -375,9 +372,9 @@ def _transform_row(i, df, normalize_X, normalize_y, truncate_X, truncate_y,
         X[X < (-1.0*trunc)] = -1.0 * trunc
    if log_X:
      X = np.log(X)
  save_sharded_dataset(X, row['X-transformed'])
  save_to_disk(X, row['X-transformed'])

  y = load_sharded_dataset(row['y'])
  y = load_from_disk(row['y'])
  if normalize_y or log_y:    
    if normalize_y:
      print("Normalizing y sample %d out of %d" % (i+1,total))
@@ -387,7 +384,7 @@ def _transform_row(i, df, normalize_X, normalize_y, truncate_X, truncate_y,
        y[y < (-1.0*trunc)] = -1.0 * trunc
    if log_y:
      y = np.log(y)
  save_sharded_dataset(y, row['y-transformed'])  
  save_to_disk(y, row['y-transformed'])  

# TODO(rbharath/enf): These need to be better integrated with new OO paradigm.
def compute_sums_and_nb_sample(tensor, W=None):
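compute_sums_and_nb_sample's body is cut off by the hunk boundary, but the metadata columns above (X_sums, X_sum_squares, X_n and the y equivalents) show its purpose: normalization statistics can be assembled shard by shard without loading the whole dataset. A sketch of how those per-shard moments would combine into the X_means/X_stds consumed by _transform_row; the helper name and array shapes are assumptions:

import numpy as np

def combine_moments(sums, sum_squares, nb_samples):
  """Roll shard-level (sum, sum of squares, count) into a global mean/std."""
  n = float(np.sum(nb_samples))
  mean = np.sum(sums, axis=0) / n
  # Var[x] = E[x^2] - E[x]^2; clip at zero to absorb round-off.
  var = np.maximum(np.sum(sum_squares, axis=0) / n - mean ** 2, 0)
  return mean, np.sqrt(var)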
+8 −8
@@ -10,10 +10,10 @@ import numpy as np
import warnings
#from deepchem.utils.preprocess import undo_transform_outputs
#from deepchem.utils.preprocess import get_metadata_filename
from deepchem.utils.dataset import NumpyDataset
from deepchem.utils.dataset import ShardedDataset
from deepchem.utils.preprocess import get_task_type
from deepchem.utils.preprocess import undo_transform
from deepchem.utils.dataset import load_sharded_dataset
from deepchem.utils.dataset import load_from_disk
from deepchem.models import Model 
from sklearn.metrics import mean_squared_error
from sklearn.metrics import roc_auc_score
@@ -44,10 +44,10 @@ def compute_y_pred(model, data_dir, csv_out, split):
  """
  Computes model predictions on data and stores csv to disk.
  """
  test = NumpyDataset(data_dir)
  test = ShardedDataset(data_dir)
  task_names = test.get_task_names()
  #metadata_filename = get_metadata_filename(data_dir)
  #metadata_df = load_sharded_dataset(metadata_filename)
  #metadata_df = load_from_disk(metadata_filename)
  #task_names = metadata_df.iterrows().next()[1]['task_names']

  pred_y_df = model.predict(test)
@@ -66,10 +66,10 @@ def compute_y_pred(model, data_dir, csv_out, split):
  '''
  for i, row in split_df.iterrows():
    print("Evaluating on %s batch %d out of %d" % (split, i+1, nb_batch))
    X = load_sharded_dataset(row['X-transformed'])
    y = load_sharded_dataset(row['y-transformed'])
    w = load_sharded_dataset(row['w'])
    ids = load_sharded_dataset(row['ids'])
    X = load_from_disk(row['X-transformed'])
    y = load_from_disk(row['y-transformed'])
    w = load_from_disk(row['w'])
    ids = load_from_disk(row['ids'])
  '''
  '''
  MAX_GPU_RAM = float(691007488/50)
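Most of compute_y_pred's body is commented out mid-refactor; the live path is just the lines that build a ShardedDataset and call model.predict. A sketch of the streamlined function that path implies (the to_csv call is an assumption based on the docstring):

def compute_y_pred(model, data_dir, csv_out, split):
  """Computes model predictions on data and stores csv to disk."""
  test = ShardedDataset(data_dir)
  pred_y_df = model.predict(test)
  # Assumption: the remainder just persists the merged prediction frame.
  pred_y_df.to_csv(csv_out, index=False)
  return pred_y_df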
+125 −114
@@ -15,91 +15,137 @@ from functools import partial
from rdkit import Chem
from vs_utils.features.fingerprints import CircularFingerprint
from vs_utils.features.basic import SimpleDescriptors
from deepchem.utils.dataset import save_sharded_dataset
from deepchem.utils.dataset import load_sharded_dataset
from deepchem.utils.dataset import save_to_disk
from deepchem.utils.dataset import load_from_disk


def parse_float_input(val):
  """Safely parses a float input."""
  # TODO(rbharath): Correctly parse float ranges.
def _process_field(val):
  """Parse data in a field."""
  if isinstance(val, float) or isinstance(val, np.ndarray):
    return val
  elif isinstance(val, list):
    return [process_field(elt) for elt in val]
  elif isinstance(val, str):
    try:
    if val is None:
      return float(val)
    except ValueError:
      return val
  else:
      fval = float(val)
      return fval
  except ValueError:
    if ">" in val or "<" in val or "-" in val:
      return np.nan
    raise ValueError("Field of unrecognized type: %s" % str(val))

class Samples(object):
  """
  Handles loading/featurizing of chemical samples (datapoints).

  Currently knows how to load csv-files/pandas-dataframes/SDF-files.
  """
    
#TODO(enf/rbharath): make agnostic to input type.
def get_rows(input_file, input_type):
  def __init__(self, input_file, tasks, smiles_field, threshold,
               log_every_n=1000):
    """Extracts data from input as Pandas data frame"""
    rows = []
    self.tasks = tasks
    self.threshold = threshold
    self.input_file = input_file
    self.input_type = self._get_input_type(input_file)
    self.fields = self._get_fields(input_file)

    for ind, row in enumerate(self._get_raw_samples()):
      if ind % log_every_n == 0:
        print("Loading sample %d" % row_index)
      row.append(self._process_raw_sample(row))
    self.df = pd.DataFrame(rows)

  def get_samples(self):
    """Accessor for samples in this object."""
    return self.df.iterrows()

  def _get_fields(self):
    """Get the names of fields and field_types for input data."""
    # If CSV input, assume that first row contains labels
    if self.input_type == "csv":
      return self._get_raw_samples(self.input_file).next()
    elif self.input_type == "pandas":
      df = load_from_disk(self.input_file)
      return df.keys()
    elif self.input_type == "sdf":
      sample_mol = self.get_rows(self.input_file).next()
      return list(sample_mol.GetPropNames())
    else:
      raise ValueError("Unrecognized extension for %s" % self.input_file)

  def _get_input_type(self):
    """Get type of input file. Must be csv/pkl.gz/sdf file."""
    filename, file_extension = os.path.splitext(self.input_file)
    # If gzipped, need to compute extension again
    if file_extension == ".gz":
      filename, file_extension = os.path.splitext(filename)
    if file_extension == "csv":
      return "csv"
    elif file_extension == "pkl":
      return "pandas"
    elif file_extension == "sdf":
      return "sdf"
    else:
      raise ValueError("Unrecognized extension for %s" % input_file)

  def _get_raw_samples(self):
    """Returns an iterator over all rows in input_file"""
  # TODO(rbharath): This function loads into memory, which can be painful. The
  # right option here might be to create a class which internally handles data
  # loading.
    input_type = self.get_input_type(self.input_file)
    if input_type == "csv":
    with open(input_file, "rb") as inp_file_obj:
      reader = csv.reader(inp_file_obj)
      return [row for row in reader]
      with open(self.input_file, "rb") as inp_file_obj:
        for row in csv.reader(inp_file_obj):
          if row is not None:
            yield row
    elif input_type == "pandas":
    dataframe = load_sharded_dataset(input_file)
    return dataframe.iterrows()
      dataframe = load_from_disk(self.input_file)
      for row in dataframe.iterrows():
        yield row
    elif input_type == "sdf":
    if ".gz" in input_file:
      with gzip.open(input_file) as inp_file_obj:
      if ".gz" in self.input_file:
        with gzip.open(self.input_file) as inp_file_obj:
          supp = Chem.ForwardSDMolSupplier(inp_file_obj)
        mols = [mol for mol in supp if mol is not None]
      return mols
          for mol in supp:
            if mol is not None:
              yield mol
      else:
      with open(input_file) as inp_file_obj:
        with open(self.input_file) as inp_file_obj:
          supp = Chem.ForwardSDMolSupplier(inp_file_obj)
          mols = [mol for mol in supp if mol is not None]
      return mols

def get_colnames(row, input_type):
  """Get names of all columns."""
  if input_type == "csv":
    return row
          for mol in supp:
            if mol is not None:
              yield mol

def get_row_data(row, input_type, fields, smiles_field, colnames=None):
  def _process_raw_sample(self, row):
    """Extract information from row data."""
  row_data = {}
  if input_type == "csv":
    for ind, colname in enumerate(colnames):
      if colname in fields:
        row_data[colname] = row[ind]
  elif input_type == "pandas":
    # pandas rows are tuples (row_num, row_data)
    data = {}
    if self.input_type == "csv":
      for ind, field in enumerate(self.fields):
        data[field] = _process_field(row[ind])
      return data
    elif self.input_type == "pandas":
      # pandas rows are tuples (row_num, data)
      row = row[1]
    for field in fields:
      row_data[field] = row[field]
  elif input_type == "sdf":
      for field in self.fields:
        data[field] = _process_field(row[field])
    elif self.input_type == "sdf":
      mol = row
    for field in fields:
      row_data[smiles_field] = Chem.MolToSmiles(mol)
      for field in self.fields:
        if not mol.HasProp(field):
        row_data[field] = None
          data[field] = None
        else:
        row_data[field] = mol.GetProp(field)
  return row_data

def process_field(data, field_type):
  """Parse data in a field."""
  if field_type == "string":
    return data
  elif field_type == "float":
    return parse_float_input(data)
  elif field_type == "list-string":
    if isinstance(data, list):
      return data
          data[field] = _process_field(mol.GetProp(field))
      data["smiles"] = Chem.MolToSmiles(mol)
    else:
      return data.split(",")
  elif field_type == "list-float":
    return np.array(data.split(","))
  elif field_type == "ndarray":
      raise ValueError("Unrecognized input_type")
    if self.threshold is not None:
      for task in self.tasks:
        raw = _process_field(data[task])
        if not isinstance(raw, float):
          raise ValueError("Cannot threshold non-float fields.")
        data[field] = 1 if raw > threshold else 0
    return data


def add_vs_utils_features(df, featuretype, log_every_n=1000):
  """Generates circular fingerprints for dataset."""
  if featuretype == "fingerprints":
@@ -151,38 +197,7 @@ def standardize_df(ori_df, feature_fields, task_fields, smiles_field,

  return(df)

#TODO(enf/rbharath): This is broken for sdf files.
def extract_data(input_file, input_type, fields, field_types,
                 task_fields, smiles_field, threshold,
                 log_every_n=1000):
  """Extracts data from input as Pandas data frame"""
  rows = []
  colnames = []
  for row_index, raw_row in enumerate(get_rows(input_file, input_type)):
    if row_index % log_every_n == 0:
      print(row_index)
    # Skip empty rows
    if raw_row is None:
      continue
    # TODO(rbharath): The script expects that all columns in csv files
    # have column names attached. Check that this holds true somewhere
    # Get column names if csv and continue
    if input_type == "csv" and row_index == 0:
      colnames = get_colnames(raw_row, input_type)
      continue
    row, row_data = {}, get_row_data(raw_row, input_type, fields, smiles_field, colnames)
    for (field, field_type) in zip(fields, field_types):
      if field in task_fields and threshold is not None:
        raw_val = process_field(row_data[field], field_type)
        row[field] = 1 if raw_val > threshold else 0
      else:
        row[field] = process_field(row_data[field], field_type)
    #row["smiles"] = smiles.get_smiles(mol)
    rows.append(row)
  dataframe = pd.DataFrame(rows)
  return dataframe

def featurize_input(input_file, feature_dir, input_type, fields, field_types,
def featurize_input(self, feature_dir,
                    feature_fields, task_fields, smiles_field,
                    split_field, id_field, threshold):
  """Featurizes raw input data."""
@@ -191,7 +206,7 @@ def featurize_input(input_file, feature_dir, input_type, fields, field_types,
  if id_field is None:
    id_field = smiles_field

  df = extract_data(input_file, input_type, fields, field_types, task_fields,
  df = extract_data(self.input_file, input_type, fields, field_types, task_fields,
                    smiles_field, threshold)
  print("Standardizing User DataFrame")
  df = standardize_df(df, feature_fields, task_fields, smiles_field,
@@ -204,11 +219,9 @@ def featurize_input(input_file, feature_dir, input_type, fields, field_types,
  print("Writing DataFrame")
  df_filename = os.path.join(
      feature_dir, "%s.joblib" %(os.path.splitext(os.path.basename(input_file))[0]))
  save_sharded_dataset(df, df_filename)
  save_to_disk(df, df_filename)
  print("Finished saving.")

  return

def featurize_inputs(feature_dir, input_files, input_type, fields, field_types,
                     feature_fields, task_fields, smiles_field,
                     split_field, id_field, threshold):
@@ -222,8 +235,6 @@ def featurize_inputs(feature_dir, input_files, input_type, fields, field_types,
                                    split_field=split_field, id_field=id_field,
                                    threshold=threshold)

  #for input_file in input_files:
  #  featurize_input_partial(input_file)
  pool = mp.Pool(int(mp.cpu_count()/2))
  pool.map(featurize_input_partial, input_files)
  pool.terminate()
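Since the commit message flags this file as half-refactored, the Samples class above is not yet self-consistent (for instance, get_samples builds self.df from a rows list that never gets appended to). A sketch of the interface it appears to be converging on, with hypothetical file and task names:

samples = Samples(input_file="solubility.csv",
                  tasks=["measured_log_solubility"],
                  smiles_field="smiles",
                  threshold=None)
for _, sample in samples.get_samples():
  # One row per compound: raw fields parsed by _process_field, plus the
  # task columns (binarized against threshold when one is given).
  print(sample["smiles"], sample["measured_log_solubility"])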