Commit fe97bf6c authored by Atreya Majumdar

Combined MATAttention and MultiHeadedAttention

parent 904c8fad
+119 −211
@@ -66,6 +66,7 @@ class ScaleNorm(nn.Module):
    return x * norm


<<<<<<< HEAD
class MultiHeadedMATAttention(nn.Module):
@@ -315,9 +316,14 @@ class MATEncoder(nn.Module):
>>>>>>> Tests for encoder
class MATEncoderLayer(nn.Module):
  """Encoder layer for use in the Molecular Attention Transformer [1]_.
=======
class MultiHeadedMATAttention(nn.Module):
  """Converts an existing attention layer to a multi-headed attention module.
>>>>>>> Combined MATAttention and MultiHeadedAttention

  The MATEncoder layer primarily consists of a self-attention layer (MultiHeadedMATAttention) and a feed-forward layer (PositionwiseFeedForward).
  This layer can be stacked multiple times to form an encoder.
  Multi-headed attention applies the attention mechanism multiple times in parallel through multiple attention heads.
  Thus, different subsequences of a given sequence can be processed differently.
  The query, key and value parameters are split multiple ways and each split is passed separately through a different attention head, as shown in the sketch after this docstring.

  References
  ----------
@@ -326,252 +332,154 @@ class MATEncoderLayer(nn.Module):
  Examples
  --------
  >>> import deepchem as dc
  >>> import torch
  >>> from rdkit import Chem
  >>> mol = Chem.MolFromSmiles("CC")
  >>> adj_matrix = Chem.GetAdjacencyMatrix(mol)
  >>> distance_matrix = Chem.GetDistanceMatrix(mol)
  >>> layer = dc.models.torch_models.layers.MATEncoderLayer(dist_kernel = 'softmax', lambda_attention = 0.33, lambda_distance = 0.33, h = 8, sa_hsize = 1024, sa_dropout_p = 0.1, output_bias = True, d_input = 1024, d_hidden = 1024, d_output = 1024, activation = 'relu', n_layers = 1, ff_dropout_p = 0.1, encoder_hsize = 1024, encoder_dropout_p = 0.1)
  >>> x = torch.Tensor([[1., 2.], [5., 6.]])
  >>> mask = torch.Tensor([[1., 1.], [1., 1.]])
  >>> output = layer(x, mask, sa_dropout_p = 0.0, adj_matrix = adj_matrix, distance_matrix = distance_matrix)
  >>> attention = dc.models.torch_models.layers.MATAttention('softmax', 0.33, 0.33)
  >>> self_attn_layer = dc.models.torch_models.layers.MultiHeadedAttention(dist_kernel = 'softmax', lambda_attention = 0.33, lambda_distance = 0.33, h = 8, hsize = 1024, dropout_p = 0.1)
  """

  def __init__(self, dist_kernel: str, lambda_attention: float,
               lambda_distance: float, h: int, sa_hsize: int,
               sa_dropout_p: float, output_bias: bool, d_input: int,
               d_hidden: int, d_output: int, activation: str, n_layers: int,
               ff_dropout_p: float, encoder_hsize: int,
               encoder_dropout_p: float):
    """Initialize a MATEncoder layer.
  def __init__(self,
               dist_kernel: str,
               lambda_attention: float,
               lambda_distance: float,
               h: int,
               hsize: int,
               dropout_p: float,
               output_bias: bool = True):
    """Initialize a multi-headed attention layer.

    Parameters
    ----------
    dist_kernel: str
      Kernel activation to be used. Can be either 'softmax' for softmax or 'exp' for exponential, for the self-attention layer.
      Kernel activation to be used. Can be either 'softmax' for softmax or 'exp' for exponential.
    lambda_attention: float
      Constant to be multiplied with the attention matrix in the self-attention layer.
      Constant to be multiplied with the attention matrix.
    lambda_distance: float
      Constant to be multiplied with the distance matrix in the self-attention layer.
      Constant to be multiplied with the distance matrix.
    h: int
      Number of attention heads for the self-attention layer.
    sa_hsize: int
      Size of dense layer in the self-attention layer.
    sa_dropout_p: float
      Dropout probability for the self-attention layer.
    output_bias: bool
      If True, dense layers will use bias vectors in the self-attention layer.
    d_input: int
      Size of input layer in the feed-forward layer.
    d_hidden: int
      Size of hidden layer in the feed-forward layer.
    d_output: int
      Size of output layer in the feed-forward layer.
    activation: str
      Activation function to be used in the feed-forward layer.
      Can choose between 'relu' for ReLU, 'leakyrelu' for LeakyReLU, 'prelu' for PReLU,
      'tanh' for TanH, 'selu' for SELU, 'elu' for ELU and 'linear' for linear activation.
    n_layers: int
      Number of layers in the feed-forward layer.
      Number of attention heads.
    hsize: int
      Size of dense layer.
    dropout_p: float
      Dropout probability in the feed-forward layer.
    encoder_hsize: int
      Size of Dense layer for the encoder itself.
    encoder_dropout_p: float
      Dropout probability for connections in the encoder layer.
    """
    super(MATEncoderLayer, self).__init__()
    self.self_attn = MultiHeadedMATAttention(dist_kernel, lambda_attention,
                                             lambda_distance, h, sa_hsize,
                                             sa_dropout_p, output_bias)
    self.feed_forward = PositionwiseFeedForward(
        d_input, d_hidden, d_output, activation, n_layers, ff_dropout_p)
    # separate SublayerConnection instances so the two sublayers do not share parameters
    self.sublayer = nn.ModuleList([
        SublayerConnection(size=encoder_hsize, dropout_p=encoder_dropout_p)
        for _ in range(2)
    ])
    self.size = encoder_hsize

  def forward(self, x: torch.Tensor, mask: torch.Tensor, sa_dropout_p: float,
              adj_matrix: np.ndarray, distance_matrix: np.ndarray):
    """Output computation for the MATEncoder layer.

    Parameters
    ----------
    x: torch.Tensor
      Input tensor.
    mask: torch.Tensor
      Masks out padding values so that they are not taken into account when computing the attention score.
    sa_dropout_p: float
      Dropout probability for the self-attention layer (MultiHeadedMATAttention).
    adj_matrix: np.ndarray
      Adjacency matrix of a molecule.
    distance_matrix: np.ndarray
      Distance matrix of a molecule.
      Dropout probability.
    output_bias: bool
      If True, dense layers will use bias vectors.
    """
    x = self.sublayer[0](x,
                         self.self_attn(
                             x,
                             x,
                             x,
                             mask=mask,
                             dropout_p=sa_dropout_p,
                             adj_matrix=adj_matrix,
                             distance_matrix=distance_matrix))
    return self.sublayer[1](x, self.feed_forward(x))
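Since the docstring above notes that this layer can be stacked to form an encoder, here is a minimal sketch of that stacking pattern using a hypothetical stand-in layer (ToyLayer is illustrative only and not part of this diff):

import torch
import torch.nn as nn

class ToyLayer(nn.Module):
  """Stand-in for an encoder layer, illustrative only."""

  def __init__(self, size):
    super().__init__()
    self.linear = nn.Linear(size, size)

  def forward(self, x):
    return x + self.linear(x)

# separate instances per layer so the stacked layers do not share weights
stack = nn.ModuleList([ToyLayer(4) for _ in range(3)])
x = torch.randn(2, 5, 4)
for layer in stack:
  x = layer(x)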


class SublayerConnection(nn.Module):
  """SublayerConnection layer which establishes a residual connection, as used in the Molecular Attention Transformer [1]_.

  The SublayerConnection layer is a residual layer which is then passed through Layer Normalization.
  The residual connection is established by computing the dropout-adjusted layer output of a normalized tensor and adding this to the original input tensor.

  References
  ----------
  .. [1] Lukasz Maziarka et al. "Molecule Attention Transformer" Graph Representation Learning workshop and Machine Learning and the Physical Sciences workshop at NeurIPS 2019. 2020. https://arxiv.org/abs/2002.08264

  Examples
  --------
  >>> import deepchem as dc
  >>> import torch
  >>> layer = dc.models.torch_models.layers.SublayerConnection(2, 0.)
  >>> input_ar = torch.tensor([[1., 2.], [5., 6.]])
  >>> output = layer(input_ar, input_ar)
  """
    super().__init__()
    if dist_kernel == "softmax":
      self.dist_kernel = lambda x: torch.softmax(-x, dim=-1)
    elif dist_kernel == "exp":
      self.dist_kernel = lambda x: torch.exp(-x)
    self.lambda_attention = lambda_attention
    self.lambda_distance = lambda_distance
    self.lambda_adjacency = 1.0 - self.lambda_attention - self.lambda_distance
    self.d_k = hsize // h
    self.h = h
    # separate projections for query, key and value so they do not share weights
    self.linear_layers = nn.ModuleList(
        [nn.Linear(hsize, hsize) for _ in range(3)])
    self.dropout_p = nn.Dropout(dropout_p)
    self.output_linear = nn.Linear(hsize, hsize, output_bias)
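A small sketch of the two distance kernels selected by dist_kernel above: 'softmax' applies a softmax over the negated distances, while 'exp' applies an elementwise exp(-x):

import torch

distances = torch.tensor([[0., 1., 2.]])
softmax_kernel = torch.softmax(-distances, dim=-1)  # normalized; larger distance -> smaller weight
exp_kernel = torch.exp(-distances)                  # unnormalized exponential decay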

  def __init__(self, size: int, dropout_p: float):
    """Initialize a SublayerConnection Layer.
  def _singleAttention(self,
                       query,
                       key,
                       value,
                       mask,
                       dropout_p,
                       adj_matrix,
                       distance_matrix,
                       eps=1e-6,
                       inf=1e12):
    """Defining and computing output for a single MAT attention layer.

    Parameters
    ----------
    size: int
      Size of layer.
    query: torch.Tensor
      Standard query parameter for attention.
    key: torch.Tensor
      Standard key parameter for attention.
    value: torch.Tensor
      Standard value parameter for attention.
    mask: torch.Tensor
      Masks out padding values so that they are not taken into account when computing the attention score.
    dropout_p: float
      Dropout probability.
    adj_matrix: np.ndarray
      Adjacency matrix of the input molecule, returned from dc.feat.MATFeaturizer()
    distance_matrix: np.ndarray
      Distance matrix of the input molecule, returned from dc.feat.MATFeaturizer()
    eps: float
      Epsilon value
    inf: float
      Value of infinity to be used.
    """
    super(SublayerConnection, self).__init__()
    self.norm = nn.LayerNorm(size)
    self.dropout_p = nn.Dropout(dropout_p)

  def forward(self, x: torch.Tensor, output: torch.Tensor):
    """Output computation for the SublayerConnection layer.

    Takes an input tensor x, then adds the dropout-adjusted sublayer output for normalized x to it.
    This is done to add a residual connection followed by LayerNorm.

    Parameters
    ----------
    x: torch.Tensor
      Input tensor.
    output: torch.Tensor
      Layer whose normalized output will be added to x.
    """
    if x is None:
      return self.dropout_p(self.norm(output))

    if len(x.shape) < len(output.shape):
      temp_ar = x
      op_ar = output
      adjusted = temp_ar.unsqueeze(1).repeat(1, op_ar.shape[1], 1)
    elif len(x.shape) > len(output.shape):
      temp_ar = output
      op_ar = x
      adjusted = temp_ar.unsqueeze(1).repeat(1, op_ar.shape[1], 1)
    else:
      return x + self.dropout_p(self.norm(output))

    return adjusted + self.dropout_p(self.norm(op_ar))
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
      scores = scores.masked_fill(
          mask.unsqueeze(1).repeat(1, query.shape[1], query.shape[2], 1) == 0,
          -inf)
    p_attn = F.softmax(scores, dim=-1)

class PositionwiseFeedForward(nn.Module):
  """PositionwiseFeedForward is a layer used to define the position-wise feed-forward (FFN) algorithm for the Molecular Attention Transformer [1]_
    adj_matrix = adj_matrix / (adj_matrix.sum(dim=-1).unsqueeze(2) + eps)
    p_adj = adj_matrix.unsqueeze(1).repeat(1, query.shape[1], 1, 1)

  Each layer in the MAT encoder contains a fully connected feed-forward network which applies two linear transformations and the given activation function.
  This is done in addition to the SublayerConnection module.
    distance_matrix = distance_matrix.masked_fill(
        mask.repeat(1, mask.shape[-1], 1) == 0, np.inf)
    distance_matrix = self.dist_kernel(distance_matrix)
    p_dist = distance_matrix.unsqueeze(1).repeat(1, query.shape[1], 1, 1)

  References
  ----------
  .. [1] Lukasz Maziarka et al. "Molecule Attention Transformer" Graph Representation Learning workshop and Machine Learning and the Physical Sciences workshop at NeurIPS 2019. 2020. https://arxiv.org/abs/2002.08264
    p_weighted = self.lambda_attention * p_attn + self.lambda_distance * p_dist + self.lambda_adjacency * p_adj
    p_weighted = self.dropout_p(p_weighted)

  Examples
  --------
  >>> import deepchem as dc
  >>> feed_fwd_layer = dc.models.torch_models.layers.PositionwiseFeedForward(d_input = 1024, d_hidden = 1024, d_output = 1024, activation = 'relu', n_layers = 1, dropout_p = 0.1)
  """
    return torch.matmul(p_weighted, value), p_attn
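A small worked sketch of the weighted combination computed in _singleAttention above, assuming lambda_attention = 0.33 and lambda_distance = 0.33 (so lambda_adjacency = 1 - 0.33 - 0.33 = 0.34):

import torch

lambda_attention, lambda_distance = 0.33, 0.33
lambda_adjacency = 1.0 - lambda_attention - lambda_distance
p_attn = torch.full((2, 2), 0.5)   # softmaxed attention scores
p_dist = torch.full((2, 2), 0.25)  # kernelized distance scores
p_adj = torch.full((2, 2), 0.5)    # row-normalized adjacency scores
p_weighted = (lambda_attention * p_attn + lambda_distance * p_dist +
              lambda_adjacency * p_adj)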

  def __init__(self, d_input: int, d_hidden: int, d_output: int,
               activation: str, n_layers: int, dropout_p: float):
    """Initialize a PositionwiseFeedForward layer.
  def forward(self,
              query,
              key,
              value,
              mask,
              dropout_p,
              adj_matrix,
              distance_matrix,
              eps=1e-6,
              inf=1e12,
              **kwargs):
    """Output computation for the MultiHeadedAttention layer.

    Parameters
    ----------
    d_input: int
      Size of input layer.
    d_hidden: int (same as d_input if d_hidden = 0)
      Size of hidden layer.
    d_output: int (same as d_input if d_output = 0)
      Size of output layer.
    activation: str
      Activation function to be used. Can choose between 'relu' for ReLU, 'leakyrelu' for LeakyReLU, 'prelu' for PReLU,
      'tanh' for TanH, 'selu' for SELU, 'elu' for ELU and 'linear' for linear activation.
    n_layers: int
      Number of layers.
    dropout_p: float
      Dropout probability.
    query: torch.Tensor
      Standard query parameter for attention.
    key: torch.Tensor
      Standard key parameter for attention.
    value: torch.Tensor
      Standard value parameter for attention.
    mask: torch.Tensor
      Masks out padding values so that they are not taken into account when computing the attention score.
    """
    super(PositionwiseFeedForward, self).__init__()

    if activation == 'relu':
      self.activation = nn.ReLU()

    elif activation == 'leakyrelu':
      self.activation = nn.LeakyReLU(0.1)

    elif activation == 'prelu':
      self.activation = nn.PReLU()

    elif activation == 'tanh':
      self.activation = nn.Tanh()

    elif activation == 'selu':
      self.activation = nn.SELU()

    elif activation == 'elu':
      self.activation = nn.ELU()

    elif activation == "linear":
      self.activation = lambda x: x

    self.n_layers = n_layers
    d_output = d_output if d_output != 0 else d_input
    d_hidden = d_hidden if d_hidden != 0 else d_input

    if n_layers == 1:
      self.linears = [nn.Linear(d_input, d_output)]

    else:
      self.linears = [nn.Linear(d_input, d_hidden)] + \
                      [nn.Linear(d_hidden, d_hidden) for _ in range(n_layers - 2)] + \
                      [nn.Linear(d_hidden, d_output)]

    self.linears = nn.ModuleList(self.linears)
    self.dropout_p = nn.ModuleList(
        [nn.Dropout(dropout_p) for _ in range(n_layers)])
    if mask is not None:
      mask = mask.unsqueeze(1)

  def forward(self, x: torch.Tensor):
    """Output Computation for the PositionwiseFeedForward layer.
    batch_size = query.size(0)

    Parameters
    ----------
    x: torch.Tensor
      Input tensor.
    """
    if not self.n_layers:
      return x
    query, key, value = [
        layer(x).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
        for layer, x in zip(self.linear_layers, (query, key, value))
    ]

    if self.n_layers == 1:
      return self.dropout_p[0](self.activation(self.linears[0](x)))
    x, _ = self._singleAttention(query, key, value, mask, dropout_p, adj_matrix,
                                 distance_matrix, eps, inf, **kwargs)
    x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k)

<<<<<<< HEAD
    else:
      for i in range(self.n_layers - 1):
        x = self.dropout_p[i](self.activation(self.linears[i](x)))
      return self.linears[-1](x)
>>>>>>> Added encoder layers
=======
    return self.output_linear(x)
>>>>>>> Combined MATAttention and MultiHeadedAttention
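A small sketch of the padding-mask behaviour described in the forward docstrings above: masked positions are filled with a large negative value so that softmax assigns them near-zero attention weight:

import torch
import torch.nn.functional as F

inf = 1e12
scores = torch.tensor([[2.0, 1.0, 0.5]])
mask = torch.tensor([[1, 1, 0]])              # third position is padding
scores = scores.masked_fill(mask == 0, -inf)  # suppress padded positions
p_attn = F.softmax(scores, dim=-1)            # padding receives ~0 attention weight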