Author: Sayak Paul
Date created: 2021/05/28
Last modified: 2023/12/08
Description: Training a video classifier with transfer learning and a recurrent model on the UCF101 dataset.
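This example demonstrates video classification, an important use case with applications in recommendations, security, and similar areas. We will use the UCF101 dataset to build our video classifier. The dataset consists of videos grouped by action, such as cricket shot, punching, and biking. It is commonly used to build action recognizers, which are one application of video classification.
A video consists of an ordered sequence of frames. Each frame contains spatial information, and the sequence of those frames contains temporal information. To model both aspects, we use a hybrid architecture that combines convolutions (for spatial processing) with recurrent layers (for temporal processing). Specifically, we will use a convolutional neural network (CNN) and a recurrent neural network (RNN) made of GRU layers. This kind of hybrid architecture is popularly known as a CNN-RNN.
This example requires TensorFlow 2.5 or higher, as well as TensorFlow Docs, which can be installed with the following command: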
!pip install -q git+https://github.com/tensorflow/docs
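To keep the runtime of this example relatively short, we will use a subsampled version of the original UCF101 dataset. You can refer to this notebook to see how the subsampling was done.
!wget -q https://github.com/sayakpaul/Action-Recognition-in-TensorFlow/releases/download/v1.0.0/ucf101_top5.tar.gz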
!tar xf ucf101_top5.tar.gz
import os
import keras
from imutils import paths
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import imageio
import cv2
from IPython.display import Image
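IMG_SIZE = 224

# Both values match the feature shapes printed later: (594, 20, 2048).
MAX_SEQ_LENGTH = 20
NUM_FEATURES = 2048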
train_df = pd.read_csv("train.csv")
test_df = pd.read_csv("test.csv")
print(f"Total videos for training: {len(train_df)}")
print(f"Total videos for testing: {len(test_df)}")
Total videos for training: 594
Total videos for testing: 224
 | video_name | tag
492 | v_TennisSwing_g10_c03.avi | TennisSwing
536 | v_TennisSwing_g16_c05.avi | TennisSwing
413 | v_ShavingBeard_g16_c05.avi | ShavingBeard
268 | v_Punch_g12_c04.avi | Punch
288 | v_Punch_g15_c03.avi | Punch
30 | v_CricketShot_g12_c03.avi | CricketShot
449 | v_ShavingBeard_g21_c07.avi | ShavingBeard
524 | v_TennisSwing_g14_c07.avi | TennisSwing
145 | v_PlayingCello_g12_c01.avi | PlayingCello
566 | v_TennisSwing_g21_c03.avi | TennisSwing
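The file names in the table already encode the label: UCF101 clips are named `v_<Class>_g<group>_c<clip>.avi`. As a quick sanity check, one could parse a name and compare the result with the `tag` column. The sketch below assumes class names contain only letters, and `parse_ucf101_name` is a hypothetical helper that is not part of the original example.

```python
import re

# Hypothetical helper (not part of the original example): split a UCF101-style
# file name into its class name, group number, and clip number.
_NAME_PATTERN = re.compile(
    r"^v_(?P<tag>[A-Za-z]+)_g(?P<group>\d+)_c(?P<clip>\d+)\.avi$"
)


def parse_ucf101_name(video_name):
    match = _NAME_PATTERN.match(video_name)
    if match is None:
        raise ValueError(f"Unexpected file name: {video_name}")
    return match["tag"], int(match["group"]), int(match["clip"])


tag, group, clip = parse_ucf101_name("v_TennisSwing_g10_c03.avi")
print(tag, group, clip)
```

Comparing the parsed class with the `tag` column for every row would flag any mislabeled entry in the CSV files.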
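One of the many challenges of training video classifiers is figuring out how to feed the videos to a network. This blog post discusses five such methods. Since a video is an ordered sequence of frames, we could simply extract the frames and stack them into one tensor of shape (frames, height, width, channels). However, the number of frames differs from video to video, which prevents us from stacking videos into batches (unless we use padding). As an alternative, we can save video frames at a fixed interval until a maximum frame count is reached. In this example we will capture the frames of each video, keep frames until a maximum frame count is reached, and pad with zeros any video that has fewer frames than that maximum.
Note that this workflow is identical to the one used for problems involving text sequences. Videos in the UCF101 dataset are known not to contain extreme variations in objects and actions across frames, so considering only a few frames may be enough for the learning task. This approach may not generalize well to other video classification problems, though. We will use OpenCV's VideoCapture() method to read frames from the videos.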
# The following two methods are taken from this tutorial:
# https://tensorflowcn.cn/hub/tutorials/action_recognition_with_tf_hub
def crop_center_square(frame):
    y, x = frame.shape[0:2]
    min_dim = min(y, x)
    start_x = (x // 2) - (min_dim // 2)
    start_y = (y // 2) - (min_dim // 2)
    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]


def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # No more frames to read.
                break
            frame = crop_center_square(frame)
            frame = cv2.resize(frame, resize)
            # OpenCV returns BGR; reorder the channels to RGB.
            frame = frame[:, :, [2, 1, 0]]
            frames.append(frame)

            # With the default max_frames=0 this never triggers, so all frames are kept.
            if len(frames) == max_frames:
                break
    finally:
        cap.release()
    return np.array(frames)
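`load_video` keeps consecutive frames. The fixed-interval alternative mentioned earlier can be illustrated on frame indices alone, without decoding any video. This is a minimal sketch: `sample_frame_indices` and its `stride` parameter are hypothetical names introduced here, not part of the original example.

```python
# Hypothetical helper: choose which frame indices to keep when sampling a
# video at a fixed stride until `max_frames` indices have been collected.
def sample_frame_indices(num_frames, max_frames, stride):
    if stride < 1:
        raise ValueError("stride must be a positive integer")
    return list(range(0, num_frames, stride))[:max_frames]


# A 100-frame video sampled every 4th frame, capped at 20 frames.
long_clip = sample_frame_indices(num_frames=100, max_frames=20, stride=4)

# A 30-frame video runs out of frames before the cap is reached.
short_clip = sample_frame_indices(num_frames=30, max_frames=20, stride=4)

print(len(long_clip), long_clip[-1])
print(short_clip)
```

With a stride, the kept frames cover a longer stretch of the clip than the same number of consecutive frames would, at the cost of skipping the motion in between.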
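We can use a pre-trained network to extract meaningful features from the extracted frames. The Keras Applications module provides a number of state-of-the-art models pre-trained on the ImageNet-1k dataset. We will use the InceptionV3 model for this purpose.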
def build_feature_extractor():
    # ImageNet weights without the classification head; global average pooling
    # yields one NUM_FEATURES-dimensional vector per frame.
    feature_extractor = keras.applications.InceptionV3(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    preprocess_input = keras.applications.inception_v3.preprocess_input

    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    preprocessed = preprocess_input(inputs)

    outputs = feature_extractor(preprocessed)
    return keras.Model(inputs, outputs, name="feature_extractor")


feature_extractor = build_feature_extractor()
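Each frame ends up as a single feature vector. The NumPy sketch below illustrates two steps that are assumed to happen inside the extractor: pixel values are rescaled to [-1, 1], as `inception_v3.preprocess_input` does, and the final feature map is collapsed by global average pooling. The random arrays stand in for a real frame and a real feature map, and the 5 x 5 spatial size is an assumption made for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

# A fake RGB frame with pixel values in [0, 255].
frame = rng.integers(0, 256, size=(224, 224, 3)).astype("float32")

# Rescale pixels from [0, 255] to [-1, 1].
scaled = frame / 127.5 - 1.0

# Stand-in for the last convolutional feature map of the backbone.
# The 5 x 5 spatial size is an assumption for illustration.
feature_map = rng.normal(size=(5, 5, 2048)).astype("float32")

# Global average pooling collapses the spatial axes into one vector per frame.
frame_features = feature_map.mean(axis=(0, 1))

print(scaled.min(), scaled.max())
print(frame_features.shape)
```

The pooled vector has 2048 entries, which is the per-frame feature size that `NUM_FEATURES` refers to.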
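The labels of the videos are strings. Neural networks do not understand string values, so the labels must be converted to a numerical form before they are fed to the model. Here we will use the StringLookup layer to encode the class labels as integers.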
label_processor = keras.layers.StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df["tag"])
)
print(label_processor.get_vocabulary())
['CricketShot', 'PlayingCello', 'Punch', 'ShavingBeard', 'TennisSwing']
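What the lookup does can be mimicked in plain Python. With no out-of-vocabulary slots, each label maps to its position in the sorted vocabulary. This is only an illustrative stand-in for the layer, and the `tags` list is made-up sample data.

```python
# Pure-Python stand-in for the lookup: each label maps to its position in the
# sorted vocabulary. The tags below are made-up sample data.
tags = ["TennisSwing", "Punch", "CricketShot", "ShavingBeard", "PlayingCello", "Punch"]

vocabulary = sorted(set(tags))
label_to_index = {label: index for index, label in enumerate(vocabulary)}

encoded = [label_to_index[tag] for tag in tags]
print(vocabulary)
print(encoded)  # [4, 2, 0, 3, 1, 2]
```

The integer codes are what `sparse_categorical_crossentropy` expects as targets, so no one-hot encoding is needed.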
def prepare_all_videos(df, root_dir):
    num_samples = len(df)
    video_paths = df["video_name"].values.tolist()
    labels = df["tag"].values
    labels = keras.ops.convert_to_numpy(label_processor(labels[..., None]))

    # `frame_masks` and `frame_features` are what we will feed to our sequence model.
    # `frame_masks` will contain a bunch of booleans denoting if a timestep is
    # masked with padding or not.
    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    # For each video.
    for idx, path in enumerate(video_paths):
        # Gather all its frames and add a batch dimension.
        frames = load_video(os.path.join(root_dir, path))
        frames = frames[None, ...]

        # Initialize placeholders to store the masks and features of the current video.
        temp_frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH), dtype="bool")
        temp_frame_features = np.zeros(
            shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
        )

        # Extract features from the frames of the current video.
        for i, batch in enumerate(frames):
            video_length = batch.shape[0]
            length = min(MAX_SEQ_LENGTH, video_length)
            for j in range(length):
                temp_frame_features[i, j, :] = feature_extractor.predict(
                    batch[None, j, :], verbose=0,
                )
            temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

        frame_features[idx,] = temp_frame_features.squeeze()
        frame_masks[idx,] = temp_frame_mask.squeeze()

    return (frame_features, frame_masks), labels
train_data, train_labels = prepare_all_videos(train_df, "train")
test_data, test_labels = prepare_all_videos(test_df, "test")
print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")
Frame features in train set: (594, 20, 2048)
Frame masks in train set: (594, 20)
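The above code block takes around 20 minutes to execute, depending on the machine it runs on.
Now, we can feed this data to a sequence model consisting of GRU layers.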
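The padding and masking logic inside `prepare_all_videos` is easier to see on toy sizes. The sketch below uses random-free stand-in features rather than real InceptionV3 outputs. `pad_features`, `MAX_LEN`, and `FEATURE_DIM` are hypothetical names chosen for illustration.

```python
import numpy as np

MAX_LEN = 6      # toy stand-in for MAX_SEQ_LENGTH
FEATURE_DIM = 4  # toy stand-in for NUM_FEATURES


def pad_features(video_features, max_len=MAX_LEN):
    """Truncate or zero-pad per-frame features and build the matching mask."""
    length = min(max_len, video_features.shape[0])
    padded = np.zeros((max_len, video_features.shape[1]), dtype="float32")
    mask = np.zeros((max_len,), dtype="bool")
    padded[:length] = video_features[:length]
    mask[:length] = True  # True = real frame, False = padding
    return padded, mask


short_video = np.ones((3, FEATURE_DIM), dtype="float32")  # 3 frames
long_video = np.ones((10, FEATURE_DIM), dtype="float32")  # 10 frames

short_padded, short_mask = pad_features(short_video)
long_padded, long_mask = pad_features(long_video)

print(short_mask)
print(long_mask)
```

The short video keeps its three real frames followed by three masked zero rows, while the long video is cut to the first six frames and has nothing masked.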
# Utility for our sequence model.
def get_sequence_model():
    class_vocab = label_processor.get_vocabulary()

    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # Refer to the following tutorial to understand the significance of using `mask`:
    # https://keras.org.cn/api/layers/recurrent_layers/gru/
    x = keras.layers.GRU(16, return_sequences=True)(
        frame_features_input, mask=mask_input
    )
    x = keras.layers.GRU(8)(x)
    x = keras.layers.Dropout(0.4)(x)
    x = keras.layers.Dense(8, activation="relu")(x)
    output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)

    rnn_model = keras.Model([frame_features_input, mask_input], output)

    rnn_model.compile(
        loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )
    return rnn_model
# Utility for running experiments.
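def run_experiment():
    filepath = "/tmp/video_classifier/ckpt.weights.h5"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    seq_model = get_sequence_model()
    history = seq_model.fit(
        [train_data[0], train_data[1]],
        train_labels,
        # Assumed values: the original split and epoch count are not shown here.
        validation_split=0.3,
        epochs=10,
        callbacks=[checkpoint],
    )

    # Restore the best weights saved by the checkpoint before evaluating.
    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, seq_model


_, sequence_model = run_experiment()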
7/7 ━━━━━━━━━━━━━━━━━━━━ 0s 3ms/step - accuracy: 0.7816 - loss: 1.0624
Test accuracy: 56.7%
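The mask passed to the first GRU layer tells the recurrence which timesteps are padding. A rough way to picture the effect is that masked steps are skipped and the previous state is carried forward. The sketch below shows that idea with a plain tanh RNN in NumPy. It is a simplification rather than Keras's GRU implementation, and `masked_rnn_final_state` is a hypothetical helper.

```python
import numpy as np


def masked_rnn_final_state(inputs, mask, w_in, w_rec):
    """Run a plain tanh RNN, leaving the state untouched on masked steps."""
    state = np.zeros(w_rec.shape[0], dtype="float64")
    for x_t, keep in zip(inputs, mask):
        candidate = np.tanh(x_t @ w_in + state @ w_rec)
        state = candidate if keep else state
    return state


rng = np.random.default_rng(0)
w_in = rng.normal(size=(4, 3))
w_rec = rng.normal(size=(3, 3))

# Two real frames followed by three zero-padded timesteps.
real_frames = rng.normal(size=(2, 4))
padded = np.concatenate([real_frames, np.zeros((3, 4))])
mask = np.array([True, True, False, False, False])

with_mask = masked_rnn_final_state(padded, mask, w_in, w_rec)
unpadded = masked_rnn_final_state(real_frames, np.array([True, True]), w_in, w_rec)
no_mask = masked_rnn_final_state(padded, np.ones(5, dtype=bool), w_in, w_rec)

# With the mask, padding leaves the final state identical to the unpadded run.
print(np.allclose(with_mask, unpadded))
# Without it, the zero frames keep updating the state.
print(np.allclose(with_mask, no_mask))
```

The first comparison holds by construction. The second will generally differ, because the recurrent term keeps changing the state even when the input is all zeros.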
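Note: To keep the runtime of this example relatively short, we used only a small number of training examples. That number is low relative to the sequence model, which has 99,909 trainable parameters. You are encouraged to sample more data from the UCF101 dataset using the notebook mentioned above and train the same model.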
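The 99,909 figure can be reproduced by hand from the layer sizes used in the sequence model: 2048-dimensional frame features, GRU layers of 16 and 8 units, a Dense layer of 8 units, and a 5-way softmax. The GRU formula below assumes Keras's default `reset_after=True`, which gives each weight group two bias vectors. `gru_params` and `dense_params` are hypothetical helpers.

```python
def gru_params(input_dim, units):
    # Three weight groups (update gate, reset gate, candidate state), each with
    # an input kernel, a recurrent kernel, and two bias vectors
    # (assumes Keras's default reset_after=True).
    return 3 * (input_dim * units + units * units + 2 * units)


def dense_params(input_dim, units):
    # One weight per input-output pair plus one bias per unit.
    return input_dim * units + units


total = (
    gru_params(2048, 16)   # first GRU over 2048-dimensional frame features
    + gru_params(16, 8)    # second GRU
    + dense_params(8, 8)   # hidden Dense layer
    + dense_params(8, 5)   # softmax over the five classes
)
print(total)  # 99909
```

Almost all of the parameters (99,168) sit in the first GRU layer because of the wide input. The Dropout layer contributes none.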
def prepare_single_video(frames):
    frames = frames[None, ...]
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    for i, batch in enumerate(frames):
        video_length = batch.shape[0]
        length = min(MAX_SEQ_LENGTH, video_length)
        for j in range(length):
            frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
        frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

    return frame_features, frame_mask


def sequence_prediction(path):
    class_vocab = label_processor.get_vocabulary()

    frames = load_video(os.path.join("test", path))
    frame_features, frame_mask = prepare_single_video(frames)
    probabilities = sequence_model.predict([frame_features, frame_mask])[0]

    # Print the classes from most to least probable.
    for i in np.argsort(probabilities)[::-1]:
        print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
    return frames
# This utility is for visualization.
# Referenced from:
# https://tensorflowcn.cn/hub/tutorials/action_recognition_with_tf_hub
def to_gif(images):
    converted_images = images.astype(np.uint8)
    imageio.mimsave("animation.gif", converted_images, duration=100)
    return Image("animation.gif")
test_video = np.random.choice(test_df["video_name"].values.tolist())
print(f"Test video path: {test_video}")
test_frames = sequence_prediction(test_video)
Test video path: v_TennisSwing_g03_c01.avi
CricketShot: 46.99%
ShavingBeard: 18.83%
TennisSwing: 14.65%
Punch: 12.41%
PlayingCello: 7.12%
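The ranking printed above comes from sorting the softmax output in descending order. The sketch below replays that step with the probabilities copied from the output, which also makes it easy to see where the true class landed. The true label is read off the file name of the test video.

```python
import numpy as np

class_vocab = ["CricketShot", "PlayingCello", "Punch", "ShavingBeard", "TennisSwing"]
# Probabilities copied from the output above, in vocabulary order.
probabilities = np.array([0.4699, 0.0712, 0.1241, 0.1883, 0.1465])

# argsort is ascending, so reverse it to rank the most probable class first.
ranking = np.argsort(probabilities)[::-1]
ranked_labels = [class_vocab[i] for i in ranking]
true_label = "TennisSwing"  # taken from the file name of the test video

print(ranked_labels)
print(ranked_labels.index(true_label) + 1)  # rank of the true class: 3
```

For this particular video the model's top guess is CricketShot and the true class ranks third, in line with the modest test accuracy reported earlier.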
