Commit 995cfb79 authored by Shakthi Visagan's avatar Shakthi Visagan
Browse files

lint error

parent a28ce71a
Loading
Loading
Loading
Loading
+443 −537
Original line number Original line Diff line number Diff line
@@ -29,7 +29,7 @@ class TestLayers(test_util.TensorFlowTestCase):
    assert tf.reduce_sum(cos_sim_orth) == 0  # True
    assert tf.reduce_sum(cos_sim_orth) == 0  # True
    assert all([cos_sim_orth.shape[dim] == 256 for dim in range(2)])  # True
    assert all([cos_sim_orth.shape[dim] == 256 for dim in range(2)])  # True


def test_highway():
  def test_highway(self):
    """Test invoking Highway."""
    """Test invoking Highway."""
    width = 5
    width = 5
    batch_size = 10
    batch_size = 10
@@ -51,8 +51,7 @@ def test_highway():
    result3 = layer(input)
    result3 = layer(input)
    assert np.allclose(result, result3)
    assert np.allclose(result, result3)



  def test_combine_mean_std(self):
def test_combine_mean_std():
    """Test invoking CombineMeanStd."""
    """Test invoking CombineMeanStd."""
    mean = np.random.rand(5, 3).astype(np.float32)
    mean = np.random.rand(5, 3).astype(np.float32)
    std = np.random.rand(5, 3).astype(np.float32)
    std = np.random.rand(5, 3).astype(np.float32)
@@ -63,8 +62,7 @@ def test_combine_mean_std():
    assert not np.array_equal(result2, mean)
    assert not np.array_equal(result2, mean)
    assert np.allclose(result2, mean, atol=0.1)
    assert np.allclose(result2, mean, atol=0.1)



  def test_stack(self):
def test_stack():
    """Test invoking Stack."""
    """Test invoking Stack."""
    input1 = np.random.rand(5, 4).astype(np.float32)
    input1 = np.random.rand(5, 4).astype(np.float32)
    input2 = np.random.rand(5, 4).astype(np.float32)
    input2 = np.random.rand(5, 4).astype(np.float32)
@@ -73,8 +71,7 @@ def test_stack():
    assert np.array_equal(input1, result[:, 0, :])
    assert np.array_equal(input1, result[:, 0, :])
    assert np.array_equal(input2, result[:, 1, :])
    assert np.array_equal(input2, result[:, 1, :])



  def test_variable(self):
def test_variable():
    """Test invoking Variable."""
    """Test invoking Variable."""
    value = np.random.rand(5, 4).astype(np.float32)
    value = np.random.rand(5, 4).astype(np.float32)
    layer = layers.Variable(value)
    layer = layers.Variable(value)
@@ -83,8 +80,7 @@ def test_variable():
    assert np.allclose(result, value)
    assert np.allclose(result, value)
    assert len(layer.trainable_variables) == 1
    assert len(layer.trainable_variables) == 1



  def test_interatomic_l2_distances(self):
def test_interatomic_l2_distances():
    """Test invoking InteratomicL2Distances."""
    """Test invoking InteratomicL2Distances."""
    atoms = 5
    atoms = 5
    neighbors = 2
    neighbors = 2
@@ -99,14 +95,13 @@ def test_interatomic_l2_distances():
        dist2 = np.dot(delta, delta)
        dist2 = np.dot(delta, delta)
        assert np.allclose(dist2, result[atom, neighbor])
        assert np.allclose(dist2, result[atom, neighbor])



  def test_weave_layer(self):
def test_weave_layer():
    """Test invoking WeaveLayer."""
    """Test invoking WeaveLayer."""
    out_channels = 2
    out_channels = 2
    n_atoms = 4  # In CCC and C, there are 4 atoms
    n_atoms = 4  # In CCC and C, there are 4 atoms
    raw_smiles = ['CCC', 'C']
    raw_smiles = ['CCC', 'C']
  from rdkit import Chem
    import rdkit
  mols = [Chem.MolFromSmiles(s) for s in raw_smiles]
    mols = [rdkit.Chem.MolFromSmiles(s) for s in raw_smiles]
    featurizer = dc.feat.WeaveFeaturizer()
    featurizer = dc.feat.WeaveFeaturizer()
    mols = featurizer.featurize(mols)
    mols = featurizer.featurize(mols)
    weave = layers.WeaveLayer()
    weave = layers.WeaveLayer()
@@ -142,89 +137,13 @@ def test_weave_layer():
    outputs = weave(inputs)
    outputs = weave(inputs)
    assert len(outputs) == 2
    assert len(outputs) == 2



  def test_graph_conv(self):
def test_weave_gather():
  """Test invoking WeaveGather."""
  out_channels = 2
  n_atoms = 4  # In CCC and C, there are 4 atoms
  raw_smiles = ['CCC', 'C']
  from rdkit import Chem
  mols = [Chem.MolFromSmiles(s) for s in raw_smiles]
  featurizer = dc.feat.WeaveFeaturizer()
  mols = featurizer.featurize(mols)
  atom_feat = []
  atom_split = []
  for im, mol in enumerate(mols):
    n_atoms = mol.get_num_atoms()
    atom_split.extend([im] * n_atoms)

    # atom features
    atom_feat.append(mol.get_atom_features())
  inputs = [
      np.array(np.concatenate(atom_feat, axis=0), dtype=np.float32),
      np.array(atom_split)
  ]
  # Try without compression
  gather = layers.WeaveGather(batch_size=2, n_input=75, gaussian_expand=True)
  # Outputs should be [mol1_vec, mol2_vec)
  outputs = gather(inputs)
  assert len(outputs) == 2
  assert np.array(outputs[0]).shape == (11 * 75,)
  assert np.array(outputs[1]).shape == (11 * 75,)

  # Try with compression
  gather = layers.WeaveGather(
      batch_size=2,
      n_input=75,
      gaussian_expand=True,
      compress_post_gaussian_expansion=True)
  # Outputs should be [mol1_vec, mol2_vec)
  outputs = gather(inputs)
  assert len(outputs) == 2
  assert np.array(outputs[0]).shape == (75,)
  assert np.array(outputs[1]).shape == (75,)


def test_weave_gather_gaussian_histogram():
  """Test Gaussian Histograms."""
  import tensorflow as tf
  from rdkit import Chem
  out_channels = 2
  n_atoms = 4  # In CCC and C, there are 4 atoms
  raw_smiles = ['CCC', 'C']
  mols = [Chem.MolFromSmiles(s) for s in raw_smiles]
  featurizer = dc.feat.WeaveFeaturizer()
  mols = featurizer.featurize(mols)
  gather = layers.WeaveGather(batch_size=2, n_input=75)
  atom_feat = []
  atom_split = []
  for im, mol in enumerate(mols):
    n_atoms = mol.get_num_atoms()
    atom_split.extend([im] * n_atoms)

    # atom features
    atom_feat.append(mol.get_atom_features())
  inputs = [
      np.array(np.concatenate(atom_feat, axis=0), dtype=np.float32),
      np.array(atom_split)
  ]
  #per_mol_features = tf.math.segment_sum(inputs[0], inputs[1])
  outputs = gather.gaussian_histogram(inputs[0])
  # Gaussian histograms expands into 11 Gaussian buckets.
  assert np.array(outputs).shape == (
      4,
      11 * 75,
  )
  #assert np.array(outputs[1]).shape == (11 * 75,)


def test_graph_conv():
    """Test invoking GraphConv."""
    """Test invoking GraphConv."""
    out_channels = 2
    out_channels = 2
    n_atoms = 4  # In CCC and C, there are 4 atoms
    n_atoms = 4  # In CCC and C, there are 4 atoms
    raw_smiles = ['CCC', 'C']
    raw_smiles = ['CCC', 'C']
  from rdkit import Chem
    import rdkit
  mols = [Chem.MolFromSmiles(s) for s in raw_smiles]
    mols = [rdkit.Chem.MolFromSmiles(s) for s in raw_smiles]
    featurizer = dc.feat.graph_features.ConvMolFeaturizer()
    featurizer = dc.feat.graph_features.ConvMolFeaturizer()
    mols = featurizer.featurize(mols)
    mols = featurizer.featurize(mols)
    multi_mol = dc.feat.mol_graphs.ConvMol.agglomerate_mols(mols)
    multi_mol = dc.feat.mol_graphs.ConvMol.agglomerate_mols(mols)
@@ -239,13 +158,12 @@ def test_graph_conv():
    num_deg = 2 * layer.max_degree + (1 - layer.min_degree)
    num_deg = 2 * layer.max_degree + (1 - layer.min_degree)
    assert len(layer.trainable_variables) == 2 * num_deg
    assert len(layer.trainable_variables) == 2 * num_deg



  def test_graph_pool(self):
def test_graph_pool():
    """Test invoking GraphPool."""
    """Test invoking GraphPool."""
    n_atoms = 4  # In CCC and C, there are 4 atoms
    n_atoms = 4  # In CCC and C, there are 4 atoms
    raw_smiles = ['CCC', 'C']
    raw_smiles = ['CCC', 'C']
  from rdkit import Chem
    import rdkit
  mols = [Chem.MolFromSmiles(s) for s in raw_smiles]
    mols = [rdkit.Chem.MolFromSmiles(s) for s in raw_smiles]
    featurizer = dc.feat.graph_features.ConvMolFeaturizer()
    featurizer = dc.feat.graph_features.ConvMolFeaturizer()
    mols = featurizer.featurize(mols)
    mols = featurizer.featurize(mols)
    multi_mol = dc.feat.mol_graphs.ConvMol.agglomerate_mols(mols)
    multi_mol = dc.feat.mol_graphs.ConvMol.agglomerate_mols(mols)
@@ -258,15 +176,14 @@ def test_graph_pool():
    assert result.shape[0] == n_atoms
    assert result.shape[0] == n_atoms
    # TODO What should shape[1] be?  It's not documented.
    # TODO What should shape[1] be?  It's not documented.



  def test_graph_gather(self):
def test_graph_gather():
    """Test invoking GraphGather."""
    """Test invoking GraphGather."""
    batch_size = 2
    batch_size = 2
    n_features = 75
    n_features = 75
    n_atoms = 4  # In CCC and C, there are 4 atoms
    n_atoms = 4  # In CCC and C, there are 4 atoms
    raw_smiles = ['CCC', 'C']
    raw_smiles = ['CCC', 'C']
  from rdkit import Chem
    import rdkit
  mols = [Chem.MolFromSmiles(s) for s in raw_smiles]
    mols = [rdkit.Chem.MolFromSmiles(s) for s in raw_smiles]
    featurizer = dc.feat.graph_features.ConvMolFeaturizer()
    featurizer = dc.feat.graph_features.ConvMolFeaturizer()
    mols = featurizer.featurize(mols)
    mols = featurizer.featurize(mols)
    multi_mol = dc.feat.mol_graphs.ConvMol.agglomerate_mols(mols)
    multi_mol = dc.feat.mol_graphs.ConvMol.agglomerate_mols(mols)
@@ -279,8 +196,7 @@ def test_graph_gather():
    # TODO(rbharath): Why is it 2*n_features instead of n_features?
    # TODO(rbharath): Why is it 2*n_features instead of n_features?
    assert result.shape == (batch_size, 2 * n_features)
    assert result.shape == (batch_size, 2 * n_features)



  def test_lstm_step(self):
def test_lstm_step():
    """Test invoking LSTMStep."""
    """Test invoking LSTMStep."""
    max_depth = 5
    max_depth = 5
    n_test = 5
    n_test = 5
@@ -296,8 +212,7 @@ def test_lstm_step():
    assert c_out.shape == (n_test, n_feat)
    assert c_out.shape == (n_test, n_feat)
    assert len(layer.trainable_variables) == 1
    assert len(layer.trainable_variables) == 1



  def test_attn_lstm_embedding(self):
def test_attn_lstm_embedding():
    """Test invoking AttnLSTMEmbedding."""
    """Test invoking AttnLSTMEmbedding."""
    max_depth = 5
    max_depth = 5
    n_test = 5
    n_test = 5
@@ -311,8 +226,7 @@ def test_attn_lstm_embedding():
    assert support_out.shape == (n_support, n_feat)
    assert support_out.shape == (n_support, n_feat)
    assert len(layer.trainable_variables) == 4
    assert len(layer.trainable_variables) == 4



  def test_iter_ref_lstm_embedding(self):
def test_iter_ref_lstm_embedding():
    """Test invoking IterRefLSTMEmbedding."""
    """Test invoking IterRefLSTMEmbedding."""
    max_depth = 5
    max_depth = 5
    n_test = 5
    n_test = 5
@@ -326,8 +240,7 @@ def test_iter_ref_lstm_embedding():
    assert support_out.shape == (n_support, n_feat)
    assert support_out.shape == (n_support, n_feat)
    assert len(layer.trainable_variables) == 8
    assert len(layer.trainable_variables) == 8



  def test_vina_free_energy(self):
def test_vina_free_energy():
    """Test invoking VinaFreeEnergy."""
    """Test invoking VinaFreeEnergy."""
    n_atoms = 5
    n_atoms = 5
    m_nbrs = 1
    m_nbrs = 1
@@ -337,7 +250,8 @@ def test_vina_free_energy():
    stop = 4
    stop = 4
    X = np.random.rand(n_atoms, ndim).astype(np.float32)
    X = np.random.rand(n_atoms, ndim).astype(np.float32)
    Z = np.random.randint(0, 2, (n_atoms)).astype(np.float32)
    Z = np.random.randint(0, 2, (n_atoms)).astype(np.float32)
  layer = layers.VinaFreeEnergy(n_atoms, m_nbrs, ndim, nbr_cutoff, start, stop)
    layer = layers.VinaFreeEnergy(n_atoms, m_nbrs, ndim, nbr_cutoff, start,
                                  stop)
    result = layer([X, Z])
    result = layer([X, Z])
    assert len(layer.trainable_variables) == 6
    assert len(layer.trainable_variables) == 6
    assert result.shape == tuple()
    assert result.shape == tuple()
@@ -345,7 +259,8 @@ def test_vina_free_energy():
    # Creating a second layer should produce different results, since it has
    # Creating a second layer should produce different results, since it has
    # different random weights.
    # different random weights.


  layer2 = layers.VinaFreeEnergy(n_atoms, m_nbrs, ndim, nbr_cutoff, start, stop)
    layer2 = layers.VinaFreeEnergy(n_atoms, m_nbrs, ndim, nbr_cutoff, start,
                                   stop)
    result2 = layer2([X, Z])
    result2 = layer2([X, Z])
    assert not np.allclose(result, result2)
    assert not np.allclose(result, result2)


@@ -354,8 +269,7 @@ def test_vina_free_energy():
    result3 = layer([X, Z])
    result3 = layer([X, Z])
    assert np.allclose(result, result3)
    assert np.allclose(result, result3)



  def test_weighted_linear_combo(self):
def test_weighted_linear_combo():
    """Test invoking WeightedLinearCombo."""
    """Test invoking WeightedLinearCombo."""
    input1 = np.random.rand(5, 10).astype(np.float32)
    input1 = np.random.rand(5, 10).astype(np.float32)
    input2 = np.random.rand(5, 10).astype(np.float32)
    input2 = np.random.rand(5, 10).astype(np.float32)
@@ -365,8 +279,7 @@ def test_weighted_linear_combo():
    expected = input1 * layer.trainable_variables[0] + input2 * layer.trainable_variables[1]
    expected = input1 * layer.trainable_variables[0] + input2 * layer.trainable_variables[1]
    assert np.allclose(result, expected)
    assert np.allclose(result, expected)



  def test_neighbor_list(self):
def test_neighbor_list():
    """Test invoking NeighborList."""
    """Test invoking NeighborList."""
    N_atoms = 5
    N_atoms = 5
    start = 0
    start = 0
@@ -380,25 +293,25 @@ def test_neighbor_list():
    result = layer(coords)
    result = layer(coords)
    assert result.shape == (N_atoms, M_nbrs)
    assert result.shape == (N_atoms, M_nbrs)



  def test_atomic_convolution(self):
def test_atomic_convolution():
    """Test invoking AtomicConvolution."""
    """Test invoking AtomicConvolution."""
    batch_size = 4
    batch_size = 4
    max_atoms = 5
    max_atoms = 5
    max_neighbors = 2
    max_neighbors = 2
    dimensions = 3
    dimensions = 3
    params = [[5.0, 2.0, 0.5], [10.0, 2.0, 0.5]]
    params = [[5.0, 2.0, 0.5], [10.0, 2.0, 0.5]]
  input1 = np.random.rand(batch_size, max_atoms, dimensions).astype(np.float32)
    input1 = np.random.rand(batch_size, max_atoms,
                            dimensions).astype(np.float32)
    input2 = np.random.randint(
    input2 = np.random.randint(
        max_atoms, size=(batch_size, max_atoms, max_neighbors))
        max_atoms, size=(batch_size, max_atoms, max_neighbors))
  input3 = np.random.randint(1, 10, size=(batch_size, max_atoms, max_neighbors))
    input3 = np.random.randint(
        1, 10, size=(batch_size, max_atoms, max_neighbors))
    layer = layers.AtomicConvolution(radial_params=params)
    layer = layers.AtomicConvolution(radial_params=params)
    result = layer([input1, input2, input3])
    result = layer([input1, input2, input3])
    assert result.shape == (batch_size, max_atoms, len(params))
    assert result.shape == (batch_size, max_atoms, len(params))
    assert len(layer.trainable_variables) == 3
    assert len(layer.trainable_variables) == 3



  def test_alpha_share_layer(self):
def test_alpha_share_layer():
    """Test invoking AlphaShareLayer."""
    """Test invoking AlphaShareLayer."""
    batch_size = 10
    batch_size = 10
    length = 6
    length = 6
@@ -423,16 +336,14 @@ def test_alpha_share_layer():
    assert np.allclose(result[0], result3[0])
    assert np.allclose(result[0], result3[0])
    assert np.allclose(result[1], result3[1])
    assert np.allclose(result[1], result3[1])



  def test_sluice_loss(self):
def test_sluice_loss():
    """Test invoking SluiceLoss."""
    """Test invoking SluiceLoss."""
    input1 = np.ones((3, 4)).astype(np.float32)
    input1 = np.ones((3, 4)).astype(np.float32)
    input2 = np.ones((2, 2)).astype(np.float32)
    input2 = np.ones((2, 2)).astype(np.float32)
    result = layers.SluiceLoss()([input1, input2])
    result = layers.SluiceLoss()([input1, input2])
    assert np.allclose(result, 40.0)
    assert np.allclose(result, 40.0)



  def test_beta_share(self):
def test_beta_share():
    """Test invoking BetaShare."""
    """Test invoking BetaShare."""
    batch_size = 10
    batch_size = 10
    length = 6
    length = 6
@@ -455,8 +366,7 @@ def test_beta_share():
    result3 = layer([input1, input2])
    result3 = layer([input1, input2])
    assert np.allclose(result, result3)
    assert np.allclose(result, result3)



  def test_ani_feat(self):
def test_ani_feat():
    """Test invoking ANIFeat."""
    """Test invoking ANIFeat."""
    batch_size = 10
    batch_size = 10
    max_atoms = 5
    max_atoms = 5
@@ -466,8 +376,7 @@ def test_ani_feat():
    # TODO What should the output shape be?  It's not documented, and there
    # TODO What should the output shape be?  It's not documented, and there
    # are no other test cases for it.
    # are no other test cases for it.



  def test_graph_embed_pool_layer(self):
def test_graph_embed_pool_layer():
    """Test invoking GraphEmbedPoolLayer."""
    """Test invoking GraphEmbedPoolLayer."""
    V = np.random.uniform(size=(10, 100, 50)).astype(np.float32)
    V = np.random.uniform(size=(10, 100, 50)).astype(np.float32)
    adjs = np.random.uniform(size=(10, 100, 5, 100)).astype(np.float32)
    adjs = np.random.uniform(size=(10, 100, 5, 100)).astype(np.float32)
@@ -490,8 +399,7 @@ def test_graph_embed_pool_layer():
    assert np.allclose(result[0], result3[0])
    assert np.allclose(result[0], result3[0])
    assert np.allclose(result[1], result3[1])
    assert np.allclose(result[1], result3[1])



  def test_graph_cnn(self):
def test_graph_cnn():
    """Test invoking GraphCNN."""
    """Test invoking GraphCNN."""
    V = np.random.uniform(size=(10, 100, 50)).astype(np.float32)
    V = np.random.uniform(size=(10, 100, 50)).astype(np.float32)
    adjs = np.random.uniform(size=(10, 100, 5, 100)).astype(np.float32)
    adjs = np.random.uniform(size=(10, 100, 5, 100)).astype(np.float32)
@@ -511,8 +419,7 @@ def test_graph_cnn():
    result3 = layer([V, adjs])
    result3 = layer([V, adjs])
    assert np.allclose(result, result3)
    assert np.allclose(result, result3)



  def test_DAG_layer(self):
def test_DAG_layer():
    """Test invoking DAGLayer."""
    """Test invoking DAGLayer."""
    batch_size = 10
    batch_size = 10
    n_graph_feat = 30
    n_graph_feat = 30
@@ -547,8 +454,7 @@ def test_DAG_layer():
    ## TODO(rbharath): What is the shape of outputs supposed to be?
    ## TODO(rbharath): What is the shape of outputs supposed to be?
    ## I'm getting (7, 30) here. Where does 7 come from??
    ## I'm getting (7, 30) here. Where does 7 come from??



  def test_DAG_gather(self):
def test_DAG_gather():
    """Test invoking DAGGather."""
    """Test invoking DAGGather."""
    # TODO(rbharath): We need more documentation about why
    # TODO(rbharath): We need more documentation about why
    # these numbers work.
    # these numbers work.