代码示例 / 自然语言处理 / 使用 KerasHub 进行英西翻译

使用 KerasHub 进行英西翻译

作者: Abheesht Sharma
创建日期 2022/05/26
最后修改日期 2024/04/30
描述: 使用 KerasHub 在机器翻译任务上训练一个序列到序列 Transformer 模型。

ⓘ 此示例使用 Keras 3

在 Colab 中查看 GitHub 源代码


简介

KerasHub 提供了 NLP 的构建模块(模型层、分词器、指标等),并方便构建 NLP 管道。

在本例中,我们将使用 KerasHub 层构建一个编码器-解码器 Transformer 模型,并在英西机器翻译任务上对其进行训练。

此示例基于 fchollet 编写的英西 NMT 示例。原始示例更底层,并从头实现层,而本示例使用 KerasHub 展示了一些更高级的方法,例如子词分词和使用指标计算生成翻译的质量。

您将学习如何

如果您不熟悉 KerasHub,请不要担心。本教程将从基础开始。让我们开始吧!


设置

在开始实现管道之前,让我们导入所有需要的库。

!pip install -q --upgrade rouge-score
!pip install -q --upgrade keras-hub
!pip install -q --upgrade keras  # Upgrade to Keras 3.
import keras_hub
import pathlib
import random

import keras
from keras import ops

import tensorflow.data as tf_data
from tensorflow_text.tools.wordpiece_vocab import (
    bert_vocab_from_dataset as bert_vocab,
)
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
tensorflow 2.15.1 requires keras<2.16,>=2.15.0, but you have keras 3.3.3 which is incompatible.

让我们也定义我们的参数/超参数。

BATCH_SIZE = 64
EPOCHS = 1  # This should be at least 10 for convergence
MAX_SEQUENCE_LENGTH = 40
ENG_VOCAB_SIZE = 15000
SPA_VOCAB_SIZE = 15000

EMBED_DIM = 256
INTERMEDIATE_DIM = 2048
NUM_HEADS = 8

下载数据

我们将使用由 Anki 提供的英西翻译数据集。让我们下载它。

text_file = keras.utils.get_file(
    fname="spa-eng.zip",
    origin="http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
    extract=True,
)
text_file = pathlib.Path(text_file).parent / "spa-eng" / "spa.txt"
Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
 2638744/2638744 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step

解析数据

每行包含一个英文句子及其对应的西班牙语句子。英文句子是源序列,西班牙语句子是目标序列。在将文本添加到列表之前,我们将它转换为小写。

with open(text_file) as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
    eng, spa = line.split("\t")
    eng = eng.lower()
    spa = spa.lower()
    text_pairs.append((eng, spa))

以下是我们的句子对的样子

for _ in range(5):
    print(random.choice(text_pairs))
('tom heard that mary had bought a new computer.', 'tom oyó que mary se había comprado un computador nuevo.')
('will you stay at home?', '¿te vas a quedar en casa?')
('where is this train going?', '¿adónde va este tren?')
('tom panicked.', 'tom entró en pánico.')
("we'll help you rescue tom.", 'te ayudaremos a rescatar a tom.')

现在,让我们将句子对拆分为训练集、验证集和测试集。

random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]

print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")
118964 total pairs
83276 training pairs
17844 validation pairs
17844 test pairs

对数据进行分词

我们将定义两个分词器 - 一个用于源语言(英语),另一个用于目标语言(西班牙语)。我们将使用 keras_hub.tokenizers.WordPieceTokenizer 对文本进行分词。keras_hub.tokenizers.WordPieceTokenizer 获取一个 WordPiece 词汇表,并具有对文本进行分词和将标记序列去分词的功能。

在定义这两个分词器之前,我们首先需要在我们拥有的数据集上对其进行训练。WordPiece 分词算法是一种子词分词算法;在语料库上对其进行训练会为我们提供一个子词词汇表。子词分词器是词分词器(词分词器需要非常大的词汇表才能很好地覆盖输入词)和字符分词器(字符不像词那样真正地编码含义)之间的折衷方案。幸运的是,KerasHub 使得使用 keras_hub.tokenizers.compute_word_piece_vocabulary 工具在语料库上训练 WordPiece 变得非常简单。

def train_word_piece(text_samples, vocab_size, reserved_tokens):
    word_piece_ds = tf_data.Dataset.from_tensor_slices(text_samples)
    vocab = keras_hub.tokenizers.compute_word_piece_vocabulary(
        word_piece_ds.batch(1000).prefetch(2),
        vocabulary_size=vocab_size,
        reserved_tokens=reserved_tokens,
    )
    return vocab

每个词汇表都有一些特殊的保留标记。我们有四个这样的标记

  • "[PAD]" - 填充标记。当输入序列长度小于最大序列长度时,将填充标记附加到输入序列长度。
  • "[UNK]" - 未知标记。
  • "[START]" - 标记输入序列开始的标记。
  • "[END]" - 标记输入序列结束的标记。
reserved_tokens = ["[PAD]", "[UNK]", "[START]", "[END]"]

eng_samples = [text_pair[0] for text_pair in train_pairs]
eng_vocab = train_word_piece(eng_samples, ENG_VOCAB_SIZE, reserved_tokens)

spa_samples = [text_pair[1] for text_pair in train_pairs]
spa_vocab = train_word_piece(spa_samples, SPA_VOCAB_SIZE, reserved_tokens)

让我们看看一些标记!

print("English Tokens: ", eng_vocab[100:110])
print("Spanish Tokens: ", spa_vocab[100:110])
English Tokens:  ['at', 'know', 'him', 'there', 'go', 'they', 'her', 'has', 'time', 'will']
Spanish Tokens:  ['le', 'para', 'te', 'mary', 'las', 'más', 'al', 'yo', 'tu', 'estoy']

现在,让我们定义分词器。我们将使用上面训练的词汇表配置分词器。

eng_tokenizer = keras_hub.tokenizers.WordPieceTokenizer(
    vocabulary=eng_vocab, lowercase=False
)
spa_tokenizer = keras_hub.tokenizers.WordPieceTokenizer(
    vocabulary=spa_vocab, lowercase=False
)

让我们尝试对我们数据集中的一条样本进行分词!为了验证文本是否已正确分词,我们还可以将标记列表去分词回原始文本。

eng_input_ex = text_pairs[0][0]
eng_tokens_ex = eng_tokenizer.tokenize(eng_input_ex)
print("English sentence: ", eng_input_ex)
print("Tokens: ", eng_tokens_ex)
print(
    "Recovered text after detokenizing: ",
    eng_tokenizer.detokenize(eng_tokens_ex),
)

print()

spa_input_ex = text_pairs[0][1]
spa_tokens_ex = spa_tokenizer.tokenize(spa_input_ex)
print("Spanish sentence: ", spa_input_ex)
print("Tokens: ", spa_tokens_ex)
print(
    "Recovered text after detokenizing: ",
    spa_tokenizer.detokenize(spa_tokens_ex),
)
English sentence:  i am leaving the books here.
Tokens:  tf.Tensor([ 35 163 931  66 356 119  12], shape=(7,), dtype=int32)
Recovered text after detokenizing:  tf.Tensor(b'i am leaving the books here .', shape=(), dtype=string)
Spanish sentence:  dejo los libros aquí.
Tokens:  tf.Tensor([2962   93  350  122   14], shape=(5,), dtype=int32)
Recovered text after detokenizing:  tf.Tensor(b'dejo los libros aqu\xc3\xad .', shape=(), dtype=string)

格式化数据集

接下来,我们将格式化我们的数据集。

在每个训练步骤中,模型将试图使用源句子和目标词 0 到 N 来预测目标词 N+1(及以后)。

因此,训练数据集将产生一个元组 (inputs, targets),其中

  • inputs 是一个字典,其键为 encoder_inputsdecoder_inputsencoder_inputs 是分词后的源句子,decoder_inputs 是到目前为止的目标句子,也就是说,用于预测目标句子中词 N+1(及以后)的词 0 到 N。
  • target 是目标句子偏移一步:它提供了目标句子中的下一个词 - 模型将尝试预测的内容。

在对文本进行分词后,我们将向输入西班牙语句子添加特殊标记 "[START]""[END]"。我们还将输入填充到固定长度。这可以使用 keras_hub.layers.StartEndPacker 轻松完成。

def preprocess_batch(eng, spa):
    batch_size = ops.shape(spa)[0]

    eng = eng_tokenizer(eng)
    spa = spa_tokenizer(spa)

    # Pad `eng` to `MAX_SEQUENCE_LENGTH`.
    eng_start_end_packer = keras_hub.layers.StartEndPacker(
        sequence_length=MAX_SEQUENCE_LENGTH,
        pad_value=eng_tokenizer.token_to_id("[PAD]"),
    )
    eng = eng_start_end_packer(eng)

    # Add special tokens (`"[START]"` and `"[END]"`) to `spa` and pad it as well.
    spa_start_end_packer = keras_hub.layers.StartEndPacker(
        sequence_length=MAX_SEQUENCE_LENGTH + 1,
        start_value=spa_tokenizer.token_to_id("[START]"),
        end_value=spa_tokenizer.token_to_id("[END]"),
        pad_value=spa_tokenizer.token_to_id("[PAD]"),
    )
    spa = spa_start_end_packer(spa)

    return (
        {
            "encoder_inputs": eng,
            "decoder_inputs": spa[:, :-1],
        },
        spa[:, 1:],
    )


def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf_data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(BATCH_SIZE)
    dataset = dataset.map(preprocess_batch, num_parallel_calls=tf_data.AUTOTUNE)
    return dataset.shuffle(2048).prefetch(16).cache()


train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

让我们快速查看一下序列形状(我们有 64 对批次,所有序列都为 40 步长)

for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")
inputs["encoder_inputs"].shape: (64, 40)
inputs["decoder_inputs"].shape: (64, 40)
targets.shape: (64, 40)

构建模型

现在,让我们继续激动人心的部分 - 定义我们的模型!我们首先需要一个嵌入层,即输入序列中每个标记的向量。此嵌入层可以随机初始化。我们还需要一个位置嵌入层,它对序列中的词序进行编码。惯例是将这两个嵌入相加。KerasHub 有一个 keras_hub.layers.TokenAndPositionEmbedding 层可以为我们完成所有上述步骤。

我们的序列到序列 Transformer 由一个 keras_hub.layers.TransformerEncoder 层和一个 keras_hub.layers.TransformerDecoder 层连接在一起。

源序列将传递给 keras_hub.layers.TransformerEncoder,它将生成一个新的表示。然后,此新表示将与到目前为止的目标序列(目标词 0 到 N)一起传递给 keras_hub.layers.TransformerDecoderkeras_hub.layers.TransformerDecoder 然后将试图预测目标序列中的下一个词(N+1 及以后)。

使这成为可能的一个关键细节是因果掩码。keras_hub.layers.TransformerDecoder 同时看到整个序列,因此我们必须确保它在预测标记 N+1 时仅使用目标标记 0 到 N 的信息(否则,它可能会使用来自未来的信息,这将导致一个在推理时无法使用的模型)。因果掩码在 keras_hub.layers.TransformerDecoder 中默认启用。

我们还需要屏蔽填充标记("[PAD]")。为此,我们可以将 keras_hub.layers.TokenAndPositionEmbedding 层的 mask_zero 参数设置为 True。然后,这将传播到所有后续层。

# Encoder
encoder_inputs = keras.Input(shape=(None,), name="encoder_inputs")

x = keras_hub.layers.TokenAndPositionEmbedding(
    vocabulary_size=ENG_VOCAB_SIZE,
    sequence_length=MAX_SEQUENCE_LENGTH,
    embedding_dim=EMBED_DIM,
)(encoder_inputs)

encoder_outputs = keras_hub.layers.TransformerEncoder(
    intermediate_dim=INTERMEDIATE_DIM, num_heads=NUM_HEADS
)(inputs=x)
encoder = keras.Model(encoder_inputs, encoder_outputs)


# Decoder
decoder_inputs = keras.Input(shape=(None,), name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, EMBED_DIM), name="decoder_state_inputs")

x = keras_hub.layers.TokenAndPositionEmbedding(
    vocabulary_size=SPA_VOCAB_SIZE,
    sequence_length=MAX_SEQUENCE_LENGTH,
    embedding_dim=EMBED_DIM,
)(decoder_inputs)

x = keras_hub.layers.TransformerDecoder(
    intermediate_dim=INTERMEDIATE_DIM, num_heads=NUM_HEADS
)(decoder_sequence=x, encoder_sequence=encoded_seq_inputs)
x = keras.layers.Dropout(0.5)(x)
decoder_outputs = keras.layers.Dense(SPA_VOCAB_SIZE, activation="softmax")(x)
decoder = keras.Model(
    [
        decoder_inputs,
        encoded_seq_inputs,
    ],
    decoder_outputs,
)
decoder_outputs = decoder([decoder_inputs, encoder_outputs])

transformer = keras.Model(
    [encoder_inputs, decoder_inputs],
    decoder_outputs,
    name="transformer",
)

训练我们的模型

我们将使用准确率作为一种快速监控验证数据上训练进度的方法。请注意,机器翻译通常使用 BLEU 分数以及其他指标,而不是准确率。但是,为了使用 ROUGE、BLEU 等指标,我们需要解码概率并生成文本。文本生成在计算上代价很高,在训练期间执行此操作是不推荐的。

这里我们只训练了 1 个 epoch,但要使模型真正收敛,您应该至少训练 10 个 epoch。

transformer.summary()
transformer.compile(
    "rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
transformer.fit(train_ds, epochs=EPOCHS, validation_data=val_ds)
Model: "transformer"
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓
┃ Layer (type)         Output Shape          Param #  Connected to      ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩
│ encoder_inputs      │ (None, None)      │          0 │ -                 │
│ (InputLayer)        │                   │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ token_and_position… │ (None, None, 256) │  3,850,240 │ encoder_inputs[0… │
│ (TokenAndPositionE… │                   │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ decoder_inputs      │ (None, None)      │          0 │ -                 │
│ (InputLayer)        │                   │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ transformer_encoder │ (None, None, 256) │  1,315,072 │ token_and_positi… │
│ (TransformerEncode… │                   │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ functional_3        │ (None, None,      │  9,283,992 │ decoder_inputs[0… │
│ (Functional)        │ 15000)            │            │ transformer_enco… │
└─────────────────────┴───────────────────┴────────────┴───────────────────┘
 Total params: 14,449,304 (55.12 MB)
 Trainable params: 14,449,304 (55.12 MB)
 Non-trainable params: 0 (0.00 B)
 1302/1302 ━━━━━━━━━━━━━━━━━━━━ 1701s 1s/step - accuracy: 0.8168 - loss: 1.4819 - val_accuracy: 0.8650 - val_loss: 0.8129

<keras.src.callbacks.history.History at 0x7efdd7ee6a50>

解码测试句子(定性分析)

最后,让我们演示如何翻译全新的英文句子。我们只需将标记化的英文句子以及目标标记 "[START]" 馈送到模型中。模型输出下一个标记的概率。然后我们重复生成以迄今生成的标记为条件的下一个标记,直到我们遇到标记 "[END]"

对于解码,我们将使用 KerasHub 中的 keras_hub.samplers 模块。贪婪解码是一种文本解码方法,它在每个时间步输出最可能的下一个标记,即概率最高的标记。

def decode_sequences(input_sentences):
    batch_size = 1

    # Tokenize the encoder input.
    encoder_input_tokens = ops.convert_to_tensor(eng_tokenizer(input_sentences))
    if len(encoder_input_tokens[0]) < MAX_SEQUENCE_LENGTH:
        pads = ops.full((1, MAX_SEQUENCE_LENGTH - len(encoder_input_tokens[0])), 0)
        encoder_input_tokens = ops.concatenate(
            [encoder_input_tokens.to_tensor(), pads], 1
        )

    # Define a function that outputs the next token's probability given the
    # input sequence.
    def next(prompt, cache, index):
        logits = transformer([encoder_input_tokens, prompt])[:, index - 1, :]
        # Ignore hidden states for now; only needed for contrastive search.
        hidden_states = None
        return logits, hidden_states, cache

    # Build a prompt of length 40 with a start token and padding tokens.
    length = 40
    start = ops.full((batch_size, 1), spa_tokenizer.token_to_id("[START]"))
    pad = ops.full((batch_size, length - 1), spa_tokenizer.token_to_id("[PAD]"))
    prompt = ops.concatenate((start, pad), axis=-1)

    generated_tokens = keras_hub.samplers.GreedySampler()(
        next,
        prompt,
        stop_token_ids=[spa_tokenizer.token_to_id("[END]")],
        index=1,  # Start sampling after start token.
    )
    generated_sentences = spa_tokenizer.detokenize(generated_tokens)
    return generated_sentences


test_eng_texts = [pair[0] for pair in test_pairs]
for i in range(2):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequences([input_sentence])
    translated = translated.numpy()[0].decode("utf-8")
    translated = (
        translated.replace("[PAD]", "")
        .replace("[START]", "")
        .replace("[END]", "")
        .strip()
    )
    print(f"** Example {i} **")
    print(input_sentence)
    print(translated)
    print()
WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1714519073.816969   34774 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.

** Example 0 **
i got the ticket free of charge.
me pregunto la comprome .
** Example 1 **
i think maybe that's all you have to do.
creo que tom le dije que hacer eso .

评估我们的模型(定量分析)

文本生成任务中使用了许多指标。在这里,为了评估我们的模型生成的翻译,让我们计算 ROUGE-1 和 ROUGE-2 分数。从本质上讲,ROUGE-N 是一个基于参考文本和生成文本之间共同 n 元组数量的分数。ROUGE-1 和 ROUGE-2 分别使用共同一元组和二元组的数量。

我们将对 30 个测试样本计算分数(因为解码是一个代价很高的过程)。

rouge_1 = keras_hub.metrics.RougeN(order=1)
rouge_2 = keras_hub.metrics.RougeN(order=2)

for test_pair in test_pairs[:30]:
    input_sentence = test_pair[0]
    reference_sentence = test_pair[1]

    translated_sentence = decode_sequences([input_sentence])
    translated_sentence = translated_sentence.numpy()[0].decode("utf-8")
    translated_sentence = (
        translated_sentence.replace("[PAD]", "")
        .replace("[START]", "")
        .replace("[END]", "")
        .strip()
    )

    rouge_1(reference_sentence, translated_sentence)
    rouge_2(reference_sentence, translated_sentence)

print("ROUGE-1 Score: ", rouge_1.result())
print("ROUGE-2 Score: ", rouge_2.result())
ROUGE-1 Score:  {'precision': <tf.Tensor: shape=(), dtype=float32, numpy=0.30989552>, 'recall': <tf.Tensor: shape=(), dtype=float32, numpy=0.37136248>, 'f1_score': <tf.Tensor: shape=(), dtype=float32, numpy=0.33032653>}
ROUGE-2 Score:  {'precision': <tf.Tensor: shape=(), dtype=float32, numpy=0.08999339>, 'recall': <tf.Tensor: shape=(), dtype=float32, numpy=0.09524643>, 'f1_score': <tf.Tensor: shape=(), dtype=float32, numpy=0.08855649>}

经过 10 个 epoch 后,分数如下所示

ROUGE-1 ROUGE-2
精确率 0.568 0.374
召回率 0.615 0.394
F1 分数 0.579 0.381