作者: fchollet
创建日期 2019/03/01
最后修改 2023/06/25
描述: 从头开始编写 Layer
和 Model
对象的完整指南。
本指南涵盖了构建自己的子类化层和模型所需的全部知识。特别是,你将学习以下特性:
Layer
类add_weight()
方法build()
方法add_loss()
方法call()
方法中的 training
参数call()
方法中的 mask
参数让我们开始吧。
import numpy as np
import keras
from keras import ops
from keras import layers
Layer
类:状态(权重)与计算的结合Keras 中的核心抽象之一是 Layer
类。一个层既封装了状态(层的“权重”),也封装了从输入到输出的转换(一个“调用”,即层的正向传播)。
这是一个密集连接层。它有两个状态变量:变量 w
和 b
。
class Linear(keras.layers.Layer):
def __init__(self, units=32, input_dim=32):
super().__init__()
self.w = self.add_weight(
shape=(input_dim, units),
initializer="random_normal",
trainable=True,
)
self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)
def call(self, inputs):
return ops.matmul(inputs, self.w) + self.b
你可以通过在一些张量输入上调用层来使用它,非常类似于 Python 函数。
x = ops.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
[[ 0.085416 -0.06821361 -0.00741937 -0.03429271]
[ 0.085416 -0.06821361 -0.00741937 -0.03429271]]
请注意,权重 w
和 b
在被设置为层属性后会自动被层跟踪。
assert linear_layer.weights == [linear_layer.w, linear_layer.b]
除了可训练权重,你还可以向层添加不可训练权重。这些权重在进行反向传播(即训练层时)时不会被考虑。
以下是如何添加和使用不可训练权重:
class ComputeSum(keras.layers.Layer):
def __init__(self, input_dim):
super().__init__()
self.total = self.add_weight(
initializer="zeros", shape=(input_dim,), trainable=False
)
def call(self, inputs):
self.total.assign_add(ops.sum(inputs, axis=0))
return self.total
x = ops.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())
[2. 2.]
[4. 4.]
它属于 layer.weights
,但被归类为不可训练权重。
print("weights:", len(my_sum.weights))
print("non-trainable weights:", len(my_sum.non_trainable_weights))
# It's not included in the trainable weights:
print("trainable_weights:", my_sum.trainable_weights)
weights: 1
non-trainable weights: 1
trainable_weights: []
我们上面定义的 Linear
层接收一个 input_dim
参数,用于在 __init__()
中计算权重 w
和 b
的形状。
class Linear(keras.layers.Layer):
def __init__(self, units=32, input_dim=32):
super().__init__()
self.w = self.add_weight(
shape=(input_dim, units),
initializer="random_normal",
trainable=True,
)
self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)
def call(self, inputs):
return ops.matmul(inputs, self.w) + self.b
在许多情况下,你可能无法提前知道输入的尺寸,并且希望在实例化层之后,当该值已知时再延迟创建权重。
在 Keras API 中,我们建议在层的 build(self, inputs_shape)
方法中创建层权重。像这样:
class Linear(keras.layers.Layer):
def __init__(self, units=32):
super().__init__()
self.units = units
def build(self, input_shape):
self.w = self.add_weight(
shape=(input_shape[-1], self.units),
initializer="random_normal",
trainable=True,
)
self.b = self.add_weight(
shape=(self.units,), initializer="random_normal", trainable=True
)
def call(self, inputs):
return ops.matmul(inputs, self.w) + self.b
你的层的 __call__()
方法会在第一次被调用时自动运行 build 方法。现在你拥有一个延迟创建权重且更易于使用的层。
# At instantiation, we don't know on what inputs this is going to get called
linear_layer = Linear(32)
# The layer's weights are created dynamically the first time the layer is called
y = linear_layer(x)
如上所示,单独实现 build()
方法能很好地将“仅创建一次权重”与“在每次调用中使用权重”这两步分离开来。
如果你将一个 Layer 实例赋值给另一个 Layer 的属性,则外部层将开始跟踪内部层创建的权重。
我们建议在 __init__()
方法中创建此类子层,并让第一次 __call__()
调用触发其权重的构建。
class MLPBlock(keras.layers.Layer):
def __init__(self):
super().__init__()
self.linear_1 = Linear(32)
self.linear_2 = Linear(32)
self.linear_3 = Linear(1)
def call(self, inputs):
x = self.linear_1(inputs)
x = keras.activations.relu(x)
x = self.linear_2(x)
x = keras.activations.relu(x)
return self.linear_3(x)
mlp = MLPBlock()
y = mlp(ops.ones(shape=(3, 64))) # The first call to the `mlp` will create the weights
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))
weights: 6
trainable weights: 6
只要层仅使用来自 keras.ops
命名空间(或其它 Keras 命名空间,例如 keras.activations
、keras.random
或 keras.layers
)的 API,那么它就可以与任何后端一起使用——TensorFlow、JAX 或 PyTorch。
到目前为止,你在本指南中看到的所有层都可以与所有 Keras 后端一起使用。
keras.ops
命名空间为你提供了访问:
ops.matmul
、ops.sum
、ops.reshape
、ops.stack
等。ops.softmax
、ops.conv
、ops.binary_crossentropy
、ops.relu
等。你也可以在层中使用后端原生的 API(例如 tf.nn
函数),但如果这样做,你的层将只能与该后端一起使用。例如,你可以使用 jax.numpy
编写以下 JAX 特定的层:
import jax
class Linear(keras.layers.Layer):
...
def call(self, inputs):
return jax.numpy.matmul(inputs, self.w) + self.b
这将是等效的 TensorFlow 特定层:
import tensorflow as tf
class Linear(keras.layers.Layer):
...
def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
这将是等效的 PyTorch 特定层:
import torch
class Linear(keras.layers.Layer):
...
def call(self, inputs):
return torch.matmul(inputs, self.w) + self.b
因为跨后端兼容性是一个非常有用的特性,我们强烈建议你始终只利用 Keras API 来使你的层与后端无关。
add_loss()
方法在编写层的 call()
方法时,你可以创建损失张量,这些张量稍后在编写训练循环时会用到。这可以通过调用 self.add_loss(value)
来实现。
# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(keras.layers.Layer):
def __init__(self, rate=1e-2):
super().__init__()
self.rate = rate
def call(self, inputs):
self.add_loss(self.rate * ops.mean(inputs))
return inputs
这些损失(包括由任何内部层创建的损失)可以通过 layer.losses
进行检索。此属性在顶层层的每次 __call__()
开始时重置,因此 layer.losses
始终包含最后一次前向传播期间创建的损失值。
class OuterLayer(keras.layers.Layer):
def __init__(self):
super().__init__()
self.activity_reg = ActivityRegularizationLayer(1e-2)
def call(self, inputs):
return self.activity_reg(inputs)
layer = OuterLayer()
assert len(layer.losses) == 0 # No losses yet since the layer has never been called
_ = layer(ops.zeros((1, 1)))
assert len(layer.losses) == 1 # We created one loss value
# `layer.losses` gets reset at the start of each __call__
_ = layer(ops.zeros((1, 1)))
assert len(layer.losses) == 1 # This is the loss created during the call above
此外,loss
属性还包含为任何内部层权重创建的正则化损失。
class OuterLayerWithKernelRegularizer(keras.layers.Layer):
def __init__(self):
super().__init__()
self.dense = keras.layers.Dense(
32, kernel_regularizer=keras.regularizers.l2(1e-3)
)
def call(self, inputs):
return self.dense(inputs)
layer = OuterLayerWithKernelRegularizer()
_ = layer(ops.zeros((1, 1)))
# This is `1e-3 * sum(layer.dense.kernel ** 2)`,
# created by the `kernel_regularizer` above.
print(layer.losses)
[Array(0.00217911, dtype=float32)]
这些损失应在编写自定义训练循环时予以考虑。
它们也可以与 fit()
无缝协作(如果存在主损失,它们会被自动求和并添加到主损失中)。
inputs = keras.Input(shape=(3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)
# If there is a loss passed in `compile`, the regularization
# losses get added to it
model.compile(optimizer="adam", loss="mse")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
# It's also possible not to pass any loss in `compile`,
# since the model already has a loss to minimize, via the `add_loss`
# call during the forward pass!
model.compile(optimizer="adam")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 60ms/step - loss: 0.2650
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 15ms/step - loss: 0.0050
<keras.src.callbacks.history.History at 0x146f71960>
如果你需要自定义层可以作为 函数式模型 的一部分进行序列化,你可以选择性地实现 get_config()
方法。
class Linear(keras.layers.Layer):
def __init__(self, units=32):
super().__init__()
self.units = units
def build(self, input_shape):
self.w = self.add_weight(
shape=(input_shape[-1], self.units),
initializer="random_normal",
trainable=True,
)
self.b = self.add_weight(
shape=(self.units,), initializer="random_normal", trainable=True
)
def call(self, inputs):
return ops.matmul(inputs, self.w) + self.b
def get_config(self):
return {"units": self.units}
# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'units': 64}
注意,基类 Layer
的 __init__()
方法接受一些关键字参数,特别是 name
和 dtype
。一个好的实践是在 __init__()
中将这些参数传递给父类,并将它们包含在层的配置中。
class Linear(keras.layers.Layer):
def __init__(self, units=32, **kwargs):
super().__init__(**kwargs)
self.units = units
def build(self, input_shape):
self.w = self.add_weight(
shape=(input_shape[-1], self.units),
initializer="random_normal",
trainable=True,
)
self.b = self.add_weight(
shape=(self.units,), initializer="random_normal", trainable=True
)
def call(self, inputs):
return ops.matmul(inputs, self.w) + self.b
def get_config(self):
config = super().get_config()
config.update({"units": self.units})
return config
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'name': 'linear_7', 'trainable': True, 'dtype': 'float32', 'units': 64}
如果你需要更灵活地从配置中反序列化层,你还可以覆盖 from_config()
类方法。这是 from_config()
的基本实现:
def from_config(cls, config):
return cls(**config)
要了解有关序列化和保存的更多信息,请参阅完整的保存和序列化模型指南。
call()
方法中的特权参数 training
有些层,特别是 BatchNormalization
层和 Dropout
层,在训练和推理期间有不同的行为。对于此类层,标准做法是在 call()
方法中公开一个 training
(布尔类型)参数。
通过在 call()
中公开此参数,你可以使内置的训练和评估循环(例如 fit()
)在训练和推理中正确使用该层。
class CustomDropout(keras.layers.Layer):
def __init__(self, rate, **kwargs):
super().__init__(**kwargs)
self.rate = rate
self.seed_generator = keras.random.SeedGenerator(1337)
def call(self, inputs, training=None):
if training:
return keras.random.dropout(
inputs, rate=self.rate, seed=self.seed_generator
)
return inputs
call()
方法中的特权参数 mask
call()
支持的另一个特权参数是 mask
参数。
你会在所有 Keras RNN 层中找到它。掩码是一个布尔张量(输入中每个时间步对应一个布尔值),用于在处理时间序列数据时跳过某些输入时间步。
当先前的层生成掩码时,Keras 会自动将正确的 mask
参数传递给支持此参数的层的 __call__()
方法。生成掩码的层是配置了 mask_zero=True
的 Embedding
层和 Masking
层。
Model
类通常,你会使用 Layer
类定义内部计算块,而使用 Model
类定义外部模型——即你将要训练的对象。
例如,在 ResNet50 模型中,你会有几个继承自 Layer
的 ResNet 块,以及一个包含整个 ResNet50 网络的 Model
。
Model
类拥有与 Layer
相同的 API,但有以下区别:
model.fit()
、model.evaluate()
、model.predict()
)。model.layers
属性公开其内部层的列表。save()
、save_weights()
...)。实际上,Layer
类对应于文献中我们称之为“层”(如“卷积层”或“循环层”)或“块”(如“ResNet 块”或“Inception 块”)的概念。
同时,Model
类对应于文献中称为“模型”(如“深度学习模型”)或“网络”(如“深度神经网络”)的概念。
所以如果你想知道,“我应该使用 Layer
类还是 Model
类?”,问问自己:我是否需要调用 fit()
?我是否需要调用 save()
?如果是,就使用 Model
。如果不是(因为你的类只是一个更大系统中的一个块,或者因为你自己编写训练和保存代码),则使用 Layer
。
例如,我们可以使用上面迷你 ResNet 的例子来构建一个 Model
,我们可以使用 fit()
来训练它,并使用 save_weights()
来保存它。
class ResNet(keras.Model):
def __init__(self, num_classes=1000):
super().__init__()
self.block_1 = ResNetBlock()
self.block_2 = ResNetBlock()
self.global_pool = layers.GlobalAveragePooling2D()
self.classifier = Dense(num_classes)
def call(self, inputs):
x = self.block_1(inputs)
x = self.block_2(x)
x = self.global_pool(x)
return self.classifier(x)
resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save(filepath.keras)
到目前为止,你已经学到了什么:
Layer
封装了状态(在 __init__()
或 build()
中创建)和一些计算(在 call()
中定义)。jax.numpy
、torch.nn
或 tf.nn
),但这样你的层将只能与该特定后端一起使用。add_loss()
创建和跟踪损失(通常是正则化损失)。Model
。Model
就像一个 Layer
,但增加了训练和序列化功能。让我们将所有这些内容结合起来,创建一个端到端的示例:我们将以与后端无关的方式实现一个变分自编码器(VAE)——这样它就可以在 TensorFlow、JAX 和 PyTorch 中以相同的方式运行。我们将在 MNIST 数字数据集上训练它。
我们的 VAE 将是 Model
的一个子类,由继承自 Layer
的层嵌套组合而成。它将包含正则化损失(KL 散度)。
class Sampling(layers.Layer):
"""Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.seed_generator = keras.random.SeedGenerator(1337)
def call(self, inputs):
z_mean, z_log_var = inputs
batch = ops.shape(z_mean)[0]
dim = ops.shape(z_mean)[1]
epsilon = keras.random.normal(shape=(batch, dim), seed=self.seed_generator)
return z_mean + ops.exp(0.5 * z_log_var) * epsilon
class Encoder(layers.Layer):
"""Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""
def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
super().__init__(name=name, **kwargs)
self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
self.dense_mean = layers.Dense(latent_dim)
self.dense_log_var = layers.Dense(latent_dim)
self.sampling = Sampling()
def call(self, inputs):
x = self.dense_proj(inputs)
z_mean = self.dense_mean(x)
z_log_var = self.dense_log_var(x)
z = self.sampling((z_mean, z_log_var))
return z_mean, z_log_var, z
class Decoder(layers.Layer):
"""Converts z, the encoded digit vector, back into a readable digit."""
def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
super().__init__(name=name, **kwargs)
self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
self.dense_output = layers.Dense(original_dim, activation="sigmoid")
def call(self, inputs):
x = self.dense_proj(inputs)
return self.dense_output(x)
class VariationalAutoEncoder(keras.Model):
"""Combines the encoder and decoder into an end-to-end model for training."""
def __init__(
self,
original_dim,
intermediate_dim=64,
latent_dim=32,
name="autoencoder",
**kwargs
):
super().__init__(name=name, **kwargs)
self.original_dim = original_dim
self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)
def call(self, inputs):
z_mean, z_log_var, z = self.encoder(inputs)
reconstructed = self.decoder(z)
# Add KL divergence regularization loss.
kl_loss = -0.5 * ops.mean(
z_log_var - ops.square(z_mean) - ops.exp(z_log_var) + 1
)
self.add_loss(kl_loss)
return reconstructed
让我们使用 fit()
API 在 MNIST 上训练它:
(x_train, _), _ = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255
original_dim = 784
vae = VariationalAutoEncoder(784, 64, 32)
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=2, batch_size=64)
Epoch 1/2
938/938 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.0942
Epoch 2/2
938/938 ━━━━━━━━━━━━━━━━━━━━ 1s 859us/step - loss: 0.0677
<keras.src.callbacks.history.History at 0x146fe62f0>