
Sequential retrieval [GRU4Rec]

Authors: Abheesht Sharma, Fabien Hertschuh
Date created: 2025/04/28
Last modified: 2025/04/28
Description: Recommend movies using a GRU-based sequential retrieval model.

View in Colab • GitHub source


Introduction

In this example, we are going to build a sequential retrieval model. Sequential recommendation is a popular model that looks at a sequence of items the user has interacted with previously, and then predicts the next item. Here, the order of the items within each sequence matters, so we use a recurrent neural network to model the sequential relationship. For more details, please refer to the GRU4Rec paper.

Let's begin by choosing JAX as the backend we want to run on, and importing all the necessary libraries.

!pip install -q keras-rs
import os

os.environ["KERAS_BACKEND"] = "jax"  # `"tensorflow"`/`"torch"`

import collections
import os
import random

import keras
import pandas as pd
import tensorflow as tf  # Needed only for the dataset

import keras_rs

Let's also define all the important variables/hyperparameters below.

DATA_DIR = "./raw/data/"

# MovieLens-specific variables
MOVIELENS_1M_URL = "https://files.grouplens.org/datasets/movielens/ml-1m.zip"
MOVIELENS_ZIP_HASH = "a6898adb50b9ca05aa231689da44c217cb524e7ebd39d264c56e2832f2c54e20"

RATINGS_FILE_NAME = "ratings.dat"
MOVIES_FILE_NAME = "movies.dat"

# Data processing args
MAX_CONTEXT_LENGTH = 10
MIN_SEQUENCE_LENGTH = 3

RATINGS_DATA_COLUMNS = ["UserID", "MovieID", "Rating", "Timestamp"]
MOVIES_DATA_COLUMNS = ["MovieID", "Title", "Genres"]
MIN_RATING = 2

# Training/model args
BATCH_SIZE = 4096
TEST_BATCH_SIZE = 2048
EMBEDDING_DIM = 32
NUM_EPOCHS = 5
LEARNING_RATE = 0.005

Dataset

Next, we need to prepare our dataset. As in the basic retrieval example, we will use the MovieLens dataset.

The dataset preparation step is fairly involved. The raw ratings dataset contains (user, movie ID, rating, timestamp) tuples (among other columns, which are not important here). Since we are dealing with sequential retrieval, we need to create a sequence of movies for every user, with the sequence ordered by timestamp.
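For orientation, each line of ratings.dat is a `::`-separated record whose fields match RATINGS_DATA_COLUMNS. A minimal sketch of parsing one such record by hand (the sample line is illustrative):

# Illustrative ratings.dat record: UserID::MovieID::Rating::Timestamp
sample_line = "1::1193::5::978300760"
user_id, movie_id, rating, timestamp = sample_line.split("::")
print(user_id, movie_id, rating, timestamp)

This is also why `pd.read_csv` is called with `sep="::"` below (a multi-character separator, which pandas handles with its Python parser).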

Let's start by downloading and reading the dataset.

# Download the MovieLens dataset.
if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR)

path_to_zip = keras.utils.get_file(
    fname="ml-1m.zip",
    origin=MOVIELENS_1M_URL,
    file_hash=MOVIELENS_ZIP_HASH,
    hash_algorithm="sha256",
    extract=True,
    cache_dir=DATA_DIR,
)
movielens_extracted_dir = os.path.join(
    os.path.dirname(path_to_zip),
    "ml-1m_extracted",
    "ml-1m",
)


# Read the dataset.
def read_data(data_directory, min_rating=None):
    """Read movielens ratings.dat and movies.dat file
    into dataframe.
    """

    ratings_df = pd.read_csv(
        os.path.join(data_directory, RATINGS_FILE_NAME),
        sep="::",
        names=RATINGS_DATA_COLUMNS,
        encoding="unicode_escape",
    )
    ratings_df["Timestamp"] = ratings_df["Timestamp"].apply(int)

    # Remove movies with `rating < min_rating`.
    if min_rating is not None:
        ratings_df = ratings_df[ratings_df["Rating"] >= min_rating]

    movies_df = pd.read_csv(
        os.path.join(data_directory, MOVIES_FILE_NAME),
        sep="::",
        names=MOVIES_DATA_COLUMNS,
        encoding="unicode_escape",
    )
    return ratings_df, movies_df


ratings_df, movies_df = read_data(
    data_directory=movielens_extracted_dir, min_rating=MIN_RATING
)

# Need to know #movies so as to define embedding layers.
movies_count = movies_df["MovieID"].max()
Downloading data from https://files.grouplens.org/datasets/movielens/ml-1m.zip

5917549/5917549 ━━━━━━━━━━━━━━━━━━━━ 2s 0us/step

<ipython-input-4-6fc962858754>:26: ParserWarning: Falling back to the 'python' engine because the 'c' engine does not support regex separators (separators > 1 char and different from '\s+' are interpreted as regex); you can avoid this warning by specifying engine='python'.
  ratings_df = pd.read_csv(

<ipython-input-4-6fc962858754>:38: ParserWarning: Falling back to the 'python' engine because the 'c' engine does not support regex separators (separators > 1 char and different from '\s+' are interpreted as regex); you can avoid this warning by specifying engine='python'.
  movies_df = pd.read_csv(

Now that we have read the dataset, let's create a sequence of movies for every user. Here is the function that does just that.

def get_movie_sequence_per_user(ratings_df):
    """Get movieID sequences for every user."""
    sequences = collections.defaultdict(list)

    for user_id, movie_id, rating, timestamp in ratings_df.values:
        sequences[user_id].append(
            {
                "movie_id": movie_id,
                "timestamp": timestamp,
                "rating": rating,
            }
        )

    # Sort movie sequences by timestamp for every user.
    for user_id, context in sequences.items():
        context.sort(key=lambda x: x["timestamp"])
        sequences[user_id] = context

    return sequences
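To see the structure this function produces, here is a minimal sketch on a toy DataFrame (column order matches RATINGS_DATA_COLUMNS; the IDs and timestamps are made up):

# Toy ratings (made-up values) to illustrate the output structure.
toy_ratings_df = pd.DataFrame(
    {
        "UserID": [1, 1, 2],
        "MovieID": [10, 20, 30],
        "Rating": [4, 5, 3],
        "Timestamp": [200, 100, 300],
    }
)
toy_sequences = get_movie_sequence_per_user(toy_ratings_df)
# User 1's sequence is sorted by timestamp: movie 20 (t=100) comes before movie 10 (t=200).
print(toy_sequences[1])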

We need to do some filtering and processing before we proceed with training the model:

  1. Form sequences of all lengths up to min(user_sequence_length, MAX_CONTEXT_LENGTH). So, every user will have multiple sequences corresponding to it.
  2. Get labels, i.e., given a sequence of length n, the first n-1 tokens are fed to the model as input, and the label is the last token.
  3. Remove all user sequences with fewer than MIN_SEQUENCE_LENGTH movies.
  4. Pad all sequences to MAX_CONTEXT_LENGTH.

An important point to note is how we form the train-test splits. We do not form the entire dataset of sequences and then split it into train and test. Instead, for every user, we take the last sequence to be part of the test set, and all other sequences to be part of the train set. This is to prevent data leakage. (A small worked example follows the function below.)

def generate_examples_from_user_sequences(sequences):
    """Generates sequences for all users, with padding, truncation, etc."""

    def generate_examples_from_user_sequence(sequence):
        """Generates examples for a single user sequence."""

        train_examples = []
        test_examples = []
        for label_idx in range(1, len(sequence)):
            start_idx = max(0, label_idx - MAX_CONTEXT_LENGTH)
            context = sequence[start_idx:label_idx]

            # Padding
            while len(context) < MAX_CONTEXT_LENGTH:
                context.append(
                    {
                        "movie_id": 0,
                        "timestamp": 0,
                        "rating": 0.0,
                    }
                )

            label_movie_id = int(sequence[label_idx]["movie_id"])
            context_movie_id = [int(movie["movie_id"]) for movie in context]

            example = {
                "context_movie_id": context_movie_id,
                "label_movie_id": label_movie_id,
            }

            if label_idx == len(sequence) - 1:
                test_examples.append(example)
            else:
                train_examples.append(example)

        return train_examples, test_examples

    all_train_examples = []
    all_test_examples = []
    for sequence in sequences.values():
        if len(sequence) < MIN_SEQUENCE_LENGTH:
            continue

        user_train_examples, user_test_example = generate_examples_from_user_sequence(
            sequence
        )

        all_train_examples.extend(user_train_examples)
        all_test_examples.extend(user_test_example)

    return all_train_examples, all_test_examples
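To make the split concrete, here is a minimal sketch on a single toy user sequence of four movies (made-up IDs). With MAX_CONTEXT_LENGTH = 10, each context is zero-padded, and only the example whose label is the user's last movie goes to the test set:

# Toy sequence of four movies (made-up IDs) for a single user.
toy_user_sequences = {
    1: [
        {"movie_id": m, "timestamp": t, "rating": 5.0}
        for m, t in [(11, 1), (22, 2), (33, 3), (44, 4)]
    ]
}
toy_train, toy_test = generate_examples_from_user_sequences(toy_user_sequences)
# Two training examples (labels 22 and 33) and one test example (label 44).
print(len(toy_train), len(toy_test))
# Test context is zero-padded to length MAX_CONTEXT_LENGTH: [11, 22, 33, 0, ..., 0]
print(toy_test[0]["context_movie_id"], toy_test[0]["label_movie_id"])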

Let's split the dataset into train and test sets. Also, we need to change the format of the dataset dictionary so that it can be converted to a tf.data.Dataset object.

sequences = get_movie_sequence_per_user(ratings_df)
train_examples, test_examples = generate_examples_from_user_sequences(sequences)


def list_of_dicts_to_dict_of_lists(list_of_dicts):
    """Convert list of dictionaries to dictionary of lists for
    `tf.data` conversion.
    """
    dict_of_lists = collections.defaultdict(list)
    for dictionary in list_of_dicts:
        for key, value in dictionary.items():
            dict_of_lists[key].append(value)
    return dict_of_lists


train_examples = list_of_dicts_to_dict_of_lists(train_examples)
test_examples = list_of_dicts_to_dict_of_lists(test_examples)

train_ds = tf.data.Dataset.from_tensor_slices(train_examples).map(
    lambda x: (x["context_movie_id"], x["label_movie_id"])
)
test_ds = tf.data.Dataset.from_tensor_slices(test_examples).map(
    lambda x: (x["context_movie_id"], x["label_movie_id"])
)

We need to batch our datasets. We also use cache() and prefetch() for better performance.

train_ds = train_ds.batch(BATCH_SIZE).cache().prefetch(tf.data.AUTOTUNE)
test_ds = test_ds.batch(TEST_BATCH_SIZE).cache().prefetch(tf.data.AUTOTUNE)

Let's print out one batch.

for sample in train_ds.take(1):
    print(sample)
(<tf.Tensor: shape=(4096, 10), dtype=int32, numpy=
array([[3186,    0,    0, ...,    0,    0,    0],
       [3186, 1270,    0, ...,    0,    0,    0],
       [3186, 1270, 1721, ...,    0,    0,    0],
       ...,
       [2194, 1291, 2159, ...,  300, 2076,  866],
       [1291, 2159, 1012, ..., 2076,  866, 2206],
       [2159, 1012, 1092, ...,  866, 2206,  377]], dtype=int32)>, <tf.Tensor: shape=(4096,), dtype=int32, numpy=array([1270, 1721, 1022, ..., 2206,  377, 1357], dtype=int32)>)

Model and training

In the basic retrieval example, we used one query tower for the user and one candidate tower for the candidate movie. Here, we also use a two-tower architecture. However, the query tower uses a Gated Recurrent Unit (GRU) layer to encode the sequence of historical movies, and we keep the same candidate tower for the candidate movie.

Note: Take a look at how the labels are defined. The label tensor (of shape (batch_size, batch_size)) contains one-hot vectors. The idea is: for every sample, treat the movie IDs corresponding to the other samples in the batch as negatives.
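To make the in-batch negatives idea concrete, here is a minimal numeric sketch (toy embeddings, batch size 3) of the one-hot label matrix and the affinity scores that compute_loss builds below:

# Toy query and candidate embeddings (made-up values), batch size 3, dim 2.
toy_query_embeddings = keras.ops.convert_to_tensor(
    [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
)
toy_candidate_embeddings = keras.ops.convert_to_tensor(
    [[1.0, 0.0], [0.0, 1.0], [0.5, 0.5]]
)

# One-hot labels: the i-th candidate is the positive for the i-th query,
# and all other candidates in the batch act as negatives.
toy_labels = keras.ops.eye(3, 3)

# Affinity scores: dot product between every query and every candidate.
toy_scores = keras.ops.matmul(
    toy_query_embeddings, keras.ops.transpose(toy_candidate_embeddings)
)

toy_loss = keras.losses.CategoricalCrossentropy(from_logits=True)(toy_labels, toy_scores)
print(toy_loss)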

class SequentialRetrievalModel(keras.Model):
    """Create the sequential retrieval model.

    Args:
      movies_count: Total number of unique movies in the dataset.
      embedding_dimension: Output dimension for movie embedding tables.
    """

    def __init__(
        self,
        movies_count,
        embedding_dimension=128,
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Our query tower, simply an embedding table followed by
        # a GRU unit. This encodes sequence of historical movies.
        self.query_model = keras.Sequential(
            [
                keras.layers.Embedding(movies_count + 1, embedding_dimension),
                keras.layers.GRU(embedding_dimension),
            ]
        )

        # Our candidate tower, simply an embedding table.
        self.candidate_model = keras.layers.Embedding(
            movies_count + 1, embedding_dimension
        )

        # The layer that performs the retrieval.
        self.retrieval = keras_rs.layers.BruteForceRetrieval(k=10, return_scores=False)
        self.loss_fn = keras.losses.CategoricalCrossentropy(
            from_logits=True,
        )

    def build(self, input_shape):
        self.query_model.build(input_shape)
        self.candidate_model.build(input_shape)

        # In this case, the candidates are directly the movie embeddings.
        # We take a shortcut and directly reuse the variable.
        self.retrieval.candidate_embeddings = self.candidate_model.embeddings
        self.retrieval.build(input_shape)
        super().build(input_shape)

    def call(self, inputs, training=False):
        query_embeddings = self.query_model(inputs)
        result = {
            "query_embeddings": query_embeddings,
        }

        if not training:
            # Skip the retrieval of top movies during training as the
            # predictions are not used.
            result["predictions"] = self.retrieval(query_embeddings)
        return result

    def compute_loss(self, x, y, y_pred, sample_weight, training=True):
        candidate_id = y
        query_embeddings = y_pred["query_embeddings"]
        candidate_embeddings = self.candidate_model(candidate_id)

        num_queries = keras.ops.shape(query_embeddings)[0]
        num_candidates = keras.ops.shape(candidate_embeddings)[0]

        # One-hot vectors for labels.
        labels = keras.ops.eye(num_queries, num_candidates)

        # Compute the affinity score by multiplying the two embeddings.
        scores = keras.ops.matmul(
            query_embeddings, keras.ops.transpose(candidate_embeddings)
        )

        return self.loss_fn(labels, scores, sample_weight)

Let's instantiate, compile and train our model.

model = SequentialRetrievalModel(
    movies_count=movies_count, embedding_dimension=EMBEDDING_DIM
)

# Compile.
model.compile(optimizer=keras.optimizers.AdamW(learning_rate=LEARNING_RATE))

# Train.
model.fit(
    train_ds,
    validation_data=test_ds,
    epochs=NUM_EPOCHS,
)
Epoch 1/5

228/228 ━━━━━━━━━━━━━━━━━━━━ 7s 24ms/step - loss: 7.9319 - val_loss: 6.8823

Epoch 2/5

228/228 ━━━━━━━━━━━━━━━━━━━━ 2s 6ms/step - loss: 7.0997 - val_loss: 6.5517

Epoch 3/5

228/228 ━━━━━━━━━━━━━━━━━━━━ 1s 5ms/step - loss: 6.8198 - val_loss: 6.4342

Epoch 4/5

228/228 ━━━━━━━━━━━━━━━━━━━━ 1s 5ms/step - loss: 6.6873 - val_loss: 6.3748

Epoch 5/5

228/228 ━━━━━━━━━━━━━━━━━━━━ 1s 5ms/step - loss: 6.6105 - val_loss: 6.3444

<keras.src.callbacks.history.History at 0x795792c69b90>

Making predictions

Now that we have a model, we would like to be able to make predictions.

So far, we have only handled movies by ID. Now is the time to create a mapping keyed by movie IDs so that we can surface the titles.

movie_id_to_movie_title = dict(zip(movies_df["MovieID"], movies_df["Title"]))
movie_id_to_movie_title[0] = ""  # Because id 0 is not in the dataset.

We then simply use the Keras model.predict() method. Under the hood, it calls the BruteForceRetrieval layer to perform the actual retrieval.

Note that this model can retrieve movies the user has already watched. We could easily add logic to remove them if that is desirable.
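For instance, one simple way to do this would be to filter the predicted IDs against the user's context, as sketched below (the helper name is hypothetical and not part of KerasRS):

# Hypothetical helper: keep only predictions the user has not watched yet.
def filter_watched(predicted_movie_ids, watched_movie_ids, k=10):
    watched = {int(movie_id) for movie_id in watched_movie_ids}
    return [
        int(movie_id)
        for movie_id in predicted_movie_ids
        if int(movie_id) not in watched
    ][:k]

# It could be applied to `predictions[0]` (computed below) together with the
# context movie IDs of the sequence, retrieving more than k candidates first
# so that k recommendations remain after filtering.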

print("\n==> Movies the user has watched:")
movie_sequence = test_ds.unbatch().take(1)
for element in movie_sequence:
    for movie_id in element[0][:-1]:
        print(movie_id_to_movie_title[movie_id.numpy()], end=", ")
    print(movie_id_to_movie_title[element[0][-1].numpy()])

predictions = model.predict(movie_sequence.batch(1))
predictions = keras.ops.convert_to_numpy(predictions["predictions"])

print("\n==> Recommended movies for the above sequence:")
for movie_id in predictions[0]:
    print(movie_id_to_movie_title[movie_id])
==> Movies the user has watched:
Beauty and the Beast (1991), Tarzan (1999), Close Shave, A (1995), Aladdin (1992), Toy Story (1995), Bug's Life, A (1998), Antz (1998), Hunchback of Notre Dame, The (1996), Hercules (1997), Mulan (1998)

1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 272ms/step

==> Recommended movies for the above sequence:
Hunchback of Notre Dame, The (1996)
Anastasia (1997)
Beavis and Butt-head Do America (1996)
Hercules (1997)
Pocahontas (1995)
Thumbelina (1994)
James and the Giant Peach (1996)
We're Back! A Dinosaur's Story (1993)
Rescuers Down Under, The (1990)
Prince of Egypt, The (1998)

/usr/local/lib/python3.11/dist-packages/keras/src/trainers/epoch_iterator.py:151: UserWarning: Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches. You may need to use the `.repeat()` function when building your dataset.
  self._interrupted_warning()