作者: Victor Basu
创建日期 2022/03/10
最后修改日期 2024/12/17
描述: 实现卷积变分自编码器 (VAE) 用于药物发现。
在本示例中,我们使用变分自编码器来生成用于药物发现的分子。我们参考了研究论文 《使用数据的连续表示进行自动化学设计》 和 《MolGAN:小型分子图的隐式生成模型》。
论文 《使用数据的连续表示进行自动化学设计》 中描述的模型通过高效探索开放式的化合物空间来生成新分子。该模型包含三个组件:编码器、解码器和预测器。编码器将分子的离散表示转换为实值连续向量,解码器将这些连续向量转换回离散的分子表示。预测器从分子的潜在连续向量表示中估计算效。连续表示允许使用基于梯度的优化来有效地指导搜索优化的功能化合物。
图 (a) - 用于分子设计的自编码器图,包括联合属性预测模型。从离散的分子表示(例如 SMILES 字符串)开始,编码器网络将每个分子转换为潜在空间的向量,该向量有效地表示连续的分子。给定潜在空间中的一个点,解码器网络会生成相应的 SMILES 字符串。多层感知机网络估算与每个分子相关的目标属性值。
图 (b) - 在连续潜在空间中进行基于梯度的优化。在训练了一个用于根据其潜在表示 z 预测分子属性的代理模型 f(z) 后,我们可以对 f(z) 关于 z 进行优化,以找到预期匹配特定期望属性的新潜在表示。然后可以将这些新的潜在表示解码为 SMILES 字符串,届时可以实证测试其属性。
有关 MolGAN 的解释和实现,请参考 Keras 示例 《WGAN-GP 与 R-GCN 用于生成小型分子图》 (作者:Alexander Kensert)。本示例中的许多函数都源自上述 Keras 示例。
RDKit 是一个开源的计算化学和机器学习工具包。如果您从事药物发现领域,这个工具包会非常有用。在本示例中,RDKit 用于方便高效地将 SMILES 转换为分子对象,然后从中获取原子和键的集合。
引用自 《WGAN-GP 与 R-GCN 用于生成小型分子图》)
"SMILES 以 ASCII 字符串的形式表示给定分子的结构。SMILES 字符串是一种紧凑的编码,对于较小的分子来说,相对易于人类阅读。将分子编码为字符串可以减轻和促进给定分子的数据库和/或网络搜索。RDKit 使用算法将给定的 SMILES 精确地转换为分子对象,然后可以使用该对象计算大量的分子属性/特征。"
!pip -q install rdkit-pypi==2021.9.4
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import ast
import pandas as pd
import numpy as np
import tensorflow as tf
import keras
from keras import layers
from keras import ops
import matplotlib.pyplot as plt
from rdkit import Chem, RDLogger
from rdkit.Chem import BondType
from rdkit.Chem.Draw import MolsToGridImage
RDLogger.DisableLog("rdApp.*")
我们使用 ZINC – 可商购化合物虚拟筛选免费数据库 数据集。该数据集包含 SMILE 表示的分子公式以及它们各自的分子属性,如 logP(水-辛醇分配系数)、SAS(合成可及性分数)和 QED(药物相似性定性估计)。
csv_path = keras.utils.get_file(
"250k_rndm_zinc_drugs_clean_3.csv",
"https://raw.githubusercontent.com/aspuru-guzik-group/chemical_vae/master/models/zinc_properties/250k_rndm_zinc_drugs_clean_3.csv",
)
df = pd.read_csv(csv_path)
df["smiles"] = df["smiles"].apply(lambda s: s.replace("\n", ""))
df.head()
| smiles | logP | qed | SAS | |
|---|---|---|---|---|
| 0 | CC(C)(C)c1ccc2occ(CC(=O)Nc3ccccc3F)c2c1 | 5.05060 | 0.702012 | 2.084095 |
| 1 | C[C@@H]1CC(Nc2cncc(-c3nncn3C)c2)C[C@@H](C)C1 | 3.11370 | 0.928975 | 3.432004 |
| 2 | N#Cc1ccc(-c2ccc(O[C@@H](C(=O)N3CCCC3)c3ccccc3)... | 4.96778 | 0.599682 | 2.470633 |
| 3 | CCOC(=O)[C@@H]1CCCN(C(=O)c2nc(-c3ccc(C)cc3)n3c... | 4.00022 | 0.690944 | 2.822753 |
| 4 | N#CC1=C(SCC(=O)Nc2cccc(Cl)c2)N=C([O-])[C@H](C#... | 3.60956 | 0.789027 | 4.035182 |
---
## Hyperparameters
```python
SMILE_CHARSET = '["C", "B", "F", "I", "H", "O", "N", "S", "P", "Cl", "Br"]'
bond_mapping = {"SINGLE": 0, "DOUBLE": 1, "TRIPLE": 2, "AROMATIC": 3}
bond_mapping.update(
{0: BondType.SINGLE, 1: BondType.DOUBLE, 2: BondType.TRIPLE, 3: BondType.AROMATIC}
)
SMILE_CHARSET = ast.literal_eval(SMILE_CHARSET)
MAX_MOLSIZE = max(df["smiles"].str.len())
SMILE_to_index = dict((c, i) for i, c in enumerate(SMILE_CHARSET))
index_to_SMILE = dict((i, c) for i, c in enumerate(SMILE_CHARSET))
atom_mapping = dict(SMILE_to_index)
atom_mapping.update(index_to_SMILE)
BATCH_SIZE = 100
EPOCHS = 10
VAE_LR = 5e-4
NUM_ATOMS = 120 # Maximum number of atoms
ATOM_DIM = len(SMILE_CHARSET) # Number of atom types
BOND_DIM = 4 + 1 # Number of bond types
LATENT_DIM = 435 # Size of the latent space
def smiles_to_graph(smiles):
# Converts SMILES to molecule object
molecule = Chem.MolFromSmiles(smiles)
# Initialize adjacency and feature tensor
adjacency = np.zeros((BOND_DIM, NUM_ATOMS, NUM_ATOMS), "float32")
features = np.zeros((NUM_ATOMS, ATOM_DIM), "float32")
# loop over each atom in molecule
for atom in molecule.GetAtoms():
i = atom.GetIdx()
atom_type = atom_mapping[atom.GetSymbol()]
features[i] = np.eye(ATOM_DIM)[atom_type]
# loop over one-hop neighbors
for neighbor in atom.GetNeighbors():
j = neighbor.GetIdx()
bond = molecule.GetBondBetweenAtoms(i, j)
bond_type_idx = bond_mapping[bond.GetBondType().name]
adjacency[bond_type_idx, [i, j], [j, i]] = 1
# Where no bond, add 1 to last channel (indicating "non-bond")
# Notice: channels-first
adjacency[-1, np.sum(adjacency, axis=0) == 0] = 1
# Where no atom, add 1 to last column (indicating "non-atom")
features[np.where(np.sum(features, axis=1) == 0)[0], -1] = 1
return adjacency, features
def graph_to_molecule(graph):
# Unpack graph
adjacency, features = graph
# RWMol is a molecule object intended to be edited
molecule = Chem.RWMol()
# Remove "no atoms" & atoms with no bonds
keep_idx = np.where(
(np.argmax(features, axis=1) != ATOM_DIM - 1)
& (np.sum(adjacency[:-1], axis=(0, 1)) != 0)
)[0]
features = features[keep_idx]
adjacency = adjacency[:, keep_idx, :][:, :, keep_idx]
# Add atoms to molecule
for atom_type_idx in np.argmax(features, axis=1):
atom = Chem.Atom(atom_mapping[atom_type_idx])
_ = molecule.AddAtom(atom)
# Add bonds between atoms in molecule; based on the upper triangles
# of the [symmetric] adjacency tensor
(bonds_ij, atoms_i, atoms_j) = np.where(np.triu(adjacency) == 1)
for bond_ij, atom_i, atom_j in zip(bonds_ij, atoms_i, atoms_j):
if atom_i == atom_j or bond_ij == BOND_DIM - 1:
continue
bond_type = bond_mapping[bond_ij]
molecule.AddBond(int(atom_i), int(atom_j), bond_type)
# Sanitize the molecule; for more information on sanitization, see
# https://www.rdkit.org/docs/RDKit_Book.html#molecular-sanitization
flag = Chem.SanitizeMol(molecule, catchErrors=True)
# Let's be strict. If sanitization fails, return None
if flag != Chem.SanitizeFlags.SANITIZE_NONE:
return None
return molecule
train_df = df.sample(frac=0.75, random_state=42) # random state is a seed value
train_df.reset_index(drop=True, inplace=True)
adjacency_tensor, feature_tensor, qed_tensor = [], [], []
for idx in range(8000):
adjacency, features = smiles_to_graph(train_df.loc[idx]["smiles"])
qed = train_df.loc[idx]["qed"]
adjacency_tensor.append(adjacency)
feature_tensor.append(features)
qed_tensor.append(qed)
adjacency_tensor = np.array(adjacency_tensor)
feature_tensor = np.array(feature_tensor)
qed_tensor = np.array(qed_tensor)
class RelationalGraphConvLayer(keras.layers.Layer):
def __init__(
self,
units=128,
activation="relu",
use_bias=False,
kernel_initializer="glorot_uniform",
bias_initializer="zeros",
kernel_regularizer=None,
bias_regularizer=None,
**kwargs
):
super().__init__(**kwargs)
self.units = units
self.activation = keras.activations.get(activation)
self.use_bias = use_bias
self.kernel_initializer = keras.initializers.get(kernel_initializer)
self.bias_initializer = keras.initializers.get(bias_initializer)
self.kernel_regularizer = keras.regularizers.get(kernel_regularizer)
self.bias_regularizer = keras.regularizers.get(bias_regularizer)
def build(self, input_shape):
bond_dim = input_shape[0][1]
atom_dim = input_shape[1][2]
self.kernel = self.add_weight(
shape=(bond_dim, atom_dim, self.units),
initializer=self.kernel_initializer,
regularizer=self.kernel_regularizer,
trainable=True,
name="W",
dtype="float32",
)
if self.use_bias:
self.bias = self.add_weight(
shape=(bond_dim, 1, self.units),
initializer=self.bias_initializer,
regularizer=self.bias_regularizer,
trainable=True,
name="b",
dtype="float32",
)
self.built = True
def call(self, inputs, training=False):
adjacency, features = inputs
# Aggregate information from neighbors
x = ops.matmul(adjacency, features[:, None])
# Apply linear transformation
x = ops.matmul(x, self.kernel)
if self.use_bias:
x += self.bias
# Reduce bond types dim
x_reduced = ops.sum(x, axis=1)
# Apply non-linear transformation
return self.activation(x_reduced)
编码器以分子的图邻接矩阵和特征矩阵为输入。这些特征通过图卷积层进行处理,然后展平并通过多个密集层处理,以导出 z_mean 和 log_var,即分子的潜在空间表示。
图卷积层:关系图卷积层实现了非线性变换的邻域聚合。我们可以这样定义这些层:
H_hat**(l+1) = σ(D_hat**(-1) * A_hat * H_hat**(l+1) * W**(l))
其中 σ 表示非线性变换(通常是 ReLU 激活),A 表示邻接张量,H_hat**(l) 表示 l-th 层的特征张量,D_hat**(-1) 表示 A_hat 的逆对角度张量,W_hat**(l) 表示 l-th 层的可训练权重张量。具体来说,对于每种键类型(关系),度张量在其对角线上表示连接到每个原子的键的数量。
来源:《WGAN-GP 与 R-GCN 用于生成小型分子图》)
解码器以潜在空间表示为输入,并预测相应分子的图邻接矩阵和特征矩阵。
def get_encoder(
gconv_units, latent_dim, adjacency_shape, feature_shape, dense_units, dropout_rate
):
adjacency = layers.Input(shape=adjacency_shape)
features = layers.Input(shape=feature_shape)
# Propagate through one or more graph convolutional layers
features_transformed = features
for units in gconv_units:
features_transformed = RelationalGraphConvLayer(units)(
[adjacency, features_transformed]
)
# Reduce 2-D representation of molecule to 1-D
x = layers.GlobalAveragePooling1D()(features_transformed)
# Propagate through one or more densely connected layers
for units in dense_units:
x = layers.Dense(units, activation="relu")(x)
x = layers.Dropout(dropout_rate)(x)
z_mean = layers.Dense(latent_dim, dtype="float32", name="z_mean")(x)
log_var = layers.Dense(latent_dim, dtype="float32", name="log_var")(x)
encoder = keras.Model([adjacency, features], [z_mean, log_var], name="encoder")
return encoder
def get_decoder(dense_units, dropout_rate, latent_dim, adjacency_shape, feature_shape):
latent_inputs = keras.Input(shape=(latent_dim,))
x = latent_inputs
for units in dense_units:
x = layers.Dense(units, activation="tanh")(x)
x = layers.Dropout(dropout_rate)(x)
# Map outputs of previous layer (x) to [continuous] adjacency tensors (x_adjacency)
x_adjacency = layers.Dense(np.prod(adjacency_shape))(x)
x_adjacency = layers.Reshape(adjacency_shape)(x_adjacency)
# Symmetrify tensors in the last two dimensions
x_adjacency = (x_adjacency + ops.transpose(x_adjacency, (0, 1, 3, 2))) / 2
x_adjacency = layers.Softmax(axis=1)(x_adjacency)
# Map outputs of previous layer (x) to [continuous] feature tensors (x_features)
x_features = layers.Dense(np.prod(feature_shape))(x)
x_features = layers.Reshape(feature_shape)(x_features)
x_features = layers.Softmax(axis=2)(x_features)
decoder = keras.Model(
latent_inputs, outputs=[x_adjacency, x_features], name="decoder"
)
return decoder
class Sampling(layers.Layer):
def __init__(self, seed=None, **kwargs):
super().__init__(**kwargs)
self.seed_generator = keras.random.SeedGenerator(seed)
def call(self, inputs):
z_mean, z_log_var = inputs
batch, dim = ops.shape(z_log_var)
epsilon = keras.random.normal(shape=(batch, dim), seed=self.seed_generator)
return z_mean + ops.exp(0.5 * z_log_var) * epsilon
该模型经过训练以优化四个损失:
分类交叉熵损失函数衡量模型的重建准确性。属性预测损失估计了将潜在表示通过属性预测模型运行后的预测属性与实际属性之间的均方误差。模型的属性预测通过二元交叉熵进行优化。梯度惩罚进一步由模型的属性(QED)预测进行引导。
梯度惩罚是 1-Lipschitz 连续性的替代软约束,作为原始神经网络的梯度裁剪方案的改进(“1-Lipschitz 连续性”意味着函数在每一点的梯度范数最多为 1)。它向损失函数添加了一个正则化项。
class MoleculeGenerator(keras.Model):
def __init__(self, encoder, decoder, max_len, seed=None, **kwargs):
super().__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
self.property_prediction_layer = layers.Dense(1)
self.max_len = max_len
self.seed_generator = keras.random.SeedGenerator(seed)
self.sampling_layer = Sampling(seed=seed)
self.train_total_loss_tracker = keras.metrics.Mean(name="train_total_loss")
self.val_total_loss_tracker = keras.metrics.Mean(name="val_total_loss")
def train_step(self, data):
adjacency_tensor, feature_tensor, qed_tensor = data[0]
graph_real = [adjacency_tensor, feature_tensor]
self.batch_size = ops.shape(qed_tensor)[0]
with tf.GradientTape() as tape:
z_mean, z_log_var, qed_pred, gen_adjacency, gen_features = self(
graph_real, training=True
)
graph_generated = [gen_adjacency, gen_features]
total_loss = self._compute_loss(
z_log_var, z_mean, qed_tensor, qed_pred, graph_real, graph_generated
)
grads = tape.gradient(total_loss, self.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
self.train_total_loss_tracker.update_state(total_loss)
return {"loss": self.train_total_loss_tracker.result()}
def _compute_loss(
self, z_log_var, z_mean, qed_true, qed_pred, graph_real, graph_generated
):
adjacency_real, features_real = graph_real
adjacency_gen, features_gen = graph_generated
adjacency_loss = ops.mean(
ops.sum(
keras.losses.categorical_crossentropy(
adjacency_real, adjacency_gen, axis=1
),
axis=(1, 2),
)
)
features_loss = ops.mean(
ops.sum(
keras.losses.categorical_crossentropy(features_real, features_gen),
axis=(1),
)
)
kl_loss = -0.5 * ops.sum(
1 + z_log_var - z_mean**2 - ops.minimum(ops.exp(z_log_var), 1e6), 1
)
kl_loss = ops.mean(kl_loss)
property_loss = ops.mean(
keras.losses.binary_crossentropy(qed_true, ops.squeeze(qed_pred, axis=1))
)
graph_loss = self._gradient_penalty(graph_real, graph_generated)
return kl_loss + property_loss + graph_loss + adjacency_loss + features_loss
def _gradient_penalty(self, graph_real, graph_generated):
# Unpack graphs
adjacency_real, features_real = graph_real
adjacency_generated, features_generated = graph_generated
# Generate interpolated graphs (adjacency_interp and features_interp)
alpha = keras.random.uniform(shape=(self.batch_size,), seed=self.seed_generator)
alpha = ops.reshape(alpha, (self.batch_size, 1, 1, 1))
adjacency_interp = (adjacency_real * alpha) + (
1.0 - alpha
) * adjacency_generated
alpha = ops.reshape(alpha, (self.batch_size, 1, 1))
features_interp = (features_real * alpha) + (1.0 - alpha) * features_generated
# Compute the logits of interpolated graphs
with tf.GradientTape() as tape:
tape.watch(adjacency_interp)
tape.watch(features_interp)
_, _, logits, _, _ = self(
[adjacency_interp, features_interp], training=True
)
# Compute the gradients with respect to the interpolated graphs
grads = tape.gradient(logits, [adjacency_interp, features_interp])
# Compute the gradient penalty
grads_adjacency_penalty = (1 - ops.norm(grads[0], axis=1)) ** 2
grads_features_penalty = (1 - ops.norm(grads[1], axis=2)) ** 2
return ops.mean(
ops.mean(grads_adjacency_penalty, axis=(-2, -1))
+ ops.mean(grads_features_penalty, axis=(-1))
)
def inference(self, batch_size):
z = keras.random.normal(
shape=(batch_size, LATENT_DIM), seed=self.seed_generator
)
reconstruction_adjacency, reconstruction_features = model.decoder.predict(z)
# obtain one-hot encoded adjacency tensor
adjacency = ops.argmax(reconstruction_adjacency, axis=1)
adjacency = ops.one_hot(adjacency, num_classes=BOND_DIM, axis=1)
# Remove potential self-loops from adjacency
adjacency = adjacency * (1.0 - ops.eye(NUM_ATOMS, dtype="float32")[None, None])
# obtain one-hot encoded feature tensor
features = ops.argmax(reconstruction_features, axis=2)
features = ops.one_hot(features, num_classes=ATOM_DIM, axis=2)
return [
graph_to_molecule([adjacency[i].numpy(), features[i].numpy()])
for i in range(batch_size)
]
def call(self, inputs):
z_mean, log_var = self.encoder(inputs)
z = self.sampling_layer([z_mean, log_var])
gen_adjacency, gen_features = self.decoder(z)
property_pred = self.property_prediction_layer(z_mean)
return z_mean, log_var, property_pred, gen_adjacency, gen_features
vae_optimizer = keras.optimizers.Adam(learning_rate=VAE_LR)
encoder = get_encoder(
gconv_units=[9],
adjacency_shape=(BOND_DIM, NUM_ATOMS, NUM_ATOMS),
feature_shape=(NUM_ATOMS, ATOM_DIM),
latent_dim=LATENT_DIM,
dense_units=[512],
dropout_rate=0.0,
)
decoder = get_decoder(
dense_units=[128, 256, 512],
dropout_rate=0.2,
latent_dim=LATENT_DIM,
adjacency_shape=(BOND_DIM, NUM_ATOMS, NUM_ATOMS),
feature_shape=(NUM_ATOMS, ATOM_DIM),
)
model = MoleculeGenerator(encoder, decoder, MAX_MOLSIZE)
model.compile(vae_optimizer)
history = model.fit([adjacency_tensor, feature_tensor, qed_tensor], epochs=EPOCHS)
Epoch 1/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 125s 481ms/step - loss: 2841.6440
Epoch 2/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 113s 451ms/step - loss: 197.5607
Epoch 3/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 115s 460ms/step - loss: 220.5820
Epoch 4/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 109s 434ms/step - loss: 394.0200
Epoch 5/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 109s 436ms/step - loss: 388.5954
Epoch 6/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 108s 431ms/step - loss: 323.4093
Epoch 7/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 108s 432ms/step - loss: 278.2234
Epoch 8/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 110s 439ms/step - loss: 393.4183
Epoch 9/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 114s 456ms/step - loss: 523.3671
Epoch 10/10
250/250 ━━━━━━━━━━━━━━━━━━━━ 111s 445ms/step - loss: 223.5443
---
## Inference
We use our model to generate new valid molecules from different points of the latent space.
### Generate unique Molecules with the model
```python
molecules = model.inference(1000)
MolsToGridImage(
[m for m in molecules if m is not None][:1000], molsPerRow=5, subImgSize=(260, 160)
)
def plot_latent(vae, data, labels):
# display a 2D plot of the property in the latent space
z_mean, _ = vae.encoder.predict(data)
plt.figure(figsize=(12, 10))
plt.scatter(z_mean[:, 0], z_mean[:, 1], c=labels)
plt.colorbar()
plt.xlabel("z[0]")
plt.ylabel("z[1]")
plt.show()
plot_latent(model, [adjacency_tensor[:8000], feature_tensor[:8000]], qed_tensor[:8000])