KerasRS / 示例 / 列表式排序

列表式排序

作者: Abheesht Sharma, Fabien Hertschuh
创建日期 2025/04/28
最后修改时间 2025/04/28
描述: 使用成对损失而非逐点损失对电影进行排序。

在 Colab 中查看 GitHub 源代码


介绍

在我们的基础排序教程中,我们探索了一个能够预测特定用户-电影组合评分的模型。该模型接受(用户,电影)对作为输入,并使用均方误差进行训练,以精确预测用户可能给一部电影的评分。

然而,仅仅优化模型预测单个电影分数的准确性并非总是开发排序系统最有效的策略。对于排序模型而言,精确预测分数的准确性不如模型生成与用户偏好相符的有序物品列表的能力重要。本质上,物品的相对顺序比精确的预测值更重要。

与其关注模型对单个查询-物品对的预测(逐点方法),我们可以基于其正确排序物品的能力来优化模型。一种常见的方法是成对排序。在这种方法中,模型通过比较物品对(例如,物品 A 和物品 B)来学习,并确定对于给定用户或查询哪个物品应该排名更高。目标是最小化错误排序的物品对数量。

首先导入所有必需的库。

!pip install -q keras-rs
import os

os.environ["KERAS_BACKEND"] = "jax"  # `"tensorflow"`/`"torch"`

import collections

import keras
import numpy as np
import tensorflow as tf  # Needed only for the dataset
import tensorflow_datasets as tfds
from keras import ops

import keras_rs

在这里定义一些超参数。

# Data args
TRAIN_NUM_LIST_PER_USER = 50
TEST_NUM_LIST_PER_USER = 1
NUM_EXAMPLES_PER_LIST = 5

# Model args
EMBEDDING_DIM = 32

# Train args
BATCH_SIZE = 1024
EPOCHS = 5
LEARNING_RATE = 0.1

准备数据集

我们使用 MovieLens 数据集。数据加载和处理步骤与之前的教程类似,因此我们只在这里讨论不同之处。

# Ratings data.
ratings = tfds.load("movielens/100k-ratings", split="train")
# Features of all the available movies.
movies = tfds.load("movielens/100k-movies", split="train")

users_count = (
    ratings.map(lambda x: tf.strings.to_number(x["user_id"], out_type=tf.int32))
    .reduce(tf.constant(0, tf.int32), tf.maximum)
    .numpy()
)
movies_count = movies.cardinality().numpy()


def preprocess_rating(x):
    return {
        "user_id": tf.strings.to_number(x["user_id"], out_type=tf.int32),
        "movie_id": tf.strings.to_number(x["movie_id"], out_type=tf.int32),
        # Normalise ratings between 0 and 1.
        "user_rating": (x["user_rating"] - 1.0) / 4.0,
    }


shuffled_ratings = ratings.map(preprocess_rating).shuffle(
    100_000, seed=42, reshuffle_each_iteration=False
)
train_ratings = shuffled_ratings.take(70_000)
val_ratings = shuffled_ratings.skip(70_000).take(15_000)
test_ratings = shuffled_ratings.skip(85_000).take(15_000)

到目前为止,我们已经复制了基础排序教程中的内容。

然而,现有的数据集不能直接应用于列表式优化。列表式优化要求针对每个用户,提供他们评分过的电影列表,以便模型可以从该列表中的相对排序中学习。MovieLens 100K 数据集原始形式提供了单独的评分实例(每个示例对应一个用户、一部电影和一个评分),而不是这些聚合的用户特定列表。

为了实现列表式优化,我们需要重构数据集。这包括对其进行转换,使每个数据点或示例代表一个用户 ID,并附带该用户评分过的电影列表。在这些列表中,有些电影会自然地被用户排名更高(通过他们的评分证明),高于其他电影。我们模型的主要目标将是学习预测与这些观察到的用户偏好相对应的物品顺序。

首先获取每个用户的完整电影列表和相应的评分。我们移除那些评分电影数量少于 NUM_EXAMPLES_PER_LIST 的用户对应的 user_ids

def get_movie_sequence_per_user(ratings, min_examples_per_list):
    """Gets movieID sequences and ratings for every user."""
    sequences = collections.defaultdict(list)

    for sample in ratings:
        user_id = sample["user_id"]
        movie_id = sample["movie_id"]
        user_rating = sample["user_rating"]

        sequences[int(user_id.numpy())].append(
            {
                "movie_id": int(movie_id.numpy()),
                "user_rating": float(user_rating.numpy()),
            }
        )

    # Remove lists with < `min_examples_per_list` number of elements.
    sequences = {
        user_id: sequence
        for user_id, sequence in sequences.items()
        if len(sequence) >= min_examples_per_list
    }

    return sequences

现在,我们为训练数据中的每个用户采样 50 个列表。对于每个列表,我们从用户评分过的电影中随机采样 5 部电影。

def sample_sublist_from_list(
    lst,
    num_examples_per_list,
):
    """Random selects `num_examples_per_list` number of elements from list."""

    indices = np.random.choice(
        range(len(lst)),
        size=num_examples_per_list,
        replace=False,
    )

    samples = [lst[i] for i in indices]
    return samples


def get_examples(
    sequences,
    num_list_per_user,
    num_examples_per_list,
):
    inputs = {
        "user_id": [],
        "movie_id": [],
    }
    labels = []
    for user_id, user_list in sequences.items():
        for _ in range(num_list_per_user):
            sampled_list = sample_sublist_from_list(
                user_list,
                num_examples_per_list,
            )

            inputs["user_id"].append(user_id)
            inputs["movie_id"].append(
                tf.convert_to_tensor([f["movie_id"] for f in sampled_list])
            )
            labels.append(
                tf.convert_to_tensor([f["user_rating"] for f in sampled_list])
            )

    return (
        {"user_id": inputs["user_id"], "movie_id": inputs["movie_id"]},
        labels,
    )


train_sequences = get_movie_sequence_per_user(
    ratings=train_ratings, min_examples_per_list=NUM_EXAMPLES_PER_LIST
)
train_examples = get_examples(
    train_sequences,
    num_list_per_user=TRAIN_NUM_LIST_PER_USER,
    num_examples_per_list=NUM_EXAMPLES_PER_LIST,
)
train_ds = tf.data.Dataset.from_tensor_slices(train_examples)

val_sequences = get_movie_sequence_per_user(
    ratings=val_ratings, min_examples_per_list=5
)
val_examples = get_examples(
    val_sequences,
    num_list_per_user=TEST_NUM_LIST_PER_USER,
    num_examples_per_list=NUM_EXAMPLES_PER_LIST,
)
val_ds = tf.data.Dataset.from_tensor_slices(val_examples)

test_sequences = get_movie_sequence_per_user(
    ratings=test_ratings, min_examples_per_list=5
)
test_examples = get_examples(
    test_sequences,
    num_list_per_user=TEST_NUM_LIST_PER_USER,
    num_examples_per_list=NUM_EXAMPLES_PER_LIST,
)
test_ds = tf.data.Dataset.from_tensor_slices(test_examples)

对数据集进行分批处理并缓存。

train_ds = train_ds.batch(BATCH_SIZE).cache()
val_ds = val_ds.batch(BATCH_SIZE).cache()
test_ds = test_ds.batch(BATCH_SIZE).cache()

构建模型

我们构建一个典型的双塔排序模型,类似于基础排序教程。我们为用户 ID 和电影 ID 分别设置了嵌入层。获得这些嵌入后,我们将它们连接起来,并通过一个密集层网络。

唯一的不同点在于,对于电影 ID,我们采用 ID 列表而非单个电影 ID。因此,当我们将用户 ID 嵌入和电影 ID 嵌入连接时,我们会将用户 ID“重复”'NUM_EXAMPLES_PER_LIST' 次,以便与电影 ID 嵌入具有相同的形状。

class RankingModel(keras.Model):
    """Create the ranking model with the provided parameters.

    Args:
      num_users: Number of entries in the user embedding table.
      num_candidates: Number of entries in the candidate embedding table.
      embedding_dimension: Output dimension for user and movie embedding tables.
    """

    def __init__(
        self,
        num_users,
        num_candidates,
        embedding_dimension=32,
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Embedding table for users.
        self.user_embedding = keras.layers.Embedding(num_users, embedding_dimension)
        # Embedding table for candidates.
        self.candidate_embedding = keras.layers.Embedding(
            num_candidates, embedding_dimension
        )
        # Predictions.
        self.ratings = keras.Sequential(
            [
                # Learn multiple dense layers.
                keras.layers.Dense(256, activation="relu"),
                keras.layers.Dense(64, activation="relu"),
                # Make rating predictions in the final layer.
                keras.layers.Dense(1),
            ]
        )

    def build(self, input_shape):
        self.user_embedding.build(input_shape["user_id"])
        self.candidate_embedding.build(input_shape["movie_id"])

        output_shape = self.candidate_embedding.compute_output_shape(
            input_shape["movie_id"]
        )

        self.ratings.build(list(output_shape[:-1]) + [2 * output_shape[-1]])

    def call(self, inputs):
        user_id, movie_id = inputs["user_id"], inputs["movie_id"]
        user_embeddings = self.user_embedding(user_id)
        candidate_embeddings = self.candidate_embedding(movie_id)

        list_length = ops.shape(movie_id)[-1]
        user_embeddings_repeated = ops.repeat(
            ops.expand_dims(user_embeddings, axis=1),
            repeats=list_length,
            axis=1,
        )
        concatenated_embeddings = ops.concatenate(
            [user_embeddings_repeated, candidate_embeddings], axis=-1
        )

        scores = self.ratings(concatenated_embeddings)
        scores = ops.squeeze(scores, axis=-1)

        return scores

    def compute_output_shape(self, input_shape):
        return (input_shape[0], input_shape[1])

让我们实例化、编译和训练我们的模型。我们将训练两个模型:一个使用普通的均方误差,另一个使用成对合页损失。对于后者,我们将使用keras_rs.losses.PairwiseHingeLoss

成对损失比较列表内物品对,惩罚真实标签较高的物品预测分数低于真实标签较低的物品的情况。这就是为什么它们比逐点损失更适合排序任务的原因。

为了量化这些结果,我们计算 nDCG。nDCG 是衡量排序质量的一种指标,它评估系统根据相关性对物品进行排序的优劣程度,更重视高度相关的物品出现在列表顶部,并根据理想排序对分数进行归一化。要计算它,我们只需将 keras_rs.metrics.NDCG() 作为指标传递给 model.compile

model_mse = RankingModel(
    num_users=users_count + 1,
    num_candidates=movies_count + 1,
    embedding_dimension=EMBEDDING_DIM,
)
model_mse.compile(
    loss=keras.losses.MeanSquaredError(),
    metrics=[keras_rs.metrics.NDCG(k=NUM_EXAMPLES_PER_LIST, name="ndcg")],
    optimizer=keras.optimizers.Adagrad(learning_rate=LEARNING_RATE),
)
model_mse.fit(train_ds, validation_data=val_ds, epochs=EPOCHS)
Epoch 1/5
47/47 ━━━━━━━━━━━━━━━━━━━━ 6s 82ms/step - loss: 0.1209 - ndcg: 0.8871 - val_loss: 0.0782 - val_ndcg: 0.8943
Epoch 2/5
47/47 ━━━━━━━━━━━━━━━━━━━━ 1s 3ms/step - loss: 0.0780 - ndcg: 0.8928 - val_loss: 0.0776 - val_ndcg: 0.9020
Epoch 3/5
47/47 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.0774 - ndcg: 0.8975 - val_loss: 0.0770 - val_ndcg: 0.9052
Epoch 4/5
47/47 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.0768 - ndcg: 0.9008 - val_loss: 0.0765 - val_ndcg: 0.9089
Epoch 5/5
47/47 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 0.0762 - ndcg: 0.9043 - val_loss: 0.0756 - val_ndcg: 0.9121

<keras.src.callbacks.history.History at 0x7f0bd21e8df0>

现在,使用成对合页损失的模型。

model_hinge = RankingModel(
    num_users=users_count + 1,
    num_candidates=movies_count + 1,
    embedding_dimension=EMBEDDING_DIM,
)
model_hinge.compile(
    loss=keras_rs.losses.PairwiseHingeLoss(),
    metrics=[keras_rs.metrics.NDCG(k=NUM_EXAMPLES_PER_LIST, name="ndcg")],
    optimizer=keras.optimizers.Adagrad(learning_rate=LEARNING_RATE),
)
model_hinge.fit(train_ds, validation_data=val_ds, epochs=EPOCHS)
Epoch 1/5
47/47 ━━━━━━━━━━━━━━━━━━━━ 5s 76ms/step - loss: 1.3971 - ndcg: 0.8887 - val_loss: 1.3608 - val_ndcg: 0.8992
Epoch 2/5
47/47 ━━━━━━━━━━━━━━━━━━━━ 1s 3ms/step - loss: 1.3866 - ndcg: 0.9021 - val_loss: 1.2919 - val_ndcg: 0.9147
Epoch 3/5
47/47 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 1.2492 - ndcg: 0.9162 - val_loss: 1.1026 - val_ndcg: 0.9272
Epoch 4/5
47/47 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 1.0838 - ndcg: 0.9267 - val_loss: 1.0412 - val_ndcg: 0.9298
Epoch 5/5
47/47 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - loss: 1.0409 - ndcg: 0.9298 - val_loss: 1.0267 - val_ndcg: 0.9303

<keras.src.callbacks.history.History at 0x7f0b8c238490>

评估

比较验证集 nDCG 值,显然使用成对合页损失训练的模型优于另一个。让我们通过比较测试集上的结果来具体说明这一观察。

ndcg_mse = model_mse.evaluate(test_ds, return_dict=True)["ndcg"]
ndcg_hinge = model_hinge.evaluate(test_ds, return_dict=True)["ndcg"]
print(ndcg_mse, ndcg_hinge)
1/1 ━━━━━━━━━━━━━━━━━━━━ 1s 898ms/step - loss: 1.0489 - ndcg: 0.9348
0.9109011292457581 0.9348157048225403

预测

现在,让我们对一些列表进行排序!

让我们创建一个从电影 ID 到标题的映射,以便我们可以显示排序列表的标题。

movie_id_to_movie_title = {
    int(x["movie_id"]): x["movie_title"] for x in movies.as_numpy_iterator()
}
movie_id_to_movie_title[0] = ""  # Because id 0 is not in the dataset.

user_id = 42
movie_ids = [409, 237, 131, 941, 543]
predictions = model_hinge.predict(
    {
        "user_id": keras.ops.array([user_id]),
        "movie_id": keras.ops.array([movie_ids]),
    }
)
predictions = keras.ops.convert_to_numpy(keras.ops.squeeze(predictions, axis=0))
sorted_indices = np.argsort(predictions)
sorted_movies = [movie_ids[i] for i in sorted_indices]

for i, movie_id in enumerate(sorted_movies):
    print(f"{i + 1}. ", movie_id_to_movie_title[movie_id])
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 168ms/step
1.  b'With Honors (1994)'
2.  b'Mis\xc3\xa9rables, Les (1995)'
3.  b'Jack (1996)'
4.  b"Breakfast at Tiffany's (1961)"
5.  b'Jerry Maguire (1996)'

我们完成了!