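Authors: A_K_Nain, Sayak Paul
Date created: 2021/08/16
Last modified: 2023/07/06
Description: Training a handwriting recognition model with variable-length sequences.
This example shows how the Captcha OCR example can be extended to the IAM Dataset, which has variable-length ground-truth targets. Each sample in the dataset is an image of some handwritten text, and its target is the string present in the image. The IAM Dataset is widely used across many OCR benchmarks, so we hope this example can serve as a good starting point for building OCR systems.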
!wget -q https://github.com/sayakpaul/Handwriting-Recognizer-in-Keras/releases/download/v1.0.0/IAM_Words.zip
!unzip -qq IAM_Words.zip
!mkdir data
!mkdir data/words
!tar -xf IAM_Words/words.tgz -C data/words
!mv IAM_Words/words.txt data
!head -20 data/words.txt
#--- words.txt ---------------------------------------------------------------#
# iam database word information
# format: a01-000u-00-00 ok 154 1 408 768 27 51 AT A
# a01-000u-00-00 -> word id for line 00 in form a01-000u
# ok -> result of word segmentation
# ok: word was correctly
# er: segmentation of word can be bad
# 154 -> graylevel to binarize the line containing this word
# 1 -> number of components for this word
# 408 768 27 51 -> bounding box around this word in x,y,w,h format
# AT -> the grammatical tag for this word, see the
# file tagset.txt for an explanation
# A -> the transcription for this word
a01-000u-00-00 ok 154 408 768 27 51 AT A
a01-000u-00-01 ok 154 507 766 213 48 NN MOVE
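To make the format above concrete, here is a minimal sketch that parses one entry of words.txt. The helper name parse_words_line and the dictionary keys are hypothetical and chosen only for illustration. It relies solely on positions that hold for the lines shown above: the first field is the word id, the second is the segmentation result, and the last is the transcription.

```python
def parse_words_line(line):
    """Parse one non-comment line of words.txt (hypothetical helper)."""
    fields = line.strip().split(" ")
    word_id = fields[0]
    # The word id has the form part1-part2-lineNo-wordNo.
    part1, part2 = word_id.split("-")[:2]
    return {
        "word_id": word_id,
        "segmentation": fields[1],
        "form": f"{part1}-{part2}",
        "transcription": fields[-1],
    }


entry = parse_words_line("a01-000u-00-01 ok 154 507 766 213 48 NN MOVE")
print(entry["word_id"], entry["segmentation"], entry["transcription"])
```

Reading the transcription from the end of the line rather than from a fixed column keeps the sketch independent of how many numeric fields sit in between.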
from tensorflow.keras.layers import StringLookup
from tensorflow import keras
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import os
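base_path = "data"
words_list = []

with open(f"{base_path}/words.txt", "r") as f:
    words = f.readlines()
for line in words:
    if line[0] == "#":
        continue  # Skip the header comments.
    if line.split(" ")[1] != "err":  # We don't need to deal with errored entries.
        words_list.append(line)

# Shuffle so that the splits below are not ordered by form id.
np.random.shuffle(words_list)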
We will split the dataset into three subsets with a 90:5:5 ratio (train:validation:test).
split_idx = int(0.9 * len(words_list))
train_samples = words_list[:split_idx]
test_samples = words_list[split_idx:]
val_split_idx = int(0.5 * len(test_samples))
validation_samples = test_samples[:val_split_idx]
test_samples = test_samples[val_split_idx:]
assert len(words_list) == len(train_samples) + len(validation_samples) + len(
    test_samples
)
print(f"Total training samples: {len(train_samples)}")
print(f"Total validation samples: {len(validation_samples)}")
print(f"Total test samples: {len(test_samples)}")
Total training samples: 86810
Total validation samples: 4823
Total test samples: 4823
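As a quick sanity check on the arithmetic of the split, the sketch below reproduces the slicing logic on plain integers. The function name split_sizes is hypothetical, and the total of 96456 is simply the sum of the three counts printed above.

```python
def split_sizes(n_total, train_fraction=0.9):
    """Return (train, validation, test) sizes for the slicing used above."""
    split_idx = int(train_fraction * n_total)
    n_train = split_idx
    remainder = n_total - split_idx
    # The held-out remainder is cut in half: validation first, then test.
    n_val = int(0.5 * remainder)
    n_test = remainder - n_val
    return n_train, n_val, n_test


# 96456 is the sum of the three counts printed above.
sizes = split_sizes(96456)
print(sizes)
```

Because both cut points use int(), any leftover sample from an odd remainder ends up in the test subset.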
base_image_path = os.path.join(base_path, "words")


def get_image_paths_and_labels(samples):
    paths = []
    corrected_samples = []
    for i, file_line in enumerate(samples):
        line_split = file_line.strip()
        line_split = line_split.split(" ")

        # Each line split will have this format for the corresponding image:
        # part1/part1-part2/part1-part2-part3.png
        image_name = line_split[0]
        partI = image_name.split("-")[0]
        partII = image_name.split("-")[1]
        img_path = os.path.join(
            base_image_path, partI, partI + "-" + partII, image_name + ".png"
        )
        # Keep only the samples whose image file is not empty.
        if os.path.getsize(img_path):
            paths.append(img_path)
            corrected_samples.append(file_line.split("\n")[0])

    return paths, corrected_samples


train_img_paths, train_labels = get_image_paths_and_labels(train_samples)
validation_img_paths, validation_labels = get_image_paths_and_labels(validation_samples)
test_img_paths, test_labels = get_image_paths_and_labels(test_samples)
# Find maximum length and the size of the vocabulary in the training data.
train_labels_cleaned = []
characters = set()
max_len = 0

for label in train_labels:
    label = label.split(" ")[-1].strip()
    for char in label:
        characters.add(char)

    max_len = max(max_len, len(label))
    train_labels_cleaned.append(label)

characters = sorted(list(characters))

print("Maximum length: ", max_len)
print("Vocab size: ", len(characters))

# Check some label samples.
train_labels_cleaned[:10]
Maximum length: 21
Vocab size: 78
def clean_labels(labels):
    cleaned_labels = []
    for label in labels:
        # The transcription is the last space-separated field of each entry.
        label = label.split(" ")[-1].strip()
        cleaned_labels.append(label)
    return cleaned_labels


validation_labels_cleaned = clean_labels(validation_labels)
test_labels_cleaned = clean_labels(test_labels)
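Keras provides different preprocessing layers to deal with different modalities of data. This guide provides a comprehensive introduction. Our example involves preprocessing labels at the character level. This means that if there are two labels, e.g. "cat" and "dog", then our character vocabulary should be {a, c, d, g, o, t} (without any special tokens). We use the StringLookup layer for this purpose.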
# Mapping characters to integers.
char_to_num = StringLookup(vocabulary=list(characters), mask_token=None)

# Mapping integers back to original characters.
num_to_char = StringLookup(
    vocabulary=char_to_num.get_vocabulary(), mask_token=None, invert=True
)
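The "cat" and "dog" vocabulary from the text can be reproduced without TensorFlow. The sketch below is a plain-Python stand-in for the two lookups, not the StringLookup implementation itself. It assumes that index 0 is reserved for out-of-vocabulary characters, which is how StringLookup behaves with its default of one OOV index and no mask token; the names UNKNOWN, char_to_index and index_to_char are hypothetical.

```python
labels = ["cat", "dog"]

# Collect the unique characters and sort them for a stable ordering.
characters = sorted(set("".join(labels)))
print(characters)  # ['a', 'c', 'd', 'g', 'o', 't']

# Hypothetical stand-in for the two lookups: index 0 is kept for unknown
# characters, so known characters start at 1.
UNKNOWN = "[UNK]"
vocabulary = [UNKNOWN] + characters
char_to_index = {ch: i for i, ch in enumerate(vocabulary)}
index_to_char = dict(enumerate(vocabulary))

encoded = [char_to_index.get(ch, 0) for ch in "dog"]
decoded = "".join(index_to_char[i] for i in encoded)
print(encoded, decoded)
```

Under that assumption, a 78-character vocabulary yields 79 lookup entries in total.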
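Many OCR models work with rectangular images rather than square ones. This will become clearer when we visualize a few samples from the dataset. While aspect-unaware resizing of square images does not introduce much distortion, this is not the case for rectangular images. Still, resizing images to a uniform size is a requirement for mini-batching. So we need to resize in a way that preserves the aspect ratio and leaves the content of the images unaffected.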
def distortion_free_resize(image, img_size):
    w, h = img_size
    image = tf.image.resize(image, size=(h, w), preserve_aspect_ratio=True)

    # Check the amount of padding needed to be done.
    pad_height = h - tf.shape(image)[0]
    pad_width = w - tf.shape(image)[1]

    # Only necessary if you want to do same amount of padding on both sides.
    if pad_height % 2 != 0:
        height = pad_height // 2
        pad_height_top = height + 1
        pad_height_bottom = height
    else:
        pad_height_top = pad_height_bottom = pad_height // 2

    if pad_width % 2 != 0:
        width = pad_width // 2
        pad_width_left = width + 1
        pad_width_right = width
    else:
        pad_width_left = pad_width_right = pad_width // 2

    image = tf.pad(
        image,
        paddings=[
            [pad_height_top, pad_height_bottom],
            [pad_width_left, pad_width_right],
            [0, 0],
        ],
    )

    image = tf.transpose(image, perm=[1, 0, 2])
    image = tf.image.flip_left_right(image)
    return image
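The padding arithmetic in distortion_free_resize can be checked on plain numbers. The following sketch computes the resized dimensions and the per-side padding for a given input size. The function name resize_and_pad_plan is hypothetical, and the rounding of the scaled dimensions is an assumption about how an aspect-preserving resize behaves, so exact pixel counts from tf.image.resize may differ.

```python
def resize_and_pad_plan(width, height, target_width, target_height):
    """Return the resized (width, height) and the padding per side."""
    # Use the smaller scale factor so the whole image fits in the target.
    scale = min(target_width / width, target_height / height)
    new_width = min(target_width, round(width * scale))
    new_height = min(target_height, round(height * scale))

    pad_height = target_height - new_height
    pad_width = target_width - new_width
    # When the padding is odd, the extra pixel goes to the top / left.
    pad_top, pad_bottom = pad_height - pad_height // 2, pad_height // 2
    pad_left, pad_right = pad_width - pad_width // 2, pad_width // 2
    return (new_width, new_height), (pad_top, pad_bottom), (pad_left, pad_right)


# A 213x48 word crop (the bounding box of "MOVE" in words.txt) and a
# 128x32 target.
plan = resize_and_pad_plan(213, 48, 128, 32)
print(plan)
```

For this crop the width is the limiting dimension, so all of the padding is vertical.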
batch_size = 64
padding_token = 99
image_width = 128
image_height = 32


def preprocess_image(image_path, img_size=(image_width, image_height)):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, 1)
    image = distortion_free_resize(image, img_size)
    image = tf.cast(image, tf.float32) / 255.0
    return image


def vectorize_label(label):
    label = char_to_num(tf.strings.unicode_split(label, input_encoding="UTF-8"))
    length = tf.shape(label)[0]
    # Right-pad every label to max_len so that labels can be batched.
    pad_amount = max_len - length
    label = tf.pad(label, paddings=[[0, pad_amount]], constant_values=padding_token)
    return label


def process_images_labels(image_path, label):
    image = preprocess_image(image_path)
    label = vectorize_label(label)
    return {"image": image, "label": label}


def prepare_dataset(image_paths, labels):
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, labels)).map(
        process_images_labels, num_parallel_calls=tf.data.AUTOTUNE
    )
    return dataset.batch(batch_size).cache().prefetch(tf.data.AUTOTUNE)


train_ds = prepare_dataset(train_img_paths, train_labels_cleaned)
validation_ds = prepare_dataset(validation_img_paths, validation_labels_cleaned)
test_ds = prepare_dataset(test_img_paths, test_labels_cleaned)
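The label padding performed by vectorize_label, and its reversal before decoding, can be illustrated with plain lists. This is a minimal sketch: the helper names pad_label and strip_padding are hypothetical, and the index values are made up for the example.

```python
PADDING_TOKEN = 99  # Same value as padding_token above.


def pad_label(encoded_label, max_len, padding_token=PADDING_TOKEN):
    """Right-pad a list of character indices to max_len."""
    pad_amount = max_len - len(encoded_label)
    if pad_amount < 0:
        raise ValueError("label is longer than max_len")
    return encoded_label + [padding_token] * pad_amount


def strip_padding(padded_label, padding_token=PADDING_TOKEN):
    """Drop the padding entries again, e.g. before decoding to text."""
    return [i for i in padded_label if i != padding_token]


padded = pad_label([3, 5, 4], max_len=8)
print(padded)
print(strip_padding(padded))
```

The scheme only works if the padding value never collides with a real character index, which holds here as long as the lookup produces indices below 99.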
for data in train_ds.take(1):
    images, labels = data["image"], data["label"]

    _, ax = plt.subplots(4, 4, figsize=(15, 8))

    for i in range(16):
        img = images[i]
        # Undo the transpose and flip applied during preprocessing.
        img = tf.image.flip_left_right(img)
        img = tf.transpose(img, perm=[1, 0, 2])
        img = (img * 255.0).numpy().clip(0, 255).astype(np.uint8)
        img = img[:, :, 0]

        # Gather indices where label != padding_token.
        label = labels[i]
        indices = tf.gather(label, tf.where(tf.math.not_equal(label, padding_token)))
        # Convert to string.
        label = tf.strings.reduce_join(num_to_char(indices))
        label = label.numpy().decode("utf-8")

        ax[i // 4, i % 4].imshow(img, cmap="gray")
        ax[i // 4, i % 4].set_title(label)
        ax[i // 4, i % 4].axis("off")

plt.show()
Our model will use the CTC loss as an endpoint layer. For a detailed understanding of the CTC loss, refer to this post.
class CTCLayer(keras.layers.Layer):
    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        # Register the loss so that it is minimized during training.
        self.add_loss(loss)

        # At test time, just return the computed predictions.
        return y_pred
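For intuition about what the CTC loss measures, the sketch below computes the probability that per-step predictions collapse to a given label, using the standard forward recursion over a blank-interleaved label. It is an illustrative plain-Python version with a hypothetical function name, not the routine behind keras.backend.ctc_batch_cost, and it works with raw probabilities instead of the log-space arithmetic a practical implementation would use. Conceptually, the loss is the negative log of this probability.

```python
import math


def ctc_label_probability(step_probs, label, blank):
    """Sum the probability of every alignment that collapses to `label`.

    step_probs[t][k] is the probability of class k at time step t.
    """
    # Interleave blanks: "ab" becomes [blank, a, blank, b, blank].
    extended = [blank]
    for symbol in label:
        extended += [symbol, blank]

    n_states = len(extended)
    alpha = [0.0] * n_states
    alpha[0] = step_probs[0][extended[0]]
    if n_states > 1:
        alpha[1] = step_probs[0][extended[1]]

    for probs in step_probs[1:]:
        new_alpha = [0.0] * n_states
        for s in range(n_states):
            total = alpha[s]
            if s >= 1:
                total += alpha[s - 1]
            # Skipping over a blank is allowed only between different symbols.
            if s >= 2 and extended[s] != blank and extended[s] != extended[s - 2]:
                total += alpha[s - 2]
            new_alpha[s] = total * probs[extended[s]]
        alpha = new_alpha

    # Valid alignments end in the last symbol or in the trailing blank.
    return alpha[-1] + (alpha[-2] if n_states > 1 else 0.0)


# Two time steps, classes {0: "a", 1: blank}, uniform predictions.
uniform = [[0.5, 0.5], [0.5, 0.5]]
probability = ctc_label_probability(uniform, label=[0], blank=1)
loss = -math.log(probability)
print(probability, round(loss, 4))
```

In the toy case, three alignments ("aa", "a" followed by blank, blank followed by "a") collapse to "a", each with probability 0.25. A label with a repeated character, such as "aa", needs a blank between the repeats and therefore cannot be emitted in only two steps.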
def build_model():
    # Inputs to the model
    input_img = keras.Input(shape=(image_width, image_height, 1), name="image")
    labels = keras.layers.Input(name="label", shape=(None,))

    # First conv block.
    x = keras.layers.Conv2D(
        32,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv1",
    )(input_img)
    x = keras.layers.MaxPooling2D((2, 2), name="pool1")(x)

    # Second conv block.
    x = keras.layers.Conv2D(
        64,
        (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv2",
    )(x)
    x = keras.layers.MaxPooling2D((2, 2), name="pool2")(x)

    # We have used two max pool with pool size and strides 2.
    # Hence, downsampled feature maps are 4x smaller. The number of
    # filters in the last layer is 64. Reshape accordingly before
    # passing the output to the RNN part of the model.
    new_shape = ((image_width // 4), (image_height // 4) * 64)
    x = keras.layers.Reshape(target_shape=new_shape, name="reshape")(x)
    x = keras.layers.Dense(64, activation="relu", name="dense1")(x)
    x = keras.layers.Dropout(0.2)(x)

    # RNNs.
    x = keras.layers.Bidirectional(
        keras.layers.LSTM(128, return_sequences=True, dropout=0.25)
    )(x)
    x = keras.layers.Bidirectional(
        keras.layers.LSTM(64, return_sequences=True, dropout=0.25)
    )(x)

    # +2 is to account for the two special tokens introduced by the CTC loss.
    # The recommendation comes here: https://git.io/J0eXP.
    x = keras.layers.Dense(
        len(char_to_num.get_vocabulary()) + 2, activation="softmax", name="dense2"
    )(x)

    # Add CTC layer for calculating CTC loss at each step.
    output = CTCLayer(name="ctc_loss")(labels, x)

    # Define the model.
    model = keras.models.Model(
        inputs=[input_img, labels], outputs=output, name="handwriting_recognizer"
    )
    # Optimizer.
    opt = keras.optimizers.Adam()
    # Compile the model and return.
    model.compile(optimizer=opt)
    return model


# Get the model.
model = build_model()
model.summary()
Model: "handwriting_recognizer"
Layer (type)                     Output Shape          Param #    Connected to
image (InputLayer)               [(None, 128, 32, 1)]  0
Conv1 (Conv2D)                   (None, 128, 32, 32)   320        image[0][0]
pool1 (MaxPooling2D)             (None, 64, 16, 32)    0          Conv1[0][0]
Conv2 (Conv2D)                   (None, 64, 16, 64)    18496      pool1[0][0]
pool2 (MaxPooling2D)             (None, 32, 8, 64)     0          Conv2[0][0]
reshape (Reshape)                (None, 32, 512)       0          pool2[0][0]
dense1 (Dense)                   (None, 32, 64)        32832      reshape[0][0]
dropout (Dropout)                (None, 32, 64)        0          dense1[0][0]
bidirectional (Bidirectional)    (None, 32, 256)       197632     dropout[0][0]
bidirectional_1 (Bidirectional)  (None, 32, 128)       164352     bidirectional[0][0]
label (InputLayer)               [(None, None)]        0
dense2 (Dense)                   (None, 32, 81)        10449      bidirectional_1[0][0]
ctc_loss (CTCLayer)              (None, 32, 81)        0          label[0][0]
Total params: 424,081
Trainable params: 424,081
Non-trainable params: 0
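The shapes and parameter counts in the summary can be re-derived by hand, which is a useful check when changing the image size or the vocabulary. The sketch below uses the textbook parameter formulas for convolution, dense and LSTM layers; the helper names are hypothetical, and the vocabulary size of 79 assumes 78 characters plus one out-of-vocabulary slot in the lookup.

```python
def conv2d_params(kernel, in_channels, filters):
    """Weights plus one bias per filter."""
    return kernel * kernel * in_channels * filters + filters


def dense_params(in_features, units):
    return in_features * units + units


def bilstm_params(in_features, units):
    # Four gates per direction, each with input weights, recurrent weights
    # and a bias; two directions.
    return 2 * 4 * (units * (in_features + units) + units)


image_width, image_height = 128, 32
vocab_with_oov = 78 + 1  # Assumption: 78 characters plus one OOV slot.

# Two 2x2 poolings shrink each spatial dimension by 4; 64 filters remain.
reshaped = (image_width // 4, (image_height // 4) * 64)

counts = {
    "Conv1": conv2d_params(3, 1, 32),
    "Conv2": conv2d_params(3, 32, 64),
    "dense1": dense_params(reshaped[1], 64),
    "bidirectional": bilstm_params(64, 128),
    "bidirectional_1": bilstm_params(2 * 128, 64),
    "dense2": dense_params(2 * 64, vocab_with_oov + 2),
}
print(reshaped, counts, sum(counts.values()))
```

The first element of the reshaped tuple, 32, is the number of time steps the recurrent layers and the CTC loss see for every image.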
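Edit distance is the most widely used metric for evaluating OCR models. In this section, we will implement it and use it as a callback to monitor our model.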
validation_images = []
validation_labels = []

for batch in validation_ds:
    validation_images.append(batch["image"])
    validation_labels.append(batch["label"])


def calculate_edit_distance(labels, predictions):
    # Get a single batch and convert its labels to sparse tensors.
    sparse_labels = tf.cast(tf.sparse.from_dense(labels), dtype=tf.int64)

    # Make predictions and convert them to sparse tensors.
    input_len = np.ones(predictions.shape[0]) * predictions.shape[1]
    predictions_decoded = keras.backend.ctc_decode(
        predictions, input_length=input_len, greedy=True
    )[0][0][:, :max_len]
    sparse_predictions = tf.cast(
        tf.sparse.from_dense(predictions_decoded), dtype=tf.int64
    )

    # Compute individual edit distances and average them out.
    edit_distances = tf.edit_distance(
        sparse_predictions, sparse_labels, normalize=False
    )
    return tf.reduce_mean(edit_distances)


class EditDistanceCallback(keras.callbacks.Callback):
    def __init__(self, pred_model):
        super().__init__()
        self.prediction_model = pred_model

    def on_epoch_end(self, epoch, logs=None):
        edit_distances = []

        for i in range(len(validation_images)):
            labels = validation_labels[i]
            predictions = self.prediction_model.predict(validation_images[i])
            edit_distances.append(calculate_edit_distance(labels, predictions).numpy())

        print(
            f"Mean edit distance for epoch {epoch + 1}: {np.mean(edit_distances):.4f}"
        )
epochs = 10  # To get good results this should be at least 50.

model = build_model()
prediction_model = keras.models.Model(
    model.get_layer(name="image").input, model.get_layer(name="dense2").output
)
edit_distance_callback = EditDistanceCallback(prediction_model)

# Train the model.
history = model.fit(
    train_ds,
    validation_data=validation_ds,
    epochs=epochs,
    callbacks=[edit_distance_callback],
)
Epoch 1/10
1357/1357 [==============================] - 89s 51ms/step - loss: 13.6670 - val_loss: 11.8041
Mean edit distance for epoch 1: 20.5117
Epoch 2/10
1357/1357 [==============================] - 48s 36ms/step - loss: 10.6864 - val_loss: 9.6994
Mean edit distance for epoch 2: 20.1167
Epoch 3/10
1357/1357 [==============================] - 48s 35ms/step - loss: 9.0437 - val_loss: 8.0355
Mean edit distance for epoch 3: 19.7270
Epoch 4/10
1357/1357 [==============================] - 48s 35ms/step - loss: 7.6098 - val_loss: 6.4239
Mean edit distance for epoch 4: 19.1106
Epoch 5/10
1357/1357 [==============================] - 48s 35ms/step - loss: 6.3194 - val_loss: 4.9814
Mean edit distance for epoch 5: 18.4894
Epoch 6/10
1357/1357 [==============================] - 48s 35ms/step - loss: 5.3417 - val_loss: 4.1307
Mean edit distance for epoch 6: 18.1909
Epoch 7/10
1357/1357 [==============================] - 48s 35ms/step - loss: 4.6396 - val_loss: 3.7706
Mean edit distance for epoch 7: 18.1224
Epoch 8/10
1357/1357 [==============================] - 48s 35ms/step - loss: 4.1926 - val_loss: 3.3682
Mean edit distance for epoch 8: 17.9387
Epoch 9/10
1357/1357 [==============================] - 48s 36ms/step - loss: 3.8532 - val_loss: 3.1829
Mean edit distance for epoch 9: 17.9074
Epoch 10/10
1357/1357 [==============================] - 49s 36ms/step - loss: 3.5769 - val_loss: 2.9221
Mean edit distance for epoch 10: 17.7960
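For readers unfamiliar with the metric reported in the log, here is a minimal plain-Python sketch of the unnormalized edit (Levenshtein) distance between two sequences. It operates on strings for readability, whereas the callback applies tf.edit_distance to sparse tensors of integer indices; the function name is chosen for illustration.

```python
def edit_distance(source, target):
    """Levenshtein distance between two sequences (unnormalized)."""
    # previous[j] holds the distance between the processed prefix of
    # `source` and the first j items of `target`.
    previous = list(range(len(target) + 1))
    for i, s in enumerate(source, start=1):
        current = [i]
        for j, t in enumerate(target, start=1):
            substitution = previous[j - 1] + (s != t)
            insertion = current[j - 1] + 1
            deletion = previous[j] + 1
            current.append(min(substitution, insertion, deletion))
        previous = current
    return previous[-1]


print(edit_distance("kitten", "sitting"))  # 3
```

A distance of 0 means the prediction matches the label exactly, so lower values are better.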
# A utility function to decode the output of the network.
def decode_batch_predictions(pred):
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    # Use greedy search. For complex tasks, you can use beam search.
    results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0][
        :, :max_len
    ]
    # Iterate over the results and get back the text.
    output_text = []
    for res in results:
        # Entries equal to -1 are filler for shorter predictions; drop them.
        res = tf.gather(res, tf.where(tf.math.not_equal(res, -1)))
        res = tf.strings.reduce_join(num_to_char(res)).numpy().decode("utf-8")
        output_text.append(res)
    return output_text


# Let's check results on some test samples.
for batch in test_ds.take(1):
    batch_images = batch["image"]
    _, ax = plt.subplots(4, 4, figsize=(15, 8))

    preds = prediction_model.predict(batch_images)
    pred_texts = decode_batch_predictions(preds)

    for i in range(16):
        img = batch_images[i]
        img = tf.image.flip_left_right(img)
        img = tf.transpose(img, perm=[1, 0, 2])
        img = (img * 255.0).numpy().clip(0, 255).astype(np.uint8)
        img = img[:, :, 0]

        title = f"Prediction: {pred_texts[i]}"
        ax[i // 4, i % 4].imshow(img, cmap="gray")
        ax[i // 4, i % 4].set_title(title)
        ax[i // 4, i % 4].axis("off")

plt.show()
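The greedy search used in decode_batch_predictions can be illustrated in a few lines of plain Python: take the most likely class at every time step, merge consecutive repeats, then drop the blank symbol. This is a sketch of the general best-path idea with a hypothetical function name and toy probabilities, not the code behind keras.backend.ctc_decode. Treating the last class index as the blank is an assumption made for the example.

```python
def greedy_ctc_decode(step_probs, blank):
    """Best-path decoding: argmax per step, merge repeats, drop blanks."""
    best_path = [max(range(len(p)), key=p.__getitem__) for p in step_probs]
    decoded = []
    previous = None
    for index in best_path:
        # A symbol is emitted only when it differs from the previous step.
        if index != previous and index != blank:
            decoded.append(index)
        previous = index
    return decoded


# Toy distribution over {0: "a", 1: "b", 2: blank} for five time steps.
step_probs = [
    [0.8, 0.1, 0.1],  # a
    [0.7, 0.2, 0.1],  # a (repeat, merged with the previous step)
    [0.1, 0.1, 0.8],  # blank
    [0.6, 0.3, 0.1],  # a (new symbol, because a blank came in between)
    [0.2, 0.7, 0.1],  # b
]
decoded = greedy_ctc_decode(step_probs, blank=2)
print(decoded)  # [0, 0, 1]
```

The blank between the second and fourth steps is what allows the doubled "a" to survive the merge, which is why words with repeated letters can still be recognized.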
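To get better results, the model should be trained for at least 50 epochs.
The prediction_model is fully compatible with TensorFlow Lite. If you are interested, you can use it inside a mobile application. You may find this notebook useful in this regard.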