代码示例 / 计算机视觉 / 使用三元组损失的 Siamese 网络进行图像相似度估计

使用三元组损失的 Siamese 网络进行图像相似度估计

作者: Hazem EssamSantiago L. Valdarrama
创建日期 2021/03/25
上次修改日期 2021/03/25
描述: 训练 Siamese 网络以使用三元组损失函数比较图像的相似度。

ⓘ 此示例使用 Keras 3

在 Colab 中查看 GitHub 源代码


简介

Siamese 网络是一种网络架构,包含两个或多个相同的子网络,用于为每个输入生成特征向量并进行比较。

Siamese 网络可以应用于不同的用例,例如检测重复项、查找异常和人脸识别。

此示例使用具有三个相同子网络的 Siamese 网络。我们将向模型提供三个图像,其中两个图像相似(锚点正例样本),第三个图像不相关(负例)。我们的目标是让模型学习估计图像之间的相似度。

为了让网络学习,我们使用三元组损失函数。您可以在 Schroff 等人 2015 年的 FaceNet 论文中找到三元组损失的介绍。在此示例中,我们将三元组损失函数定义如下

L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)

此示例使用 Totally Looks Like 数据集,由 Rosenfeld 等人,2018 提供。


设置

import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from keras import applications
from keras import layers
from keras import losses
from keras import ops
from keras import optimizers
from keras import metrics
from keras import Model
from keras.applications import resnet


target_shape = (200, 200)

加载数据集

我们将加载 Totally Looks Like 数据集,并在本地环境的 ~/.keras 目录中解压缩它。

该数据集包含两个单独的文件

  • left.zip 包含我们将用作锚点的图像。
  • right.zip 包含我们将用作正例样本的图像(与锚点相似的图像)。
cache_dir = Path(Path.home()) / ".keras"
anchor_images_path = cache_dir / "left"
positive_images_path = cache_dir / "right"
!gdown --id 1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34
!gdown --id 1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW
!unzip -oq left.zip -d $cache_dir
!unzip -oq right.zip -d $cache_dir
Downloading...
From (uriginal): https://drive.google.com/uc?id=1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34
From (redirected): https://drive.google.com/uc?id=1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34&confirm=t&uuid=be98abe4-8be7-4c5f-a8f9-ca95d178fbda
To: /home/scottzhu/keras-io/scripts/tmp_9629511/left.zip
100%|█████████████████████████████████████████| 104M/104M [00:00<00:00, 278MB/s]
/home/scottzhu/.local/lib/python3.10/site-packages/gdown/cli.py:126: FutureWarning: Option `--id` was deprecated in version 4.3.1 and will be removed in 5.0. You don't need to pass it anymore to use a file ID.
Downloading...
From (uriginal): https://drive.google.com/uc?id=1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW
From (redirected): https://drive.google.com/uc?id=1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW&confirm=t&uuid=0eb1b2e2-beee-462a-a9b8-c0bf21bea257
To: /home/scottzhu/keras-io/scripts/tmp_9629511/right.zip
100%|█████████████████████████████████████████| 104M/104M [00:00<00:00, 285MB/s]

准备数据

我们将使用 tf.data 管道来加载数据并生成训练 Siamese 网络所需的三元组。

我们将使用一个压缩列表设置管道,其中包含锚点、正例和负例文件名作为源。管道将加载并预处理相应的图像。

def preprocess_image(filename):
    """
    Load the specified file as a JPEG image, preprocess it and
    resize it to the target shape.
    """

    image_string = tf.io.read_file(filename)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, target_shape)
    return image


def preprocess_triplets(anchor, positive, negative):
    """
    Given the filenames corresponding to the three images, load and
    preprocess them.
    """

    return (
        preprocess_image(anchor),
        preprocess_image(positive),
        preprocess_image(negative),
    )

让我们使用一个压缩列表设置我们的数据管道,其中包含锚点、正例和负例图像文件名作为源。管道的输出包含相同的三元组,其中每个图像都已加载并预处理。

# We need to make sure both the anchor and positive images are loaded in
# sorted order so we can match them together.
anchor_images = sorted(
    [str(anchor_images_path / f) for f in os.listdir(anchor_images_path)]
)

positive_images = sorted(
    [str(positive_images_path / f) for f in os.listdir(positive_images_path)]
)

image_count = len(anchor_images)

anchor_dataset = tf.data.Dataset.from_tensor_slices(anchor_images)
positive_dataset = tf.data.Dataset.from_tensor_slices(positive_images)

# To generate the list of negative images, let's randomize the list of
# available images and concatenate them together.
rng = np.random.RandomState(seed=42)
rng.shuffle(anchor_images)
rng.shuffle(positive_images)

negative_images = anchor_images + positive_images
np.random.RandomState(seed=32).shuffle(negative_images)

negative_dataset = tf.data.Dataset.from_tensor_slices(negative_images)
negative_dataset = negative_dataset.shuffle(buffer_size=4096)

dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.map(preprocess_triplets)

# Let's now split our dataset in train and validation.
train_dataset = dataset.take(round(image_count * 0.8))
val_dataset = dataset.skip(round(image_count * 0.8))

train_dataset = train_dataset.batch(32, drop_remainder=False)
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)

val_dataset = val_dataset.batch(32, drop_remainder=False)
val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)

让我们看一下一些三元组示例。请注意,前两个图像看起来相似,而第三个图像始终不同。

def visualize(anchor, positive, negative):
    """Visualize a few triplets from the supplied batches."""

    def show(ax, image):
        ax.imshow(image)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    fig = plt.figure(figsize=(9, 9))

    axs = fig.subplots(3, 3)
    for i in range(3):
        show(axs[i, 0], anchor[i])
        show(axs[i, 1], positive[i])
        show(axs[i, 2], negative[i])


visualize(*list(train_dataset.take(1).as_numpy_iterator())[0])

png


设置嵌入生成器模型

我们的 Siamese 网络将为三元组的每个图像生成嵌入。为此,我们将使用在 ImageNet 上预训练的 ResNet50 模型,并将其连接到一些 Dense 层,以便我们学习分离这些嵌入。

我们将冻结模型所有层(直到 conv5_block1_out 层)的权重。这对于避免影响模型已经学习的权重非常重要。我们将保留底部的几个层可训练,以便我们可以在训练期间微调它们的权重。

base_cnn = resnet.ResNet50(
    weights="imagenet", input_shape=target_shape + (3,), include_top=False
)

flatten = layers.Flatten()(base_cnn.output)
dense1 = layers.Dense(512, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(dense1)
dense2 = layers.Dense(256, activation="relu")(dense1)
dense2 = layers.BatchNormalization()(dense2)
output = layers.Dense(256)(dense2)

embedding = Model(base_cnn.input, output, name="Embedding")

trainable = False
for layer in base_cnn.layers:
    if layer.name == "conv5_block1_out":
        trainable = True
    layer.trainable = trainable

设置 Siamese 网络模型

Siamese 网络将接收每个三元组图像作为输入,生成嵌入,并输出锚点和正例嵌入之间的距离,以及锚点和负例嵌入之间的距离。

为了计算距离,我们可以使用自定义层 DistanceLayer,它将两个值作为元组返回。

class DistanceLayer(layers.Layer):
    """
    This layer is responsible for computing the distance between the anchor
    embedding and the positive embedding, and the anchor embedding and the
    negative embedding.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = ops.sum(tf.square(anchor - positive), -1)
        an_distance = ops.sum(tf.square(anchor - negative), -1)
        return (ap_distance, an_distance)


anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
positive_input = layers.Input(name="positive", shape=target_shape + (3,))
negative_input = layers.Input(name="negative", shape=target_shape + (3,))

distances = DistanceLayer()(
    embedding(resnet.preprocess_input(anchor_input)),
    embedding(resnet.preprocess_input(positive_input)),
    embedding(resnet.preprocess_input(negative_input)),
)

siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

将所有内容放在一起

我们现在需要实现一个具有自定义训练循环的模型,以便我们可以使用 Siamese 网络生成的三个嵌入来计算三元组损失。

让我们创建一个 Mean 指标实例来跟踪训练过程的损失。

class SiameseModel(Model):
    """The Siamese Network model with a custom training and testing loops.

    Computes the triplet loss using the three embeddings produced by the
    Siamese Network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=0.5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):
        # GradientTape is a context manager that records every operation that
        # you do inside. We are using it here to compute the loss so we can get
        # the gradients and apply them using the optimizer specified in
        # `compile()`.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        # Storing the gradients of the loss function with respect to the
        # weights/parameters.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        # Applying the gradients on the model using the specified optimizer
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        # Let's update and return the training loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)

        # Let's update and return the loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        # The output of the network is a tuple containing the distances
        # between the anchor and the positive example, and the anchor and
        # the negative example.
        ap_distance, an_distance = self.siamese_network(data)

        # Computing the Triplet Loss by subtracting both distances and
        # making sure we don't get a negative value.
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        # We need to list our metrics here so the `reset_states()` can be
        # called automatically.
        return [self.loss_tracker]

训练

我们现在准备好训练我们的模型了。

siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(0.0001))
siamese_model.fit(train_dataset, epochs=10, validation_data=val_dataset)
Epoch 1/10
   1/151 ━━━━━━━━━━━━━━━━━━━━  1:21:32 33s/step - loss: 1.5020

WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1699919378.193493    9680 device_compiler.h:187] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.

 151/151 ━━━━━━━━━━━━━━━━━━━━ 80s 317ms/step - loss: 0.7004 - val_loss: 0.3704
Epoch 2/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 136ms/step - loss: 0.3749 - val_loss: 0.3609
Epoch 3/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 21s 140ms/step - loss: 0.3548 - val_loss: 0.3399
Epoch 4/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 135ms/step - loss: 0.3432 - val_loss: 0.3533
Epoch 5/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.3299 - val_loss: 0.3522
Epoch 6/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 135ms/step - loss: 0.3263 - val_loss: 0.3177
Epoch 7/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.3032 - val_loss: 0.3308
Epoch 8/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.2944 - val_loss: 0.3282
Epoch 9/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 135ms/step - loss: 0.2893 - val_loss: 0.3046
Epoch 10/10
 151/151 ━━━━━━━━━━━━━━━━━━━━ 20s 134ms/step - loss: 0.2679 - val_loss: 0.2841

<keras.src.callbacks.history.History at 0x7f6945c08820>

检查网络学到了什么

在这一点上,我们可以检查网络如何学习根据嵌入是否属于相似图像来分离它们。

我们可以使用余弦相似度来衡量嵌入之间的相似性。

让我们从数据集中选择一个样本来检查为每个图像生成的嵌入之间的相似性。

sample = next(iter(train_dataset))
visualize(*sample)

anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
    embedding(resnet.preprocess_input(anchor)),
    embedding(resnet.preprocess_input(positive)),
    embedding(resnet.preprocess_input(negative)),
)

png

最后,我们可以计算锚点图像和正样本图像之间的余弦相似度,并将其与锚点图像和负样本图像之间的相似度进行比较。

我们应该期望锚点图像和正样本图像之间的相似度大于锚点图像和负样本图像之间的相似度。

cosine_similarity = metrics.CosineSimilarity()

positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print("Positive similarity:", positive_similarity.numpy())

negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print("Negative similarity", negative_similarity.numpy())
Positive similarity: 0.99608964
Negative similarity 0.9941576

总结

  1. tf.data API 使您能够为您的模型构建高效的输入管道。如果您有一个大型数据集,这将特别有用。您可以在tf.data:构建 TensorFlow 输入管道中了解有关tf.data管道的更多信息。

  2. 在此示例中,我们使用预训练的 ResNet50 作为生成特征嵌入的子网络的一部分。通过使用迁移学习,我们可以显著减少训练时间和数据集的大小。

  3. 请注意我们如何微调 ResNet50 网络最后一层的权重,但保持其余层不变。使用分配给每一层的名称,我们可以将权重冻结到某个点,并保持最后几层开放。

  4. 我们可以通过创建一个继承自tf.keras.layers.Layer的类来创建自定义层,就像我们在 DistanceLayer 类中所做的那样。

  5. 我们使用余弦相似度指标来衡量 2 个输出嵌入彼此之间的相似程度。

  6. 您可以通过重写 train_step() 方法来实现自定义训练循环。train_step() 使用 tf.GradientTape,它记录您在其中执行的每个操作。在本示例中,我们使用它来访问传递给优化器的梯度,以便在每个步骤更新模型权重。有关更多详细信息,请查看 Keras 研究人员入门从头开始编写训练循环