Author: Abheesht Sharma, Fabien Hertschuh
Date created: 2025/04/28
Last modified: 2025/04/28
Description: Recommend movies using a GRU-based sequential retrieval model.
In this example, we are going to build a sequential retrieval model. Sequential recommendation is a popular model that looks at a sequence of items a user has interacted with previously and then predicts the next item. Here, the order of the items within each sequence matters, so we are going to use a recurrent neural network to model the sequential relationship. For more details, please refer to the GRU4Rec paper.
Let's begin by choosing JAX as the backend we want to run on, and import all the necessary libraries. Note that the backend must be selected via the KERAS_BACKEND environment variable before Keras is imported.
!pip install -q keras-rs
import os
os.environ["KERAS_BACKEND"] = "jax" # `"tensorflow"`/`"torch"`
import collections
import random
import keras
import pandas as pd
import tensorflow as tf # Needed only for the dataset
import keras_rs
Let's also define all the important variables/hyperparameters below.
DATA_DIR = "./raw/data/"
# MovieLens-specific variables
MOVIELENS_1M_URL = "https://files.grouplens.org/datasets/movielens/ml-1m.zip"
MOVIELENS_ZIP_HASH = "a6898adb50b9ca05aa231689da44c217cb524e7ebd39d264c56e2832f2c54e20"
RATINGS_FILE_NAME = "ratings.dat"
MOVIES_FILE_NAME = "movies.dat"
# Data processing args
MAX_CONTEXT_LENGTH = 10
MIN_SEQUENCE_LENGTH = 3
RATINGS_DATA_COLUMNS = ["UserID", "MovieID", "Rating", "Timestamp"]
MOVIES_DATA_COLUMNS = ["MovieID", "Title", "Genres"]
MIN_RATING = 2
# Training/model args
BATCH_SIZE = 4096
TEST_BATCH_SIZE = 2048
EMBEDDING_DIM = 32
NUM_EPOCHS = 5
LEARNING_RATE = 0.005
Next, we need to prepare our dataset. Like we did in the basic retrieval example, we are going to use the MovieLens dataset.

The dataset preparation step is fairly involved. The original ratings dataset contains (user, movie ID, rating, timestamp) tuples (among other columns which are not important for this example). Since we are dealing with sequential retrieval, we need to create movie sequences for every user, where the sequences are ordered by timestamp.

Let's start by downloading and reading the dataset.
# Download the MovieLens dataset.
if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR)

path_to_zip = keras.utils.get_file(
    fname="ml-1m.zip",
    origin=MOVIELENS_1M_URL,
    file_hash=MOVIELENS_ZIP_HASH,
    hash_algorithm="sha256",
    extract=True,
    cache_dir=DATA_DIR,
)
movielens_extracted_dir = os.path.join(
    os.path.dirname(path_to_zip),
    "ml-1m_extracted",
    "ml-1m",
)
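As a quick, optional sanity check (illustrative only, not part of the original example), we can list the extracted directory; the ml-1m archive ships ratings.dat, movies.dat, users.dat and a README.

# Optional sanity check of the extracted files.
print(sorted(os.listdir(movielens_extracted_dir)))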
# Read the dataset.
def read_data(data_directory, min_rating=None):
    """Read movielens ratings.dat and movies.dat file
    into dataframe.
    """
    ratings_df = pd.read_csv(
        os.path.join(data_directory, RATINGS_FILE_NAME),
        sep="::",
        names=RATINGS_DATA_COLUMNS,
        encoding="unicode_escape",
    )
    ratings_df["Timestamp"] = ratings_df["Timestamp"].apply(int)

    # Remove movies with `rating < min_rating`.
    if min_rating is not None:
        ratings_df = ratings_df[ratings_df["Rating"] >= min_rating]

    movies_df = pd.read_csv(
        os.path.join(data_directory, MOVIES_FILE_NAME),
        sep="::",
        names=MOVIES_DATA_COLUMNS,
        encoding="unicode_escape",
    )
    return ratings_df, movies_df


ratings_df, movies_df = read_data(
    data_directory=movielens_extracted_dir, min_rating=MIN_RATING
)

# Need to know #movies so as to define embedding layers.
movies_count = movies_df["MovieID"].max()
Downloading data from https://files.grouplens.org/datasets/movielens/ml-1m.zip
5917549/5917549 ━━━━━━━━━━━━━━━━━━━━ 2s 0us/step
<ipython-input-4-6fc962858754>:26: ParserWarning: Falling back to the 'python' engine because the 'c' engine does not support regex separators (separators > 1 char and different from '\s+' are interpreted as regex); you can avoid this warning by specifying engine='python'.
ratings_df = pd.read_csv(
<ipython-input-4-6fc962858754>:38: ParserWarning: Falling back to the 'python' engine because the 'c' engine does not support regex separators (separators > 1 char and different from '\s+' are interpreted as regex); you can avoid this warning by specifying engine='python'.
movies_df = pd.read_csv(
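Before moving on, it can help to peek at what we just loaded (an illustrative check, not part of the original pipeline):

# Illustrative peek at the parsed dataframes.
print(ratings_df.head(3))
print(movies_df.head(3))
print("movies_count:", movies_count)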
After reading the dataset, let's create sequences of movies for every user. Here is the function for doing just that.
def get_movie_sequence_per_user(ratings_df):
    """Get movieID sequences for every user."""
    sequences = collections.defaultdict(list)

    for user_id, movie_id, rating, timestamp in ratings_df.values:
        sequences[user_id].append(
            {
                "movie_id": movie_id,
                "timestamp": timestamp,
                "rating": rating,
            }
        )

    # Sort movie sequences by timestamp for every user.
    for user_id, context in sequences.items():
        context.sort(key=lambda x: x["timestamp"])
        sequences[user_id] = context

    return sequences
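As a small illustration of the data structure this returns (hypothetical usage; the function is called again later when we build the splits), each user ID maps to a list of interaction dicts sorted by timestamp:

# Illustrative usage: inspect the earliest interactions of one user.
example_sequences = get_movie_sequence_per_user(ratings_df)
some_user_id = next(iter(example_sequences))
print(f"User {some_user_id} has {len(example_sequences[some_user_id])} interactions")
print(example_sequences[some_user_id][:2])  # The two earliest interactions.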
We need to do some filtering and processing before we proceed with training the model:

1. Form sequences of all lengths up to min(user_sequence_length, MAX_CONTEXT_LENGTH). So, every user will have multiple sequences corresponding to them.
2. Get labels, i.e., given a sequence of length n, the first n-1 tokens will be fed to the model as input, and the label will be the last token.
3. Remove all user sequences with fewer than MIN_SEQUENCE_LENGTH movies.
4. Pad all sequences to MAX_CONTEXT_LENGTH.

An important point to note is how we form the train-test splits. We do not form the entire dataset of sequences and then split it into train and test. Instead, for every user, we take the last sequence to be part of the test set, and all other sequences to be part of the train set. This is to prevent data leakage. The function below implements this logic; a toy run after it makes the windowing concrete.
def generate_examples_from_user_sequences(sequences):
    """Generates sequences for all users, with padding, truncation, etc."""

    def generate_examples_from_user_sequence(sequence):
        """Generates examples for a single user sequence."""

        train_examples = []
        test_examples = []
        for label_idx in range(1, len(sequence)):
            start_idx = max(0, label_idx - MAX_CONTEXT_LENGTH)
            context = sequence[start_idx:label_idx]

            # Padding
            while len(context) < MAX_CONTEXT_LENGTH:
                context.append(
                    {
                        "movie_id": 0,
                        "timestamp": 0,
                        "rating": 0.0,
                    }
                )

            label_movie_id = int(sequence[label_idx]["movie_id"])
            context_movie_id = [int(movie["movie_id"]) for movie in context]

            example = {
                "context_movie_id": context_movie_id,
                "label_movie_id": label_movie_id,
            }

            if label_idx == len(sequence) - 1:
                test_examples.append(example)
            else:
                train_examples.append(example)

        return train_examples, test_examples

    all_train_examples = []
    all_test_examples = []
    for sequence in sequences.values():
        if len(sequence) < MIN_SEQUENCE_LENGTH:
            continue

        user_train_examples, user_test_example = generate_examples_from_user_sequence(
            sequence
        )

        all_train_examples.extend(user_train_examples)
        all_test_examples.extend(user_test_example)

    return all_train_examples, all_test_examples
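To make the sliding-window logic concrete, here is a toy run on a single made-up user with four interactions. With MAX_CONTEXT_LENGTH = 10, each context is right-padded with zeros, and only the chronologically last window lands in the test set:

# Toy example (made-up IDs): four interactions yield two train examples
# and one test example; contexts are right-padded with movie ID 0.
toy_sequences = {
    42: [
        {"movie_id": 10, "timestamp": 1, "rating": 5.0},
        {"movie_id": 20, "timestamp": 2, "rating": 4.0},
        {"movie_id": 30, "timestamp": 3, "rating": 3.0},
        {"movie_id": 40, "timestamp": 4, "rating": 5.0},
    ]
}
toy_train, toy_test = generate_examples_from_user_sequences(toy_sequences)
print(len(toy_train), len(toy_test))  # 2 1
print(toy_test[0]["context_movie_id"])  # [10, 20, 30, 0, 0, 0, 0, 0, 0, 0]
print(toy_test[0]["label_movie_id"])  # 40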
Let's split the dataset into train and test sets. Also, we need to change the format of the dataset dictionary so as to enable conversion to a tf.data.Dataset object.
sequences = get_movie_sequence_per_user(ratings_df)
train_examples, test_examples = generate_examples_from_user_sequences(sequences)


def list_of_dicts_to_dict_of_lists(list_of_dicts):
    """Convert list of dictionaries to dictionary of lists for
    `tf.data` conversion.
    """
    dict_of_lists = collections.defaultdict(list)
    for dictionary in list_of_dicts:
        for key, value in dictionary.items():
            dict_of_lists[key].append(value)
    return dict_of_lists


train_examples = list_of_dicts_to_dict_of_lists(train_examples)
test_examples = list_of_dicts_to_dict_of_lists(test_examples)

train_ds = tf.data.Dataset.from_tensor_slices(train_examples).map(
    lambda x: (x["context_movie_id"], x["label_movie_id"])
)
test_ds = tf.data.Dataset.from_tensor_slices(test_examples).map(
    lambda x: (x["context_movie_id"], x["label_movie_id"])
)
We need to batch our datasets. We also use cache() and prefetch() for better performance.
train_ds = train_ds.batch(BATCH_SIZE).cache().prefetch(tf.data.AUTOTUNE)
test_ds = test_ds.batch(TEST_BATCH_SIZE).cache().prefetch(tf.data.AUTOTUNE)
Let's print out one batch.
for sample in train_ds.take(1):
    print(sample)
(<tf.Tensor: shape=(4096, 10), dtype=int32, numpy=
array([[3186, 0, 0, ..., 0, 0, 0],
[3186, 1270, 0, ..., 0, 0, 0],
[3186, 1270, 1721, ..., 0, 0, 0],
...,
[2194, 1291, 2159, ..., 300, 2076, 866],
[1291, 2159, 1012, ..., 2076, 866, 2206],
[2159, 1012, 1092, ..., 866, 2206, 377]], dtype=int32)>, <tf.Tensor: shape=(4096,), dtype=int32, numpy=array([1270, 1721, 1022, ..., 2206, 377, 1357], dtype=int32)>)
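Each batch is a pair: a (BATCH_SIZE, MAX_CONTEXT_LENGTH) tensor of zero-padded context movie IDs, and a (BATCH_SIZE,) tensor of label movie IDs, i.e. the next movie to predict.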
In the basic retrieval example, we used one query tower for the user, and the candidate tower for the candidate movie. We are going to use a two-tower architecture here as well. However, we use the query tower with a Gated Recurrent Unit (GRU) layer to encode the sequence of historical movies, and keep the same candidate tower for the candidate movie.

Note: take a look at how the labels are defined. The label tensor, of shape (batch_size, batch_size), contains one-hot vectors. The idea is: for every sample, consider movie IDs corresponding to other samples in the batch as negatives.
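To see what this means numerically, here is a tiny standalone sketch (made-up 2-D embeddings, batch size 3) of the score matrix and one-hot labels used in compute_loss below: the diagonal holds the positive (query, own label) scores, while off-diagonal entries act as in-batch negatives.

# Tiny sketch of in-batch negatives with made-up embeddings.
q = keras.ops.convert_to_tensor([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
c = keras.ops.convert_to_tensor([[1.0, 0.0], [0.0, 1.0], [0.5, 0.5]])
scores = keras.ops.matmul(q, keras.ops.transpose(c))  # (3, 3) affinity matrix.
labels = keras.ops.eye(3, 3)  # One-hot: sample i's own label is the positive.
print(scores)
print(labels)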
class SequentialRetrievalModel(keras.Model):
    """Create the sequential retrieval model.

    Args:
      movies_count: Total number of unique movies in the dataset.
      embedding_dimension: Output dimension for movie embedding tables.
    """

    def __init__(
        self,
        movies_count,
        embedding_dimension=128,
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Our query tower, simply an embedding table followed by
        # a GRU unit. This encodes sequence of historical movies.
        self.query_model = keras.Sequential(
            [
                keras.layers.Embedding(movies_count + 1, embedding_dimension),
                keras.layers.GRU(embedding_dimension),
            ]
        )

        # Our candidate tower, simply an embedding table.
        self.candidate_model = keras.layers.Embedding(
            movies_count + 1, embedding_dimension
        )

        # The layer that performs the retrieval.
        self.retrieval = keras_rs.layers.BruteForceRetrieval(k=10, return_scores=False)
        self.loss_fn = keras.losses.CategoricalCrossentropy(
            from_logits=True,
        )

    def build(self, input_shape):
        self.query_model.build(input_shape)
        self.candidate_model.build(input_shape)

        # In this case, the candidates are directly the movie embeddings.
        # We take a shortcut and directly reuse the variable.
        self.retrieval.candidate_embeddings = self.candidate_model.embeddings
        self.retrieval.build(input_shape)
        super().build(input_shape)

    def call(self, inputs, training=False):
        query_embeddings = self.query_model(inputs)
        result = {
            "query_embeddings": query_embeddings,
        }

        if not training:
            # Skip the retrieval of top movies during training as the
            # predictions are not used.
            result["predictions"] = self.retrieval(query_embeddings)
        return result

    def compute_loss(self, x, y, y_pred, sample_weight, training=True):
        candidate_id = y
        query_embeddings = y_pred["query_embeddings"]
        candidate_embeddings = self.candidate_model(candidate_id)

        num_queries = keras.ops.shape(query_embeddings)[0]
        num_candidates = keras.ops.shape(candidate_embeddings)[0]

        # One-hot vectors for labels.
        labels = keras.ops.eye(num_queries, num_candidates)

        # Compute the affinity score by multiplying the two embeddings.
        scores = keras.ops.matmul(
            query_embeddings, keras.ops.transpose(candidate_embeddings)
        )

        return self.loss_fn(labels, scores, sample_weight)
Let's instantiate, compile and train our model.
model = SequentialRetrievalModel(
    movies_count=movies_count, embedding_dimension=EMBEDDING_DIM
)

# Compile.
model.compile(optimizer=keras.optimizers.AdamW(learning_rate=LEARNING_RATE))

# Train.
model.fit(
    train_ds,
    validation_data=test_ds,
    epochs=NUM_EPOCHS,
)
Epoch 1/5
228/228 ━━━━━━━━━━━━━━━━━━━━ 7s 24ms/step - loss: 7.9319 - val_loss: 6.8823
Epoch 2/5
228/228 ━━━━━━━━━━━━━━━━━━━━ 2s 6ms/step - loss: 7.0997 - val_loss: 6.5517
Epoch 3/5
228/228 ━━━━━━━━━━━━━━━━━━━━ 1s 5ms/step - loss: 6.8198 - val_loss: 6.4342
Epoch 4/5
228/228 ━━━━━━━━━━━━━━━━━━━━ 1s 5ms/step - loss: 6.6873 - val_loss: 6.3748
Epoch 5/5
228/228 ━━━━━━━━━━━━━━━━━━━━ 1s 5ms/step - loss: 6.6105 - val_loss: 6.3444
<keras.src.callbacks.history.History at 0x795792c69b90>
Now that we have a model, we would like to be able to make predictions.

So far, we have only handled movies by ID. Now is the time to create a mapping keyed by movie IDs to be able to surface the titles.
movie_id_to_movie_title = dict(zip(movies_df["MovieID"], movies_df["Title"]))
movie_id_to_movie_title[0] = "" # Because id 0 is not in the dataset.
We then simply use the Keras model.predict() method. Under the hood, it calls the BruteForceRetrieval layer to perform the actual retrieval.

Note that this model can retrieve movies the user has already watched. We could easily add logic to remove them if that is desired.
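For instance, a minimal sketch of such post-filtering (a hypothetical helper, not part of the original example) could drop any recommended ID that already appears in the input context:

# Hypothetical helper: drop recommendations already in the context.
def filter_already_watched(context_movie_ids, recommended_movie_ids):
    watched = {int(movie_id) for movie_id in context_movie_ids}
    return [int(m) for m in recommended_movie_ids if int(m) not in watched]

# Could be applied as, e.g., filter_already_watched(element[0].numpy(), predictions[0])
# after the predict call below.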
print("\n==> Movies the user has watched:")
movie_sequence = test_ds.unbatch().take(1)
for element in movie_sequence:
for movie_id in element[0][:-1]:
print(movie_id_to_movie_title[movie_id.numpy()], end=", ")
print(movie_id_to_movie_title[element[0][-1].numpy()])
predictions = model.predict(movie_sequence.batch(1))
predictions = keras.ops.convert_to_numpy(predictions["predictions"])
print("\n==> Recommended movies for the above sequence:")
for movie_id in predictions[0]:
print(movie_id_to_movie_title[movie_id])
==> Movies the user has watched:
Beauty and the Beast (1991), Tarzan (1999), Close Shave, A (1995), Aladdin (1992), Toy Story (1995), Bug's Life, A (1998), Antz (1998), Hunchback of Notre Dame, The (1996), Hercules (1997), Mulan (1998)
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 272ms/step
==> Recommended movies for the above sequence:
Hunchback of Notre Dame, The (1996)
Anastasia (1997)
Beavis and Butt-head Do America (1996)
Hercules (1997)
Pocahontas (1995)
Thumbelina (1994)
James and the Giant Peach (1996)
We're Back! A Dinosaur's Story (1993)
Rescuers Down Under, The (1990)
Prince of Egypt, The (1998)
/usr/local/lib/python3.11/dist-packages/keras/src/trainers/epoch_iterator.py:151: UserWarning: Your input ran out of data; interrupting training. Make sure that your dataset or generator can generate at least `steps_per_epoch * epochs` batches. You may need to use the `.repeat()` function when building your dataset.
self._interrupted_warning()