代码示例 / 生成式深度学习 / 使用 VAE 生成药物分子

使用 VAE 生成药物分子

作者: Victor Basu
创建日期 2022/03/10
最后修改日期 2024/12/17
描述: 实现一个用于药物发现的卷积变分自编码器 (VAE)。

ⓘ 本示例使用 Keras 3

在 Colab 中查看 GitHub 源代码


引言

在本示例中,我们使用变分自编码器 (Variational Autoencoder) 生成用于药物发现的分子。我们参考了论文《基于分子数据驱动连续表示的自动化化学设计》和《MolGAN:一种用于小型分子图的隐式生成模型》。

论文中描述的、题为 使用数据驱动的分子连续表示的自动化化学设计 的模型,通过高效探索开放式化学化合物空间来生成新分子。该模型由三个组件构成:编码器、解码器和预测器。编码器将分子的离散表示转换为实值连续向量,解码器则将这些连续向量转换回分子的离散表示。预测器根据分子的潜在连续向量表示来估计化学性质。连续表示使得使用基于梯度的优化成为可能,从而能够高效地指导寻找优化后的功能性化合物的搜索过程。

intro

图 (a) - 用于分子设计的自编码器图,包括联合属性预测模型。从离散分子表示(例如 SMILES 字符串)开始,编码器网络将每个分子转换为潜在空间中的向量,这有效地实现了连续分子表示。给定潜在空间中的一个点,解码器网络产生相应的 SMILES 字符串。多层感知器网络估计与每个分子相关的目标属性值。

图 (b) - 连续潜在空间中的基于梯度优化。在训练一个代理模型 f(z) 以基于分子的潜在表示 z 预测其属性后,我们可以相对于 z 优化 f(z),以找到预期符合特定期望属性的新潜在表示。这些新的潜在表示随后可以解码为 SMILES 字符串,此时可以凭经验测试它们的属性。

关于 MolGAN 的解释和实现,请参考 Alexander Kensert 撰写的 Keras 示例《使用 R-GCN 和 WGAN-GP 生成小型分子图》。本示例中使用的许多函数均来自上述 Keras 示例。


环境配置

RDKit 是一个用于化学信息学和机器学习的开源工具包。如果从事药物发现领域,这个工具包会派上用场。在本示例中,RDKit 用于方便有效地将 SMILES 转换为分子对象,然后从这些对象中获取原子和键的集合。

引用自 使用 R-GCN 和 WGAN-GP 生成小型分子图)

"SMILES 以 ASCII 字符串的形式表示给定分子的结构。SMILES 字符串是一种紧凑的编码,对于较小的分子,相对来说易于人类阅读。将分子编码为字符串既减轻又方便了数据库和/或网络搜索给定分子。RDKit 使用算法准确地将给定的 SMILES 转换为分子对象,然后可以使用该对象计算大量的分子性质/特征。"

!pip -q install rdkit-pypi==2021.9.4
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import ast

import pandas as pd
import numpy as np

import tensorflow as tf
import keras
from keras import layers
from keras import ops

import matplotlib.pyplot as plt
from rdkit import Chem, RDLogger
from rdkit.Chem import BondType
from rdkit.Chem.Draw import MolsToGridImage

RDLogger.DisableLog("rdApp.*")

数据集

我们使用 ZINC - 一个用于虚拟筛选的免费可商购化合物数据库 数据集。该数据集包含 SMILES 表示的分子式以及它们各自的分子性质,例如 logP(水辛醇分配系数)、SAS(合成可及性分数)和 QED(药物相似性定性估计)。

csv_path = keras.utils.get_file(
    "250k_rndm_zinc_drugs_clean_3.csv",
    "https://raw.githubusercontent.com/aspuru-guzik-group/chemical_vae/master/models/zinc_properties/250k_rndm_zinc_drugs_clean_3.csv",
)

df = pd.read_csv(csv_path)
df["smiles"] = df["smiles"].apply(lambda s: s.replace("\n", ""))
df.head()
smiles logP qed SAS
0 CC(C)(C)c1ccc2occ(CC(=O)Nc3ccccc3F)c2c1 5.05060 0.702012 2.084095
1 C[C@@H]1CC(Nc2cncc(-c3nncn3C)c2)C[C@@H](C)C1 3.11370 0.928975 3.432004
2 N#Cc1ccc(-c2ccc(O[C@@H](C(=O)N3CCCC3)c3ccccc3)... 4.96778 0.599682 2.470633
3 CCOC(=O)[C@@H]1CCCN(C(=O)c2nc(-c3ccc(C)cc3)n3c... 4.00022 0.690944 2.822753
4 N#CC1=C(SCC(=O)Nc2cccc(Cl)c2)N=C([O-])[C@H](C#... 3.60956 0.789027 4.035182
---
## Hyperparameters


```python
SMILE_CHARSET = '["C", "B", "F", "I", "H", "O", "N", "S", "P", "Cl", "Br"]'

bond_mapping = {"SINGLE": 0, "DOUBLE": 1, "TRIPLE": 2, "AROMATIC": 3}
bond_mapping.update(
    {0: BondType.SINGLE, 1: BondType.DOUBLE, 2: BondType.TRIPLE, 3: BondType.AROMATIC}
)
SMILE_CHARSET = ast.literal_eval(SMILE_CHARSET)

MAX_MOLSIZE = max(df["smiles"].str.len())
SMILE_to_index = dict((c, i) for i, c in enumerate(SMILE_CHARSET))
index_to_SMILE = dict((i, c) for i, c in enumerate(SMILE_CHARSET))
atom_mapping = dict(SMILE_to_index)
atom_mapping.update(index_to_SMILE)

BATCH_SIZE = 100
EPOCHS = 10

VAE_LR = 5e-4
NUM_ATOMS = 120  # Maximum number of atoms

ATOM_DIM = len(SMILE_CHARSET)  # Number of atom types
BOND_DIM = 4 + 1  # Number of bond types
LATENT_DIM = 435  # Size of the latent space


def smiles_to_graph(smiles):
    # Converts SMILES to molecule object
    molecule = Chem.MolFromSmiles(smiles)

    # Initialize adjacency and feature tensor
    adjacency = np.zeros((BOND_DIM, NUM_ATOMS, NUM_ATOMS), "float32")
    features = np.zeros((NUM_ATOMS, ATOM_DIM), "float32")

    # loop over each atom in molecule
    for atom in molecule.GetAtoms():
        i = atom.GetIdx()
        atom_type = atom_mapping[atom.GetSymbol()]
        features[i] = np.eye(ATOM_DIM)[atom_type]
        # loop over one-hop neighbors
        for neighbor in atom.GetNeighbors():
            j = neighbor.GetIdx()
            bond = molecule.GetBondBetweenAtoms(i, j)
            bond_type_idx = bond_mapping[bond.GetBondType().name]
            adjacency[bond_type_idx, [i, j], [j, i]] = 1

    # Where no bond, add 1 to last channel (indicating "non-bond")
    # Notice: channels-first
    adjacency[-1, np.sum(adjacency, axis=0) == 0] = 1

    # Where no atom, add 1 to last column (indicating "non-atom")
    features[np.where(np.sum(features, axis=1) == 0)[0], -1] = 1

    return adjacency, features


def graph_to_molecule(graph):
    # Unpack graph
    adjacency, features = graph

    # RWMol is a molecule object intended to be edited
    molecule = Chem.RWMol()

    # Remove "no atoms" & atoms with no bonds
    keep_idx = np.where(
        (np.argmax(features, axis=1) != ATOM_DIM - 1)
        & (np.sum(adjacency[:-1], axis=(0, 1)) != 0)
    )[0]
    features = features[keep_idx]
    adjacency = adjacency[:, keep_idx, :][:, :, keep_idx]

    # Add atoms to molecule
    for atom_type_idx in np.argmax(features, axis=1):
        atom = Chem.Atom(atom_mapping[atom_type_idx])
        _ = molecule.AddAtom(atom)

    # Add bonds between atoms in molecule; based on the upper triangles
    # of the [symmetric] adjacency tensor
    (bonds_ij, atoms_i, atoms_j) = np.where(np.triu(adjacency) == 1)
    for bond_ij, atom_i, atom_j in zip(bonds_ij, atoms_i, atoms_j):
        if atom_i == atom_j or bond_ij == BOND_DIM - 1:
            continue
        bond_type = bond_mapping[bond_ij]
        molecule.AddBond(int(atom_i), int(atom_j), bond_type)

    # Sanitize the molecule; for more information on sanitization, see
    # https://www.rdkit.org/docs/RDKit_Book.html#molecular-sanitization
    flag = Chem.SanitizeMol(molecule, catchErrors=True)
    # Let's be strict. If sanitization fails, return None
    if flag != Chem.SanitizeFlags.SANITIZE_NONE:
        return None

    return molecule

生成训练集

train_df = df.sample(frac=0.75, random_state=42)  # random state is a seed value
train_df.reset_index(drop=True, inplace=True)

adjacency_tensor, feature_tensor, qed_tensor = [], [], []
for idx in range(8000):
    adjacency, features = smiles_to_graph(train_df.loc[idx]["smiles"])
    qed = train_df.loc[idx]["qed"]
    adjacency_tensor.append(adjacency)
    feature_tensor.append(features)
    qed_tensor.append(qed)

adjacency_tensor = np.array(adjacency_tensor)
feature_tensor = np.array(feature_tensor)
qed_tensor = np.array(qed_tensor)


class RelationalGraphConvLayer(keras.layers.Layer):
    def __init__(
        self,
        units=128,
        activation="relu",
        use_bias=False,
        kernel_initializer="glorot_uniform",
        bias_initializer="zeros",
        kernel_regularizer=None,
        bias_regularizer=None,
        **kwargs
    ):
        super().__init__(**kwargs)

        self.units = units
        self.activation = keras.activations.get(activation)
        self.use_bias = use_bias
        self.kernel_initializer = keras.initializers.get(kernel_initializer)
        self.bias_initializer = keras.initializers.get(bias_initializer)
        self.kernel_regularizer = keras.regularizers.get(kernel_regularizer)
        self.bias_regularizer = keras.regularizers.get(bias_regularizer)

    def build(self, input_shape):
        bond_dim = input_shape[0][1]
        atom_dim = input_shape[1][2]

        self.kernel = self.add_weight(
            shape=(bond_dim, atom_dim, self.units),
            initializer=self.kernel_initializer,
            regularizer=self.kernel_regularizer,
            trainable=True,
            name="W",
            dtype="float32",
        )

        if self.use_bias:
            self.bias = self.add_weight(
                shape=(bond_dim, 1, self.units),
                initializer=self.bias_initializer,
                regularizer=self.bias_regularizer,
                trainable=True,
                name="b",
                dtype="float32",
            )

        self.built = True

    def call(self, inputs, training=False):
        adjacency, features = inputs
        # Aggregate information from neighbors
        x = ops.matmul(adjacency, features[:, None])
        # Apply linear transformation
        x = ops.matmul(x, self.kernel)
        if self.use_bias:
            x += self.bias
        # Reduce bond types dim
        x_reduced = ops.sum(x, axis=1)
        # Apply non-linear transformation
        return self.activation(x_reduced)

构建编码器和解码器

编码器将分子的图邻接矩阵和特征矩阵作为输入。这些特征通过图卷积层处理,然后展平并由几个 Dense 层处理,以导出 z_meanlog_var,即分子的潜在空间表示。

图卷积层:关系图卷积层实现非线性转换的邻域聚合。我们可以将这些层定义如下

H_hat**(l+1) = σ(D_hat**(-1) * A_hat * H_hat**(l+1) * W**(l))

其中 σ 表示非线性变换(通常是 ReLU 激活函数),A 是邻接张量,H_hat**(l) 是第 l 层的特征张量,D_hat**(-1)A_hat 的对角度张量逆矩阵,W_hat**(l) 是第 l 层的可训练权重张量。具体来说,对于每种键类型(关系),度张量在其对角线上表示连接到每个原子的键的数量。

来源:使用 R-GCN 和 WGAN-GP 生成小型分子图)

解码器将潜在空间表示作为输入,并预测相应分子的图邻接矩阵和特征矩阵。

def get_encoder(
    gconv_units, latent_dim, adjacency_shape, feature_shape, dense_units, dropout_rate
):
    adjacency = layers.Input(shape=adjacency_shape)
    features = layers.Input(shape=feature_shape)

    # Propagate through one or more graph convolutional layers
    features_transformed = features
    for units in gconv_units:
        features_transformed = RelationalGraphConvLayer(units)(
            [adjacency, features_transformed]
        )
    # Reduce 2-D representation of molecule to 1-D
    x = layers.GlobalAveragePooling1D()(features_transformed)

    # Propagate through one or more densely connected layers
    for units in dense_units:
        x = layers.Dense(units, activation="relu")(x)
        x = layers.Dropout(dropout_rate)(x)

    z_mean = layers.Dense(latent_dim, dtype="float32", name="z_mean")(x)
    log_var = layers.Dense(latent_dim, dtype="float32", name="log_var")(x)

    encoder = keras.Model([adjacency, features], [z_mean, log_var], name="encoder")

    return encoder


def get_decoder(dense_units, dropout_rate, latent_dim, adjacency_shape, feature_shape):
    latent_inputs = keras.Input(shape=(latent_dim,))

    x = latent_inputs
    for units in dense_units:
        x = layers.Dense(units, activation="tanh")(x)
        x = layers.Dropout(dropout_rate)(x)

    # Map outputs of previous layer (x) to [continuous] adjacency tensors (x_adjacency)
    x_adjacency = layers.Dense(np.prod(adjacency_shape))(x)
    x_adjacency = layers.Reshape(adjacency_shape)(x_adjacency)
    # Symmetrify tensors in the last two dimensions
    x_adjacency = (x_adjacency + ops.transpose(x_adjacency, (0, 1, 3, 2))) / 2
    x_adjacency = layers.Softmax(axis=1)(x_adjacency)

    # Map outputs of previous layer (x) to [continuous] feature tensors (x_features)
    x_features = layers.Dense(np.prod(feature_shape))(x)
    x_features = layers.Reshape(feature_shape)(x_features)
    x_features = layers.Softmax(axis=2)(x_features)

    decoder = keras.Model(
        latent_inputs, outputs=[x_adjacency, x_features], name="decoder"
    )

    return decoder

构建采样层

class Sampling(layers.Layer):
    def __init__(self, seed=None, **kwargs):
        super().__init__(**kwargs)
        self.seed_generator = keras.random.SeedGenerator(seed)

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch, dim = ops.shape(z_log_var)
        epsilon = keras.random.normal(shape=(batch, dim), seed=self.seed_generator)
        return z_mean + ops.exp(0.5 * z_log_var) * epsilon

构建 VAE

该模型训练时优化以下四种损失:

  • 分类交叉熵
  • KL 散度损失
  • 属性预测损失
  • 图损失(梯度惩罚)

分类交叉熵损失函数衡量模型的重构准确性。属性预测损失估计潜在表示通过属性预测模型运行后,预测属性与实际属性之间的均方误差。模型的属性预测通过二元交叉熵进行优化。梯度惩罚进一步由模型的属性 (QED) 预测进行指导。

梯度惩罚是对 1-Lipschitz 连续性的一种替代软约束,是对原始神经网络中梯度裁剪方案的改进(“1-Lipschitz 连续性”意味着函数在每一点的梯度范数最大为 1)。它向损失函数添加一个正则化项。

class MoleculeGenerator(keras.Model):
    def __init__(self, encoder, decoder, max_len, seed=None, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.property_prediction_layer = layers.Dense(1)
        self.max_len = max_len
        self.seed_generator = keras.random.SeedGenerator(seed)
        self.sampling_layer = Sampling(seed=seed)

        self.train_total_loss_tracker = keras.metrics.Mean(name="train_total_loss")
        self.val_total_loss_tracker = keras.metrics.Mean(name="val_total_loss")

    def train_step(self, data):
        adjacency_tensor, feature_tensor, qed_tensor = data[0]
        graph_real = [adjacency_tensor, feature_tensor]
        self.batch_size = ops.shape(qed_tensor)[0]
        with tf.GradientTape() as tape:
            z_mean, z_log_var, qed_pred, gen_adjacency, gen_features = self(
                graph_real, training=True
            )
            graph_generated = [gen_adjacency, gen_features]
            total_loss = self._compute_loss(
                z_log_var, z_mean, qed_tensor, qed_pred, graph_real, graph_generated
            )

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.train_total_loss_tracker.update_state(total_loss)
        return {"loss": self.train_total_loss_tracker.result()}

    def _compute_loss(
        self, z_log_var, z_mean, qed_true, qed_pred, graph_real, graph_generated
    ):
        adjacency_real, features_real = graph_real
        adjacency_gen, features_gen = graph_generated

        adjacency_loss = ops.mean(
            ops.sum(
                keras.losses.categorical_crossentropy(
                    adjacency_real, adjacency_gen, axis=1
                ),
                axis=(1, 2),
            )
        )
        features_loss = ops.mean(
            ops.sum(
                keras.losses.categorical_crossentropy(features_real, features_gen),
                axis=(1),
            )
        )
        kl_loss = -0.5 * ops.sum(
            1 + z_log_var - z_mean**2 - ops.minimum(ops.exp(z_log_var), 1e6), 1
        )
        kl_loss = ops.mean(kl_loss)

        property_loss = ops.mean(
            keras.losses.binary_crossentropy(qed_true, ops.squeeze(qed_pred, axis=1))
        )

        graph_loss = self._gradient_penalty(graph_real, graph_generated)

        return kl_loss + property_loss + graph_loss + adjacency_loss + features_loss

    def _gradient_penalty(self, graph_real, graph_generated):
        # Unpack graphs
        adjacency_real, features_real = graph_real
        adjacency_generated, features_generated = graph_generated

        # Generate interpolated graphs (adjacency_interp and features_interp)
        alpha = keras.random.uniform(shape=(self.batch_size,), seed=self.seed_generator)
        alpha = ops.reshape(alpha, (self.batch_size, 1, 1, 1))
        adjacency_interp = (adjacency_real * alpha) + (
            1.0 - alpha
        ) * adjacency_generated
        alpha = ops.reshape(alpha, (self.batch_size, 1, 1))
        features_interp = (features_real * alpha) + (1.0 - alpha) * features_generated

        # Compute the logits of interpolated graphs
        with tf.GradientTape() as tape:
            tape.watch(adjacency_interp)
            tape.watch(features_interp)
            _, _, logits, _, _ = self(
                [adjacency_interp, features_interp], training=True
            )

        # Compute the gradients with respect to the interpolated graphs
        grads = tape.gradient(logits, [adjacency_interp, features_interp])
        # Compute the gradient penalty
        grads_adjacency_penalty = (1 - ops.norm(grads[0], axis=1)) ** 2
        grads_features_penalty = (1 - ops.norm(grads[1], axis=2)) ** 2
        return ops.mean(
            ops.mean(grads_adjacency_penalty, axis=(-2, -1))
            + ops.mean(grads_features_penalty, axis=(-1))
        )

    def inference(self, batch_size):
        z = keras.random.normal(
            shape=(batch_size, LATENT_DIM), seed=self.seed_generator
        )
        reconstruction_adjacency, reconstruction_features = model.decoder.predict(z)
        # obtain one-hot encoded adjacency tensor
        adjacency = ops.argmax(reconstruction_adjacency, axis=1)
        adjacency = ops.one_hot(adjacency, num_classes=BOND_DIM, axis=1)
        # Remove potential self-loops from adjacency
        adjacency = adjacency * (1.0 - ops.eye(NUM_ATOMS, dtype="float32")[None, None])
        # obtain one-hot encoded feature tensor
        features = ops.argmax(reconstruction_features, axis=2)
        features = ops.one_hot(features, num_classes=ATOM_DIM, axis=2)
        return [
            graph_to_molecule([adjacency[i].numpy(), features[i].numpy()])
            for i in range(batch_size)
        ]

    def call(self, inputs):
        z_mean, log_var = self.encoder(inputs)
        z = self.sampling_layer([z_mean, log_var])

        gen_adjacency, gen_features = self.decoder(z)

        property_pred = self.property_prediction_layer(z_mean)

        return z_mean, log_var, property_pred, gen_adjacency, gen_features

训练模型

vae_optimizer = keras.optimizers.Adam(learning_rate=VAE_LR)

encoder = get_encoder(
    gconv_units=[9],
    adjacency_shape=(BOND_DIM, NUM_ATOMS, NUM_ATOMS),
    feature_shape=(NUM_ATOMS, ATOM_DIM),
    latent_dim=LATENT_DIM,
    dense_units=[512],
    dropout_rate=0.0,
)
decoder = get_decoder(
    dense_units=[128, 256, 512],
    dropout_rate=0.2,
    latent_dim=LATENT_DIM,
    adjacency_shape=(BOND_DIM, NUM_ATOMS, NUM_ATOMS),
    feature_shape=(NUM_ATOMS, ATOM_DIM),
)

model = MoleculeGenerator(encoder, decoder, MAX_MOLSIZE)

model.compile(vae_optimizer)
history = model.fit([adjacency_tensor, feature_tensor, qed_tensor], epochs=EPOCHS)
Epoch 1/10
 250/250 ━━━━━━━━━━━━━━━━━━━━ 125s 481ms/step - loss: 2841.6440

 Epoch 2/10
  250/250 ━━━━━━━━━━━━━━━━━━━━ 113s 451ms/step - loss: 197.5607

Epoch 3/10
 250/250 ━━━━━━━━━━━━━━━━━━━━ 115s 460ms/step - loss: 220.5820

Epoch 4/10
 250/250 ━━━━━━━━━━━━━━━━━━━━ 109s 434ms/step - loss: 394.0200

Epoch 5/10
 250/250 ━━━━━━━━━━━━━━━━━━━━ 109s 436ms/step - loss: 388.5954

Epoch 6/10
 250/250 ━━━━━━━━━━━━━━━━━━━━ 108s 431ms/step - loss: 323.4093

Epoch 7/10
 250/250 ━━━━━━━━━━━━━━━━━━━━ 108s 432ms/step - loss: 278.2234
Epoch 8/10
 250/250 ━━━━━━━━━━━━━━━━━━━━ 110s 439ms/step - loss: 393.4183


Epoch 9/10
 250/250 ━━━━━━━━━━━━━━━━━━━━ 114s 456ms/step - loss: 523.3671

Epoch 10/10
 250/250 ━━━━━━━━━━━━━━━━━━━━ 111s 445ms/step - loss: 223.5443


---
## Inference

We use our model to generate new valid molecules from different points of the latent space.

### Generate unique Molecules with the model


```python
molecules = model.inference(1000)

MolsToGridImage(
    [m for m in molecules if m is not None][:1000], molsPerRow=5, subImgSize=(260, 160)
)
![png](/img/examples/generative/molecule_generation/molecule_generation_21_18.png) ### 显示基于分子属性 (QAE) 的潜在空间聚类
def plot_latent(vae, data, labels):
    # display a 2D plot of the property in the latent space
    z_mean, _ = vae.encoder.predict(data)
    plt.figure(figsize=(12, 10))
    plt.scatter(z_mean[:, 0], z_mean[:, 1], c=labels)
    plt.colorbar()
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.show()


plot_latent(model, [adjacency_tensor[:8000], feature_tensor[:8000]], qed_tensor[:8000])
![png](/img/examples/generative/molecule_generation/molecule_generation_23_39.png) --- ## 结论 在此示例中,我们结合了两篇论文的模型架构,一篇是 2016 年的《基于分子数据驱动连续表示的自动化化学设计》,另一篇是 2018 年的《MolGAN》论文。前者将 SMILES 输入视为字符串,并旨在生成 SMILES 格式的分子字符串,而后者将 SMILES 输入视为图(邻接矩阵和特征矩阵的组合),并旨在生成分子图。这种混合方法实现了一种新型的、基于梯度的定向化学空间搜索。示例在 HuggingFace 上提供 | 训练好的模型 | 演示 | | :--: | :--: | | [![Generic badge](https://img.shields.io/badge/%F0%9F%A4%97%20Model-使用%20VAE%20生成药物分子-black.svg)](https://hugging-face.cn/keras-io/drug-molecule-generation-with-VAE) | [![Generic badge](https://img.shields.io/badge/%F0%9F%A4%97%20Spaces-使用%20VAE%20生成药物分子-black.svg)](https://hugging-face.cn/spaces/keras-io/generating-drug-molecule-with-VAE) |