Commit 3ef691a7 authored by Bharath Ramsundar

Progress refactoring keras and sklearn models

parent d8c0d194
+4 −0
@@ -335,6 +335,10 @@ class Dataset(object):
         shard_batch_size = n_samples
       else:
         shard_batch_size = batch_size
+      ############################################################### DEBUG
+      print("n_samples, shard_batch_size")
+      print(n_samples, shard_batch_size)
+      ############################################################### DEBUG
       interval_points = np.linspace(
           0, n_samples, np.ceil(float(n_samples)/shard_batch_size)+1, dtype=int)
       for j in range(len(interval_points)-1):
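Aside: the batching arithmetic in this hunk can be sanity-checked in isolation. A minimal sketch with illustrative numbers, not repository code (note the explicit int() around np.ceil, since np.linspace's point count must be an integer in modern NumPy):

import numpy as np

n_samples, shard_batch_size = 10, 4
# One cut point more than the number of batches needed to cover the shard.
n_batches = int(np.ceil(float(n_samples) / shard_batch_size))
interval_points = np.linspace(0, n_samples, n_batches + 1, dtype=int)  # [0, 3, 6, 10]
for j in range(len(interval_points) - 1):
  # Each consecutive pair of cut points delimits one batch.
  print(interval_points[j], interval_points[j + 1])  # prints 0 3, 3 6, 6 10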
+28 −37
@@ -27,22 +27,21 @@ class Model(object):
   """
   Abstract base class for different ML models.
   """
-  def __init__(self, model_instance, n_tasks, model_dir,
+  def __init__(self, model_instance, model_dir,
                fit_transformers=None, verbosity=None):
     """Abstract class for all models.
     Parameters:
     -----------
     model_instance: object
       Wrapper around ScikitLearn/Keras/Tensorflow model object.
-    n_tasks: int
-      Number of tasks for this model.
     model_dir: str
       Path to directory where model will be stored.
     """
+    self.model_dir = model_dir
+    if not os.path.exists(self.model_dir):
+      os.makedirs(self.model_dir)
     self.model_instance = model_instance
     self.model_class = model_instance.__class__
-    self.model_dir = model_dir
-    self.n_tasks = n_tasks
     self.fit_transformers = fit_transformers

     assert verbosity in [None, "low", "high"]
@@ -91,23 +90,19 @@ class Model(object):
     return os.path.join(model_dir, "model_params.joblib")

   def save(self):
-    """Dispatcher function for saving."""
-    params = {"model_params" : self.model_params,
-              "model_class": self.__class__}
-    save_to_disk(params, Model.get_params_filename(self.model_dir))
+    """Dispatcher function for saving.
+
+    Each subclass is responsible for overriding this method.
+    """
+    raise NotImplementedError

-  def fit(self, dataset):
+  def fit(self, dataset, nb_epoch=10, batch_size=50, pad_batches=False):
     """
     Fits a model on data in a Dataset object.
     """
-    # TODO(rbharath/enf): We need a structured way to deal with potential GPU
-    #                     memory overflows.
-    batch_size = self.model_params["batch_size"]
-    if "pad_batches" in self.model_params:
-      pad_batches = self.model_params["pad_batches"]
-    else:
-      pad_batches = False
-    for epoch in range(self.model_params["nb_epoch"]):
+    for epoch in range(nb_epoch):
       log("Starting epoch %s" % str(epoch+1), self.verbosity)
       losses = []
       for (X_batch, y_batch, w_batch, ids_batch) in dataset.iterbatches(
@@ -134,7 +129,7 @@ class Model(object):

     return X, y, w

-  def predict(self, dataset, transformers=[]):
+  def predict(self, dataset, transformers=[], batch_size=None):
     """
     Uses self to make predictions on provided Dataset object.

@@ -142,8 +137,7 @@ class Model(object):
       y_pred: numpy ndarray of shape (n_samples,)
     """
     y_preds = []
-    batch_size = self.model_params["batch_size"]
-    n_tasks = self.n_tasks
+    n_tasks = self.get_num_tasks()
     for (X_batch, y_batch, w_batch, ids_batch) in dataset.iterbatches(
         batch_size, deterministic=True):
       n_samples = len(X_batch)
@@ -161,7 +155,7 @@ class Model(object):
       y_pred = np.reshape(y_pred, (n_samples,))
     return y_pred

-  def predict_grad(self, dataset, transformers=[]):
+  def predict_grad(self, dataset, transformers=[], batch_size=50):
     """
     Uses self to calculate gradient on provided Dataset object.

@@ -172,7 +166,6 @@ class Model(object):
       y_pred: numpy ndarray of shape (n_samples,)
     """
     grads = []
-    batch_size = self.model_params["batch_size"]
     for (X_batch, y_batch, w_batch, ids_batch) in dataset.iterbatches(batch_size):
       energy_batch = self.predict_on_batch(X_batch)
       grad_batch = self.predict_grad_on_batch(X_batch)
@@ -182,7 +175,7 @@ class Model(object):

     return grad

-  def evaluate_error(self, dataset, transformers=[]):
+  def evaluate_error(self, dataset, transformers=[], batch_size=50):
     """
     Evaluate the error in energy and gradient components, forcebalance-style.

@@ -192,7 +185,6 @@ class Model(object):
     """
     y_preds = []
     y_train = []
-    batch_size = self.model_params["batch_size"]
     for (X_batch, y_batch, w_batch, ids_batch) in dataset.iterbatches(batch_size):

       y_pred_batch = self.predict_on_batch(X_batch)
@@ -207,7 +199,7 @@ class Model(object):
     y_pred = np.vstack(y_preds)
     y = np.vstack(y_train)

-    n_samples, n_tasks = len(dataset), self.n_tasks
+    n_samples, n_tasks = len(dataset), self.get_num_tasks()
     n_atoms = int((n_tasks-1)/3)

     y_pred = np.reshape(y_pred, (n_samples, n_tasks))
@@ -231,7 +223,7 @@ class Model(object):

     return energy_error, grad_error

-  def evaluate_error_class2(self, dataset, transformers=[]):
+  def evaluate_error_class2(self, dataset, transformers=[], batch_size=50):
     """
     Evaluate the error in energy and gradient components, forcebalance-style.

@@ -242,7 +234,6 @@ class Model(object):
     y_preds = []
     y_train = []
     grads = []
-    batch_size = self.model_params["batch_size"]
     for (X_batch, y_batch, w_batch, ids_batch) in dataset.iterbatches(batch_size):

       # untransformed E is needed for undo_grad_transform
@@ -264,7 +255,7 @@ class Model(object):
     y = np.vstack(y_train)
     grad = np.vstack(grads)

-    n_samples, n_tasks = len(dataset), self.n_tasks
+    n_samples, n_tasks = len(dataset), self.get_num_tasks()
     n_atoms = int((n_tasks-1)/3)

     y_pred = np.reshape(y_pred, (n_samples, n_tasks))
@@ -285,7 +276,7 @@ class Model(object):

     return energy_error, grad_error

-  def test_fd_grad(self, dataset, transformers=[]):
+  def test_fd_grad(self, dataset, transformers=[], batch_size=50):
     """
     Uses self to calculate finite difference gradient on provided Dataset object.
     Currently only useful if your task is energy and self contains predict_grad_on_batch.
@@ -298,7 +289,6 @@ class Model(object):
       y_pred: numpy ndarray of shape (n_samples,)
     """
     y_preds = []
-    batch_size = self.model_params["batch_size"]
     for (X_batch, y_batch, w_batch, ids_batch) in dataset.iterbatches(batch_size):

       for xb in X_batch:
@@ -345,7 +335,8 @@ class Model(object):
     return y_pred


-  def predict_proba(self, dataset, transformers=[], n_classes=2):
+  def predict_proba(self, dataset, transformers=[], batch_size=None,
+                    n_classes=2):
     """
     TODO: Do transformers even make sense here?

@@ -353,8 +344,7 @@ class Model(object):
       y_pred: numpy ndarray of shape (n_samples, n_classes*n_tasks)
     """
     y_preds = []
-    batch_size = self.model_params["batch_size"]
-    n_tasks = self.n_tasks
+    n_tasks = self.get_num_tasks()
     for (X_batch, y_batch, w_batch, ids_batch) in dataset.iterbatches(
         batch_size, deterministic=True):
       y_pred_batch = self.predict_proba_on_batch(X_batch)
@@ -374,9 +364,10 @@ class Model(object):
     """
     Currently models can only be classifiers or regressors.
     """
     ################################################################## DEBUG
     # TODO(rbharath): This is a hack based on fact that multi-tasktype models
     # aren't supported.
     #return self.task_types.itervalues().next()
     raise NotImplementedError
     ################################################################## DEBUG

+  def get_num_tasks(self):
+    """
+    Get number of tasks.
+    """
+    raise NotImplementedError
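Taken together, the Model changes above replace self.model_params lookups with explicit method arguments (nb_epoch, batch_size, pad_batches) and route task counts through an overridable get_num_tasks(). A self-contained sketch of that contract, using hypothetical stand-ins (ListDataset and MeanModel are not repository code):

class ListDataset(object):
  """Hypothetical in-memory dataset with an iterbatches-style API."""
  def __init__(self, X, y):
    self.X, self.y = X, y

  def iterbatches(self, batch_size):
    for i in range(0, len(self.X), batch_size):
      yield self.X[i:i+batch_size], self.y[i:i+batch_size]

class MeanModel(object):
  """Hypothetical single-task model following the refactored contract."""
  def __init__(self):
    self.mean, self.n = 0.0, 0

  def get_num_tasks(self):
    return 1

  def fit_on_batch(self, X_batch, y_batch):
    # Maintains a running mean of the targets; illustrative only.
    for y in y_batch:
      self.n += 1
      self.mean += (y - self.mean) / self.n

  def fit(self, dataset, nb_epoch=10, batch_size=50):
    # Hyperparameters arrive as arguments; no self.model_params lookups.
    for epoch in range(nb_epoch):
      for X_batch, y_batch in dataset.iterbatches(batch_size):
        self.fit_on_batch(X_batch, y_batch)

model = MeanModel()
model.fit(ListDataset([[0], [1], [2], [3]], [1.0, 2.0, 3.0, 4.0]),
          nb_epoch=1, batch_size=2)
assert model.get_num_tasks() == 1 and model.mean == 2.5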
+24 −8
@@ -8,6 +8,7 @@ from __future__ import unicode_literals
 import os
 import numpy as np
 from keras.models import Graph
+from keras.models import load_model
 from keras.models import model_from_json
 from keras.layers.core import Dense, Dropout, Activation
 from keras.layers.normalization import BatchNormalization
@@ -23,23 +24,23 @@ class KerasModel(Model):
     """
     Saves underlying keras model to disk.
     """
-    super(KerasModel, self).save()
-    model = self.model_instance
     filename, _ = os.path.splitext(Model.get_model_filename(self.model_dir))

-    # Note that keras requires the model architecture and weights to be stored
-    # separately. A json file is generated that specifies the model architecture.
-    # The weights will be stored in an h5 file. The pkl.gz file will store the
-    # target name.
+    ## Note that keras requires the model architecture and weights to be stored
+    ## separately. A json file is generated that specifies the model architecture.
+    ## The weights will be stored in an h5 file. The pkl.gz file will store the
+    ## target name.
     json_filename = "%s.%s" % (filename, "json")
     h5_filename = "%s.%s" % (filename, "h5")
+    self.model_instance.save(h5_filename)
-    # Save architecture
-    json_string = model.to_json()
-    with open(json_filename, "wb") as file_obj:
-      file_obj.write(json_string)
-    model.save_weights(h5_filename, overwrite=True)

-  def reload(self):
+  def reload(self, custom_objects={}):
     """
     Load keras multitask DNN from disk.
     """
@@ -50,6 +51,21 @@ class KerasModel(Model):
     h5_filename = "%s.%s" % (filename, "h5")

     with open(json_filename) as file_obj:
-      model = model_from_json(file_obj.read())
+      model = model_from_json(file_obj.read(), custom_objects=custom_objects)
     model.load_weights(h5_filename)
-    self.model_instnace = model
+    self.model_instance = model
+
+  def predict_on_batch(self, X):
+    return self.model_instance.predict_on_batch(X)
+
+  # TODO(rbharath): The methods below aren't extensible and depend on
+  # implementation details of fcnet. Better way to expose this information?
+  def fit_on_batch(self, X, y, w):
+    """Fit model on batch of data."""
+    return self.model_instance.fit_on_batch(X, y, w)
+
+  def get_num_tasks(self):
+    return self.model_instance.n_tasks
+
+  def predict_proba_on_batch(self, X, n_classes=2):
+    return self.model_instance.predict_proba_on_batch(X, n_classes)
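For reference, the json + h5 round trip that reload() performs, sketched standalone with stock Keras calls (to_json/save_weights going out, model_from_json/load_weights coming back; custom_objects lets Keras resolve user-defined classes). The toy architecture and /tmp paths are illustrative:

from keras.models import Sequential, model_from_json
from keras.layers.core import Dense

model = Sequential()
model.add(Dense(1, input_dim=8))
model.compile(optimizer="sgd", loss="mse")

# Architecture and weights are persisted separately.
with open("/tmp/model.json", "w") as f:
  f.write(model.to_json())
model.save_weights("/tmp/model.h5", overwrite=True)

# Round trip: rebuild the architecture, then restore the parameters.
with open("/tmp/model.json") as f:
  restored = model_from_json(f.read())  # pass custom_objects={...} for custom layers
restored.load_weights("/tmp/model.h5")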
+26 −5
@@ -25,12 +25,13 @@ class MultiTaskDNN(Graph):
   def __init__(self, n_tasks, n_inputs, task_type, nb_layers=1, nb_hidden=1000,
                init="glorot_uniform", batchnorm=False, dropout=0.5,
                activation="relu", learning_rate=.001, decay=1e-6,
-               momentum=0.9, nesterov=False, verbosity="low"):
+               momentum=0.9, nesterov=False):
     super(MultiTaskDNN, self).__init__()
     # Store hyperparameters
     assert task_type in ["classification", "regression"]
     self.task_type = task_type
     self.n_inputs = n_inputs
+    self.n_tasks = n_tasks
     self.nb_layers = nb_layers
     self.nb_hidden = nb_hidden
     self.init = init
@@ -89,6 +90,26 @@ class MultiTaskDNN(Graph):
               nesterov=self.nesterov)
     self.compile(optimizer=sgd, loss=loss_dict)

+  def get_config(self):
+    return {"n_inputs": self.n_inputs,
+            "n_tasks": self.n_tasks,
+            "task_type": self.task_type,
+            "nb_layers": self.nb_layers,
+            "nb_hidden": self.nb_hidden,
+            "init": self.init,
+            "batchnorm": self.batchnorm,
+            "dropout": self.dropout,
+            "activation": self.activation,
+            "learning_rate": self.learning_rate,
+            "decay": self.decay,
+            "momentum": self.momentum,
+            "nesterov": self.nesterov,
+            }
+
+  @classmethod
+  def from_config(cls, config):
+    return cls(**config)
+
   def get_data_dict(self, X, y=None):
     """Wrap data X in dict for graph computations (Keras graph only for now)."""
     data = {}
@@ -118,7 +139,7 @@ class MultiTaskDNN(Graph):
     w = w + eps * np.ones(np.shape(w))
     data = self.get_data_dict(X, y)
     sample_weight = self.get_sample_weight(w)
-    loss = self.raw_model.train_on_batch(data, sample_weight=sample_weight)
+    loss = self.train_on_batch(data, sample_weight=sample_weight)
     return loss

   def predict_on_batch(self, X):
@@ -126,7 +147,7 @@ class MultiTaskDNN(Graph):
     Makes predictions on given batch of new data.
     """
     data = self.get_data_dict(X)
-    y_pred_dict = self.raw_model.predict_on_batch(data)
+    y_pred_dict = super(MultiTaskDNN, self).predict_on_batch(data)
     nb_samples = np.shape(X)[0]
     y_pred = np.zeros((nb_samples, self.n_tasks))
     for task in range(self.n_tasks):
@@ -146,10 +167,10 @@ class MultiTaskDNN(Graph):
     Makes predictions on given batch of new data.
     """
     data = self.get_data_dict(X)
-    y_pred_dict = self.raw_model.predict_on_batch(data)
+    y_pred_dict = super(MultiTaskDNN, self).predict_on_batch(data)
     n_samples = np.shape(X)[0]
     y_pred = np.zeros((n_samples, self.n_tasks, n_classes))
-    for task in rand(self.n_tasks):
+    for task in range(self.n_tasks):
       taskname = "task%d" % task
       y_pred_task = np.squeeze(y_pred_dict[taskname])
       y_pred[:, task] = y_pred_task
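The get_config/from_config pair added above follows the standard Keras serialization convention: get_config returns exactly the keyword arguments __init__ needs, and from_config rebuilds an equivalent instance from that dict. A dependency-free sketch of the pattern (Configurable is a hypothetical class, not repository code):

class Configurable(object):
  def __init__(self, n_tasks, nb_hidden=1000, activation="relu"):
    self.n_tasks = n_tasks
    self.nb_hidden = nb_hidden
    self.activation = activation

  def get_config(self):
    # Everything __init__ needs to rebuild an identical instance.
    return {"n_tasks": self.n_tasks,
            "nb_hidden": self.nb_hidden,
            "activation": self.activation}

  @classmethod
  def from_config(cls, config):
    return cls(**config)

clone = Configurable.from_config(Configurable(n_tasks=3, nb_hidden=500).get_config())
assert clone.n_tasks == 3 and clone.nb_hidden == 500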
+12 −31
@@ -18,25 +18,7 @@ class SklearnModel(Model):
   """
   Abstract base class for different ML models.
   """
-  def __init__(self, tasks, task_types, model_params, model_dir, fit_transformers=None,
-               model_instance=None, initialize_raw_model=True, verbosity=None,
-               mode="classification"):
-    super(SklearnModel, self).__init__(
-        tasks, task_types, model_params, model_dir,
-        fit_transformers=fit_transformers,
-        initialize_raw_model=initialize_raw_model)
-    self.model_dir = model_dir
-    self.task_types = task_types
-    self.model_params = model_params
-    self.raw_model = model_instance
-    self.verbosity = verbosity
-    assert mode in ["classification", "regression"]
-    self.mode = mode

-  # TODO(rbharath): This does not work with very large datasets! sklearn does
-  # support partial_fit, but only for some models. Might make sense to make
-  # PartialSklearnModel subclass at some point to support large data models.
-  # Also, use of batch_size=32 is arbitrary and kludgey
   def fit(self, dataset):
     """
     Fits SKLearn model to data.
@@ -44,39 +26,38 @@ class SklearnModel(Model):
     X, y, w, _ = dataset.to_numpy()
     y, w = np.squeeze(y), np.squeeze(w)
     # Logistic regression doesn't support weights
-    if not isinstance(self.raw_model, LogisticRegression):
-      self.raw_model.fit(X, y, w)
+    if not isinstance(self.model_instance, LogisticRegression):
+      self.model_instance.fit(X, y, w)
     else:
-      self.raw_model.fit(X, y)
-    y_pred_raw = self.raw_model.predict(X)
+      self.model_instance.fit(X, y)
+    y_pred_raw = self.model_instance.predict(X)

   def predict_on_batch(self, X):
     """
     Makes predictions on batch of data.
     """
-    return self.raw_model.predict(X)
+    return self.model_instance.predict(X)

   def predict_proba_on_batch(self, X):
     """
     Makes per-class predictions on batch of data.
     """
-    return self.raw_model.predict_proba(X)
+    return self.model_instance.predict_proba(X)

-  def predict(self, X, transformers=[]):
-    """
-    Makes predictions on dataset.
-    """
-    # Sets batch_size which the default impl in Model expects
-    #TODO(enf/rbharath): This is kludgy. Fix later.
-    if "batch_size" not in self.model_params.keys():
-      self.model_params["batch_size"] = None
-    return super(SklearnModel, self).predict(X, transformers)

   def save(self):
     """Saves sklearn model to disk using joblib."""
-    super(SklearnModel, self).save()
-    save_to_disk(self.raw_model, self.get_model_filename(self.model_dir))
+    save_to_disk(self.model_instance, self.get_model_filename(self.model_dir))

   def reload(self):
     """Loads sklearn model from joblib file on disk."""
-    self.raw_model = load_from_disk(Model.get_model_filename(self.model_dir))
+    self.model_instance = load_from_disk(Model.get_model_filename(self.model_dir))
+
+  def get_num_tasks(self):
+    """Number of tasks for this model. Defaults to 1"""
+    return 1
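The isinstance branch in fit() above exists because most sklearn estimators accept sample weights as a third positional argument to fit, while LogisticRegression (as of this commit) did not. A sketch with synthetic data (RandomForestRegressor chosen only as a weight-accepting example):

import numpy as np
from sklearn.ensemble import RandomForestRegressor

X = np.random.rand(20, 4)
y = np.random.rand(20)
w = np.ones(20)  # per-sample weights

model_instance = RandomForestRegressor(n_estimators=10)
model_instance.fit(X, y, w)  # third positional argument is sample_weight
y_pred_raw = model_instance.predict(X)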