WGAN-GP overriding Model.train_step
Author: A_K_Nain
Date created: 2020/05/9
Last modified: 2023/08/3
Description: Implementation of Wasserstein GAN with Gradient Penalty.
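The original Wasserstein GAN leverages the Wasserstein distance to produce a value function that has better theoretical properties than the value function used in the original GAN paper. WGAN requires that the discriminator (also called the critic) lie within the space of 1-Lipschitz functions. The authors proposed weight clipping to enforce this constraint. Although weight clipping works, it can be a problematic way to enforce the 1-Lipschitz constraint and can cause undesirable behavior; for example, a very deep WGAN discriminator (critic) often fails to converge.
The WGAN-GP method proposes an alternative to weight clipping to ensure smooth training. Instead of clipping the weights, the authors add a "gradient penalty": a loss term that keeps the L2 norm of the discriminator gradients close to 1.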
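For reference, the critic objective described above can be written as follows. The symbols are notation assumed here rather than taken from this page: D is the critic, P_r and P_g are the real and generated distributions, and \lambda is the penalty weight (it corresponds to `gp_weight`, which defaults to 10.0 in the code further down). The interpolation is written in the same form the code uses.

```latex
L_D = \mathbb{E}_{\tilde{x} \sim P_g}\big[D(\tilde{x})\big]
    - \mathbb{E}_{x \sim P_r}\big[D(x)\big]
    + \lambda \, \mathbb{E}_{\hat{x}}\Big[\big(\lVert \nabla_{\hat{x}} D(\hat{x}) \rVert_2 - 1\big)^2\Big],
\qquad
\hat{x} = x + \alpha\,(\tilde{x} - x), \quad \alpha \sim U[0, 1]
```

The first two terms are the Wasserstein critic loss; the third term is zero only when the gradient norm at the interpolated points equals 1.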
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import keras
import tensorflow as tf
from keras import layers
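To demonstrate how to train WGAN-GP, we will use the Fashion-MNIST dataset. Each sample in this dataset is a 28x28 grayscale image associated with a label from 10 classes (e.g. trouser, pullover, sneaker, etc.).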
IMG_SHAPE = (28, 28, 1)
BATCH_SIZE = 512
# Size of the noise vector
noise_dim = 128
fashion_mnist = keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
print(f"Number of examples: {len(train_images)}")
print(f"Shape of the images in the dataset: {train_images.shape[1:]}")
# Reshape each sample to (28, 28, 1) and normalize the pixel values in the [-1, 1] range
train_images = train_images.reshape(train_images.shape[0], *IMG_SHAPE).astype("float32")
train_images = (train_images - 127.5) / 127.5
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
29515/29515 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
26421880/26421880 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
5148/5148 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
4422102/4422102 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step
Number of examples: 60000
Shape of the images in the dataset: (28, 28)
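The preprocessing above rescales pixel values with `(x - 127.5) / 127.5`. As a minimal sketch (the helper names `to_model_range` and `to_pixel_range` are hypothetical, and NumPy is assumed to be available), the mapping and its inverse look like this:

```python
import numpy as np


def to_model_range(pixels):
    """Map uint8 pixel values in [0, 255] to floats in [-1, 1]."""
    return (pixels.astype("float32") - 127.5) / 127.5


def to_pixel_range(values):
    """Invert the mapping: floats in [-1, 1] back to the [0, 255] range."""
    return values * 127.5 + 127.5


pixels = np.array([0, 127, 128, 255], dtype=np.uint8)
scaled = to_model_range(pixels)
restored = to_pixel_range(scaled)

print(scaled.min(), scaled.max())
print(restored)
```

The [-1, 1] range matches the `tanh` activation at the end of the generator, and the inverse mapping is the same one the monitoring callback applies before saving images.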
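The samples in the dataset have shape (28, 28, 1). Because we will use strided convolutions, this can result in a shape with odd dimensions. For example, (28, 28) -> Conv_s2 -> (14, 14) -> Conv_s2 -> (7, 7) -> Conv_s2 -> (3, 3).
While upsampling in the generator part of the network, we won't get the same shape as the original images if we aren't careful. To avoid this, we will do something much simpler:
- In the discriminator: "zero pad" the input to change the shape of each sample to (32, 32, 1); and
- In the generator: crop the final output to match the input shape.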
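The shape arithmetic behind this choice can be sketched in plain Python. The helpers below are hypothetical and assume each stride-2 convolution produces floor(size / 2), as in the example above; with Keras `padding="same"` the result is the ceiling instead, and the two agree whenever the size is even.

```python
def strided_sizes(size, num_layers, stride=2):
    """Spatial size after each strided convolution (floor division assumed)."""
    sizes = [size]
    for _ in range(num_layers):
        size = size // stride
        sizes.append(size)
    return sizes


def upsampled_sizes(size, num_layers, factor=2):
    """Spatial size after each upsampling step."""
    sizes = [size]
    for _ in range(num_layers):
        size = size * factor
        sizes.append(size)
    return sizes


# Without padding, the third stride-2 convolution hits an odd size.
unpadded = strided_sizes(28, 3)
# Zero padding by 2 pixels per side gives 32, which halves cleanly four times.
padded = strided_sizes(28 + 2 * 2, 4)
# The generator starts at 4x4 and doubles three times.
generated = upsampled_sizes(4, 3)
# Cropping 2 pixels per side brings the generator output back to 28.
cropped = generated[-1] - 2 * 2

print(unpadded, padded, generated, cropped)
```

Because 32 is a power of two, every intermediate size stays even, so the discriminator and generator shapes mirror each other.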
def conv_block(
    x,
    filters,
    activation,
    kernel_size=(3, 3),
    strides=(1, 1),
    padding="same",
    use_bias=True,
    use_bn=False,
    use_dropout=False,
    drop_value=0.5,
):
    x = layers.Conv2D(
        filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias
    )(x)
    if use_bn:
        x = layers.BatchNormalization()(x)
    x = activation(x)
    if use_dropout:
        x = layers.Dropout(drop_value)(x)
    return x

def get_discriminator_model():
    img_input = layers.Input(shape=IMG_SHAPE)
    # Zero pad the input to make the input images size to (32, 32, 1).
    x = layers.ZeroPadding2D((2, 2))(img_input)
    x = conv_block(
        x,
        64,
        kernel_size=(5, 5),
        strides=(2, 2),
        use_bn=False,
        use_bias=True,
        activation=layers.LeakyReLU(0.2),
        use_dropout=False,
        drop_value=0.3,
    )
    x = conv_block(
        x,
        128,
        kernel_size=(5, 5),
        strides=(2, 2),
        use_bn=False,
        activation=layers.LeakyReLU(0.2),
        use_bias=True,
        use_dropout=True,
        drop_value=0.3,
    )
    x = conv_block(
        x,
        256,
        kernel_size=(5, 5),
        strides=(2, 2),
        use_bn=False,
        activation=layers.LeakyReLU(0.2),
        use_bias=True,
        use_dropout=True,
        drop_value=0.3,
    )
    x = conv_block(
        x,
        512,
        kernel_size=(5, 5),
        strides=(2, 2),
        use_bn=False,
        activation=layers.LeakyReLU(0.2),
        use_bias=True,
        use_dropout=False,
        drop_value=0.3,
    )
    x = layers.Flatten()(x)
    x = layers.Dropout(0.2)(x)
    x = layers.Dense(1)(x)
    d_model = keras.models.Model(img_input, x, name="discriminator")
    return d_model


d_model = get_discriminator_model()
d_model.summary()
Model: "discriminator"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Layer (type)                   ┃ Output Shape        ┃   Param # ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ input_layer (InputLayer)       │ (None, 28, 28, 1)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ zero_padding2d (ZeroPadding2D) │ (None, 32, 32, 1)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ conv2d (Conv2D)                │ (None, 16, 16, 64)  │     1,664 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ leaky_re_lu (LeakyReLU)        │ (None, 16, 16, 64)  │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ conv2d_1 (Conv2D)              │ (None, 8, 8, 128)   │   204,928 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ leaky_re_lu_1 (LeakyReLU)      │ (None, 8, 8, 128)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ dropout (Dropout)              │ (None, 8, 8, 128)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ conv2d_2 (Conv2D)              │ (None, 4, 4, 256)   │   819,456 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ leaky_re_lu_2 (LeakyReLU)      │ (None, 4, 4, 256)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ dropout_1 (Dropout)            │ (None, 4, 4, 256)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ conv2d_3 (Conv2D)              │ (None, 2, 2, 512)   │ 3,277,312 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ leaky_re_lu_3 (LeakyReLU)      │ (None, 2, 2, 512)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ flatten (Flatten)              │ (None, 2048)        │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ dropout_2 (Dropout)            │ (None, 2048)        │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ dense (Dense)                  │ (None, 1)           │     2,049 │
└────────────────────────────────┴─────────────────────┴───────────┘
Total params: 4,305,409 (16.42 MB)
Trainable params: 4,305,409 (16.42 MB)
Non-trainable params: 0 (0.00 B)
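The parameter counts in the summary above can be checked by hand. The sketch below uses two hypothetical helpers and the standard counting rules for convolution and dense layers (weights plus one bias per output unit); it is an arithmetic check, not part of the model code.

```python
def conv2d_params(kernel_h, kernel_w, in_channels, filters, use_bias=True):
    """Weights of a 2D convolution: one kernel per (input, output) channel pair."""
    return kernel_h * kernel_w * in_channels * filters + (filters if use_bias else 0)


def dense_params(in_features, units, use_bias=True):
    """Weights of a fully connected layer."""
    return in_features * units + (units if use_bias else 0)


# Channel progression of the four 5x5 convolutions in the discriminator.
channels = [1, 64, 128, 256, 512]
conv_counts = [
    conv2d_params(5, 5, c_in, c_out) for c_in, c_out in zip(channels, channels[1:])
]
# The final 2x2x512 feature map is flattened to 2048 features before Dense(1).
dense_count = dense_params(2 * 2 * 512, 1)
total = sum(conv_counts) + dense_count

print(conv_counts, dense_count, total)
```

The total agrees with the "Total params" line of the summary, and most of the parameters sit in the last convolution.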
def upsample_block(
    x,
    filters,
    activation,
    kernel_size=(3, 3),
    strides=(1, 1),
    up_size=(2, 2),
    padding="same",
    use_bn=False,
    use_bias=True,
    use_dropout=False,
    drop_value=0.3,
):
    x = layers.UpSampling2D(up_size)(x)
    x = layers.Conv2D(
        filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias
    )(x)
    if use_bn:
        x = layers.BatchNormalization()(x)
    if activation:
        x = activation(x)
    if use_dropout:
        x = layers.Dropout(drop_value)(x)
    return x

def get_generator_model():
    noise = layers.Input(shape=(noise_dim,))
    x = layers.Dense(4 * 4 * 256, use_bias=False)(noise)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU(0.2)(x)
    x = layers.Reshape((4, 4, 256))(x)
    x = upsample_block(
        x,
        128,
        layers.LeakyReLU(0.2),
        strides=(1, 1),
        use_bias=False,
        use_bn=True,
        padding="same",
        use_dropout=False,
    )
    x = upsample_block(
        x,
        64,
        layers.LeakyReLU(0.2),
        strides=(1, 1),
        use_bias=False,
        use_bn=True,
        padding="same",
        use_dropout=False,
    )
    x = upsample_block(
        x, 1, layers.Activation("tanh"), strides=(1, 1), use_bias=False, use_bn=True
    )
    # At this point, we have an output which has the same shape as the input, (32, 32, 1).
    # We will use a Cropping2D layer to make it (28, 28, 1).
    x = layers.Cropping2D((2, 2))(x)
    g_model = keras.models.Model(noise, x, name="generator")
    return g_model


g_model = get_generator_model()
g_model.summary()
Model: "generator"
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Layer (type)                   ┃ Output Shape        ┃   Param # ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ input_layer_1 (InputLayer)     │ (None, 128)         │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ dense_1 (Dense)                │ (None, 4096)        │   524,288 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ batch_normalization            │ (None, 4096)        │    16,384 │
│ (BatchNormalization)           │                     │           │
├────────────────────────────────┼─────────────────────┼───────────┤
│ leaky_re_lu_4 (LeakyReLU)      │ (None, 4096)        │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ reshape (Reshape)              │ (None, 4, 4, 256)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ up_sampling2d (UpSampling2D)   │ (None, 8, 8, 256)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ conv2d_4 (Conv2D)              │ (None, 8, 8, 128)   │   294,912 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ batch_normalization_1          │ (None, 8, 8, 128)   │       512 │
│ (BatchNormalization)           │                     │           │
├────────────────────────────────┼─────────────────────┼───────────┤
│ leaky_re_lu_5 (LeakyReLU)      │ (None, 8, 8, 128)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ up_sampling2d_1 (UpSampling2D) │ (None, 16, 16, 128) │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ conv2d_5 (Conv2D)              │ (None, 16, 16, 64)  │    73,728 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ batch_normalization_2          │ (None, 16, 16, 64)  │       256 │
│ (BatchNormalization)           │                     │           │
├────────────────────────────────┼─────────────────────┼───────────┤
│ leaky_re_lu_6 (LeakyReLU)      │ (None, 16, 16, 64)  │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ up_sampling2d_2 (UpSampling2D) │ (None, 32, 32, 64)  │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ conv2d_6 (Conv2D)              │ (None, 32, 32, 1)   │       576 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ batch_normalization_3          │ (None, 32, 32, 1)   │         4 │
│ (BatchNormalization)           │                     │           │
├────────────────────────────────┼─────────────────────┼───────────┤
│ activation (Activation)        │ (None, 32, 32, 1)   │         0 │
├────────────────────────────────┼─────────────────────┼───────────┤
│ cropping2d (Cropping2D)        │ (None, 28, 28, 1)   │         0 │
└────────────────────────────────┴─────────────────────┴───────────┘
Total params: 910,660 (3.47 MB)
Trainable params: 902,082 (3.44 MB)
Non-trainable params: 8,578 (33.51 KB)
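The split between trainable and non-trainable parameters in the generator summary comes from batch normalization. Each BatchNormalization layer keeps four values per feature: a scale and an offset, which are trained, and a moving mean and moving variance, which are updated outside of gradient descent. A small arithmetic sketch, with variable names chosen here for illustration:

```python
# Feature counts of the four BatchNormalization layers in the generator.
bn_features = [4096, 128, 64, 1]
bn_total = sum(4 * f for f in bn_features)
# Moving mean and moving variance are the non-trainable half.
non_trainable = sum(2 * f for f in bn_features)

# Bias-free Dense and 3x3 Conv2D layers (use_bias=False in the generator).
dense_weights = 128 * (4 * 4 * 256)
conv_weights = [3 * 3 * 256 * 128, 3 * 3 * 128 * 64, 3 * 3 * 64 * 1]

total = dense_weights + sum(conv_weights) + bn_total
trainable = total - non_trainable

print(total, trainable, non_trainable)
```

These values line up with the three totals printed under the summary table.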
Now that we have defined our generator and discriminator, it's time to implement the WGAN-GP model. We will also override the train_step for training.
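Before reading the full model, it may help to see the penalty term on a case where the gradient is known in closed form. The sketch below is not the tutorial's method: it assumes a hypothetical linear critic D(x) = sum(w * x), whose gradient with respect to the input is w everywhere, and it uses NumPy instead of `tf.GradientTape`. The function name `gradient_penalty_linear` is made up for this illustration.

```python
import numpy as np


def gradient_penalty_linear(w, real, fake, rng):
    """Gradient penalty for a hypothetical linear critic D(x) = sum(w * x)."""
    batch_size = real.shape[0]
    # One interpolation coefficient per sample, broadcast over H, W, C.
    alpha = rng.uniform(0.0, 1.0, size=(batch_size, 1, 1, 1))
    interpolated = real + alpha * (fake - real)
    # For a linear critic, dD/dx is w at every point, so the interpolated
    # samples only determine the shape of the gradient tensor here.
    grads = np.broadcast_to(w, interpolated.shape)
    # Per-sample L2 norm over the height, width and channel axes.
    norm = np.sqrt(np.sum(np.square(grads), axis=(1, 2, 3)))
    return np.mean((norm - 1.0) ** 2)


rng = np.random.default_rng(0)
real = rng.normal(size=(4, 28, 28, 1))
fake = rng.normal(size=(4, 28, 28, 1))

# 784 entries of 1/28 give an L2 norm of exactly 1.
w_unit = np.full((28, 28, 1), 1.0 / 28.0)
# Scaling by 3 gives a gradient norm of 3.
w_steep = 3.0 * w_unit

gp_unit = gradient_penalty_linear(w_unit, real, fake, rng)
gp_steep = gradient_penalty_linear(w_steep, real, fake, rng)
print(gp_unit, gp_steep)
```

A critic with unit gradient norm pays essentially no penalty, while one three times as steep pays about (3 - 1)^2 = 4 before the weight factor is applied.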
class WGAN(keras.Model):
    def __init__(
        self,
        discriminator,
        generator,
        latent_dim,
        discriminator_extra_steps=3,
        gp_weight=10.0,
    ):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.d_steps = discriminator_extra_steps
        self.gp_weight = gp_weight

    def compile(self, d_optimizer, g_optimizer, d_loss_fn, g_loss_fn):
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.d_loss_fn = d_loss_fn
        self.g_loss_fn = g_loss_fn

    def gradient_penalty(self, batch_size, real_images, fake_images):
        """Calculates the gradient penalty.

        This loss is calculated on an interpolated image
        and added to the discriminator loss.
        """
        # Get the interpolated image
        alpha = tf.random.uniform([batch_size, 1, 1, 1], 0.0, 1.0)
        diff = fake_images - real_images
        interpolated = real_images + alpha * diff

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            # 1. Get the discriminator output for this interpolated image.
            pred = self.discriminator(interpolated, training=True)

        # 2. Calculate the gradients w.r.t. this interpolated image.
        grads = gp_tape.gradient(pred, [interpolated])[0]
        # 3. Calculate the norm of the gradients.
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]

        # Get the batch size
        batch_size = tf.shape(real_images)[0]

        # For each batch, we are going to perform the
        # following steps as laid out in the original paper:
        # 1. Train the discriminator and get the discriminator loss
        # 2. Calculate the gradient penalty
        # 3. Multiply this gradient penalty with a constant weight factor
        # 4. Add the gradient penalty to the discriminator loss
        # 5. Train the generator and get the generator loss
        # 6. Return the generator and discriminator losses as a loss dictionary

        # Train the discriminator first. The original paper recommends training
        # the discriminator for `x` more steps (typically 5) as compared to
        # one step of the generator. Here we will train it for 3 extra steps
        # as compared to 5 to reduce the training time.
        for i in range(self.d_steps):
            # Get the latent vector
            random_latent_vectors = tf.random.normal(
                shape=(batch_size, self.latent_dim)
            )
            with tf.GradientTape() as tape:
                # Generate fake images from the latent vector
                fake_images = self.generator(random_latent_vectors, training=True)
                # Get the logits for the fake images
                fake_logits = self.discriminator(fake_images, training=True)
                # Get the logits for the real images
                real_logits = self.discriminator(real_images, training=True)

                # Calculate the discriminator loss using the fake and real image logits
                d_cost = self.d_loss_fn(real_img=real_logits, fake_img=fake_logits)
                # Calculate the gradient penalty
                gp = self.gradient_penalty(batch_size, real_images, fake_images)
                # Add the gradient penalty to the original discriminator loss
                d_loss = d_cost + gp * self.gp_weight

            # Get the gradients w.r.t. the discriminator loss
            d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
            # Update the weights of the discriminator using the discriminator optimizer
            self.d_optimizer.apply_gradients(
                zip(d_gradient, self.discriminator.trainable_variables)
            )

        # Train the generator
        # Get the latent vector
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        with tf.GradientTape() as tape:
            # Generate fake images using the generator
            generated_images = self.generator(random_latent_vectors, training=True)
            # Get the discriminator logits for fake images
            gen_img_logits = self.discriminator(generated_images, training=True)
            # Calculate the generator loss
            g_loss = self.g_loss_fn(gen_img_logits)

        # Get the gradients w.r.t. the generator loss
        gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables)
        # Update the weights of the generator using the generator optimizer
        self.g_optimizer.apply_gradients(
            zip(gen_gradient, self.generator.trainable_variables)
        )
        return {"d_loss": d_loss, "g_loss": g_loss}

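The `d_loss_fn` and `g_loss_fn` that this class receives follow the Wasserstein sign convention: the critic tries to score real images higher than fake ones, and the generator tries to raise the scores of its fakes. The plain-Python sketch below illustrates the arithmetic on made-up critic scores; the score lists and the penalty value 0.25 are invented for illustration, and `critic_loss` is a hypothetical stand-in for the discriminator loss.

```python
def mean(values):
    return sum(values) / len(values)


def critic_loss(real_scores, fake_scores):
    """Wasserstein critic loss: lower when real scores exceed fake scores."""
    return mean(fake_scores) - mean(real_scores)


def generator_loss(fake_scores):
    """Generator loss: lower when the critic scores fakes highly."""
    return -mean(fake_scores)


# Made-up critic outputs for a batch of three real and three fake images.
real_scores = [2.0, 3.0, 4.0]
fake_scores = [-2.0, -1.0, 0.0]

d_cost = critic_loss(real_scores, fake_scores)
g_loss = generator_loss(fake_scores)

# Total critic loss with an illustrative gradient penalty of 0.25
# and the default weight gp_weight = 10.0.
d_loss = d_cost + 10.0 * 0.25
print(d_cost, g_loss, d_loss)
```

Both losses are unbounded scores rather than probabilities, which is why negative values are expected during training.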
class GANMonitor(keras.callbacks.Callback):
    def __init__(self, num_img=6, latent_dim=128):
        self.num_img = num_img
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs=None):
        random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
        generated_images = self.model.generator(random_latent_vectors)
        generated_images = (generated_images * 127.5) + 127.5

        for i in range(self.num_img):
            img = generated_images[i].numpy()
            img = keras.utils.array_to_img(img)
            img.save("generated_img_{i}_{epoch}.png".format(i=i, epoch=epoch))

# Instantiate the optimizer for both networks
# (learning_rate=0.0002, beta_1=0.5 are recommended)
generator_optimizer = keras.optimizers.Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)
discriminator_optimizer = keras.optimizers.Adam(
    learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)


# Define the loss functions for the discriminator,
# which should be (fake_loss - real_loss).
# We will add the gradient penalty later to this loss function.
def discriminator_loss(real_img, fake_img):
    real_loss = tf.reduce_mean(real_img)
    fake_loss = tf.reduce_mean(fake_img)
    return fake_loss - real_loss


# Define the loss functions for the generator.
def generator_loss(fake_img):
    return -tf.reduce_mean(fake_img)

# Set the number of epochs for training.
epochs = 20

# Instantiate the custom `GANMonitor` Keras callback.
cbk = GANMonitor(num_img=3, latent_dim=noise_dim)

# Get the wgan model
wgan = WGAN(
    discriminator=d_model,
    generator=g_model,
    latent_dim=noise_dim,
    discriminator_extra_steps=3,
)

# Compile the wgan model
wgan.compile(
    d_optimizer=discriminator_optimizer,
    g_optimizer=generator_optimizer,
    g_loss_fn=generator_loss,
    d_loss_fn=discriminator_loss,
)

# Start training
wgan.fit(train_images, batch_size=BATCH_SIZE, epochs=epochs, callbacks=[cbk])
Epoch 1/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 79s 345ms/step - d_loss: -7.7597 - g_loss: -17.2858 - loss: 0.0000e+00
Epoch 2/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 118ms/step - d_loss: -7.0841 - g_loss: -13.8542 - loss: 0.0000e+00
Epoch 3/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 118ms/step - d_loss: -6.1011 - g_loss: -13.2763 - loss: 0.0000e+00
Epoch 4/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 119ms/step - d_loss: -5.5292 - g_loss: -13.3122 - loss: 0.0000e+00
Epoch 5/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 119ms/step - d_loss: -5.1012 - g_loss: -12.1395 - loss: 0.0000e+00
Epoch 6/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 119ms/step - d_loss: -4.7557 - g_loss: -11.2559 - loss: 0.0000e+00
Epoch 7/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 119ms/step - d_loss: -4.4727 - g_loss: -10.3075 - loss: 0.0000e+00
Epoch 8/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 119ms/step - d_loss: -4.2056 - g_loss: -10.0340 - loss: 0.0000e+00
Epoch 9/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -4.0116 - g_loss: -9.9283 - loss: 0.0000e+00
Epoch 10/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -3.8050 - g_loss: -9.7392 - loss: 0.0000e+00
Epoch 11/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -3.6608 - g_loss: -9.4686 - loss: 0.0000e+00
Epoch 12/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 121ms/step - d_loss: -3.4623 - g_loss: -8.9601 - loss: 0.0000e+00
Epoch 13/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -3.3659 - g_loss: -8.4620 - loss: 0.0000e+00
Epoch 14/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -3.2486 - g_loss: -7.9598 - loss: 0.0000e+00
Epoch 15/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -3.1436 - g_loss: -7.5392 - loss: 0.0000e+00
Epoch 16/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -3.0370 - g_loss: -7.3694 - loss: 0.0000e+00
Epoch 17/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -2.9256 - g_loss: -7.6105 - loss: 0.0000e+00
Epoch 18/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -2.8976 - g_loss: -6.5240 - loss: 0.0000e+00
Epoch 19/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -2.7944 - g_loss: -6.6281 - loss: 0.0000e+00
Epoch 20/20
118/118 ━━━━━━━━━━━━━━━━━━━━ 14s 120ms/step - d_loss: -2.7175 - g_loss: -6.5900 - loss: 0.0000e+00
<keras.src.callbacks.history.History at 0x7fc763a8e950>
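The "118/118" in the log above is the number of batches per epoch. A quick check in plain Python, using the dataset size, batch size, and number of critic steps from this example (the variable names are chosen here for illustration):

```python
import math

num_examples = 60000  # size of the Fashion-MNIST training set
batch_size = 512      # BATCH_SIZE in this example
critic_steps = 3      # discriminator_extra_steps in this example

# The last batch is smaller, so the step count is rounded up.
steps_per_epoch = math.ceil(num_examples / batch_size)
last_batch_size = num_examples - (steps_per_epoch - 1) * batch_size

# Each train_step runs several critic updates and one generator update.
critic_updates_per_epoch = steps_per_epoch * critic_steps
generator_updates_per_epoch = steps_per_epoch

print(steps_per_epoch, last_batch_size, critic_updates_per_epoch)
```

So each epoch performs three times as many critic updates as generator updates, and the final batch of every epoch holds fewer images than the rest.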
Display the last generated images:
from IPython.display import Image, display
display(Image("generated_img_0_19.png"))
display(Image("generated_img_1_19.png"))
display(Image("generated_img_2_19.png"))