DistributedEmbedding
类keras_rs.layers.DistributedEmbedding(
feature_configs: Union[
keras_rs.src.layers.embedding.distributed_embedding_config.FeatureConfig,
tensorflow.python.tpu.tpu_embedding_v2_utils.FeatureConfig,
Sequence[
Union[
keras_rs.src.layers.embedding.distributed_embedding_config.FeatureConfig,
tensorflow.python.tpu.tpu_embedding_v2_utils.FeatureConfig,
ForwardRef("Nested[T]"),
]
],
Mapping[
str,
Union[
keras_rs.src.layers.embedding.distributed_embedding_config.FeatureConfig,
tensorflow.python.tpu.tpu_embedding_v2_utils.FeatureConfig,
ForwardRef("Nested[T]"),
],
],
],
table_stacking: Union[str, Sequence[str], Sequence[Sequence[str]]] = "auto",
**kwargs: Any
)
DistributedEmbedding,用于加速大型嵌入查找的层。
DistributedEmbedding
处于预览阶段。DistributedEmbedding
是一个针对具有 SparseCore 的 TPU 芯片优化的层,可以显著提高嵌入查找和嵌入训练的速度。它通过将多个查找组合成一次调用,并将嵌入表分片到可用的芯片上工作。请注意,只有当嵌入表足够大需要分片(因为它们不适合单个芯片)时,才会看到性能提升。更多详细信息请参见下面的“放置”部分。
在其他硬件(GPU、CPU 和不带 SparseCore 的 TPU)上,DistributedEmbedding
提供相同的 API,没有任何特定的加速。除通过 keras.distribution.set_distribution
设置的分发方案外,不应用任何特定的分发方案。
DistributedEmbedding
嵌入输入序列,并通过应用可配置的组合器函数将其缩减为单个嵌入。
DistributedEmbedding
嵌入层通过一组 keras_rs.layers.FeatureConfig
对象进行配置,这些对象本身引用 keras_rs.layers.TableConfig
对象。
TableConfig
定义了一个嵌入表,包含词汇大小、嵌入维度等参数,以及用于缩减的组合器和用于训练的优化器。FeatureConfig
定义了 DistributedEmbedding
将处理哪些输入特征以及使用哪个嵌入表。请注意,多个特征可以使用同一个嵌入表。table1 = keras_rs.layers.TableConfig(
name="table1",
vocabulary_size=TABLE1_VOCABULARY_SIZE,
embedding_dim=TABLE1_EMBEDDING_SIZE,
placement="auto",
)
table2 = keras_rs.layers.TableConfig(
name="table2",
vocabulary_size=TABLE2_VOCABULARY_SIZE,
embedding_dim=TABLE2_EMBEDDING_SIZE,
placement="auto",
)
feature1 = keras_rs.layers.FeatureConfig(
name="feature1",
table=table1,
input_shape=(PER_REPLICA_BATCH_SIZE,),
output_shape=(PER_REPLICA_BATCH_SIZE, TABLE1_EMBEDDING_SIZE),
)
feature2 = keras_rs.layers.FeatureConfig(
name="feature2",
table=table2,
input_shape=(PER_REPLICA_BATCH_SIZE,),
output_shape=(PER_REPLICA_BATCH_SIZE, TABLE2_EMBEDDING_SIZE),
)
feature_configs = {
"feature1": feature1,
"feature2": feature2,
}
embedding = keras_rs.layers.DistributedEmbedding(feature_configs)
DistributedEmbedding
中的每个嵌入表都有自己的用于训练的优化器,它独立于通过 model.compile()
在模型上设置的优化器。
注意,并非所有优化器都受支持。目前,所有后端和加速器都支持以下优化器:
使用 TensorFlow 后端时,还提供以下优化器:
此外,并非所有优化器的参数都受支持(例如 SGD
的 nesterov
选项)。使用不受支持的优化器或不受支持的优化器参数时会引发错误。
DistributedEmbedding
中的每个嵌入表都可以放置在 SparseCore 芯片上,或者放置在加速器的默认设备上(例如 TPU 上 Tensor Cores 的 HBM)。这由 keras_rs.layers.TableConfig
的 placement
属性控制。
"sparsecore"
表示表应放置在 SparseCore 芯片上。如果选择此选项但没有 SparseCore 芯片,则会引发错误。"default_device"
表示表不应放置在 SparseCore 上,即使可用。相反,表放置在模型通常运行的设备上,即 TPU 和 GPU 的 HBM 上。在这种情况下,如果适用,表将使用通过 keras.distribution.set_distribution
设置的方案进行分发。在没有 SparseCore 的 GPU、CPU 和 TPU 上,这是唯一可用的放置选项,也是 "auto"
选择的选项。"auto"
表示如果 SparseCore 可用则使用 "sparsecore"
,否则使用 "default_device"
。这是未指定时的默认值。优化 TPU 性能
"sparsecore"
放置。"default_device"
,并且通常应通过使用 keras.distribution.DataParallel
分发选项跨 TPU 复制。除了 tf.Tensor
外,DistributedEmbedding
还接受 tf.RaggedTensor
和 tf.SparseTensor
作为嵌入查找的输入。Ragged 张量必须在索引为 1 的维度上是 ragged 的。请注意,如果传递了权重,则每个权重张量必须与该特定特征的输入属于同一类,并且对于 ragged 张量使用完全相同的 ragged 行长度,对于 sparse 张量使用相同的索引。DistributedEmbedding
的所有输出都是密集张量。
要在带有 TensorFlow 的 TPU 上使用 DistributedEmbedding
,必须使用 tf.distribute.TPUStrategy
。DistributedEmbedding
层必须在 TPUStrategy
下创建。
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local")
topology = tf.tpu.experimental.initialize_tpu_system(resolver)
device_assignment = tf.tpu.experimental.DeviceAssignment.build(
topology, num_replicas=resolver.get_tpu_system_metadata().num_cores
)
strategy = tf.distribute.TPUStrategy(
resolver, experimental_device_assignment=device_assignment
)
with strategy.scope():
embedding = keras_rs.layers.DistributedEmbedding(feature_configs)
要使用 Keras 的 model.fit()
,必须在 TPUStrategy
下编译模型。然后,可以直接调用 model.fit()
、model.evaluate()
或 model.predict()
。Keras 模型负责使用 strategy 运行模型并自动分发数据集。
with strategy.scope():
embedding = keras_rs.layers.DistributedEmbedding(feature_configs)
model = create_model(embedding)
model.compile(loss=keras.losses.MeanSquaredError(), optimizer="adam")
model.fit(dataset, epochs=10)
DistributedEmbedding
必须通过嵌套在 tf.function
中的 strategy.run
调用来调用。
@tf.function
def embedding_wrapper(tf_fn_inputs, tf_fn_weights=None):
def strategy_fn(st_fn_inputs, st_fn_weights):
return embedding(st_fn_inputs, st_fn_weights)
return strategy.run(strategy_fn, args=(tf_fn_inputs, tf_fn_weights)))
embedding_wrapper(my_inputs, my_weights)
使用数据集时,数据集必须是分布式的。然后可以将迭代器传递给使用 strategy.run
的 tf.function
。
dataset = strategy.experimental_distribute_dataset(dataset)
@tf.function
def run_loop(iterator):
def step(data):
(inputs, weights), labels = data
with tf.GradientTape() as tape:
result = embedding(inputs, weights)
loss = keras.losses.mean_squared_error(labels, result)
tape.gradient(loss, embedding.trainable_variables)
return result
for _ in tf.range(4):
result = strategy.run(step, args=(next(iterator),))
run_loop(iter(dataset))
参数
keras_rs.layers.FeatureConfig
的嵌套结构。None
表示不进行表堆叠。"auto"
表示自动堆叠表。表名称列表或表名称列表的列表表示将内部列表中的表堆叠在一起。请注意,较旧的 TPU 不支持表堆叠,在这种情况下,默认值 "auto"
将被解释为不进行表堆叠。call
方法DistributedEmbedding.call(
inputs: Union[
Any,
Sequence[Union[Any, ForwardRef("Nested[T]")]],
Mapping[str, Union[Any, ForwardRef("Nested[T]")]],
],
weights: Union[
Any,
Sequence[Union[Any, ForwardRef("Nested[T]")]],
Mapping[str, Union[Any, ForwardRef("Nested[T]")]],
NoneType,
] = None,
training: bool = False,
)
在嵌入表中查找特征并应用缩减。
参数
feature_configs
相同。或者,可以包含已经预处理过的输入(参见 preprocess
)。inputs
相同,并且形状必须匹配。返回
密集 2D 张量的嵌套结构,这些张量是来自传入特征的缩减后的嵌入。结构与 inputs
相同。
preprocess
方法DistributedEmbedding.preprocess(
inputs: Union[
Any,
Sequence[Union[Any, ForwardRef("Nested[T]")]],
Mapping[str, Union[Any, ForwardRef("Nested[T]")]],
],
weights: Union[
Any,
Sequence[Union[Any, ForwardRef("Nested[T]")]],
Mapping[str, Union[Any, ForwardRef("Nested[T]")]],
NoneType,
] = None,
training: bool = False,
)
预处理和重新格式化数据,以便模型使用。
仅当使用 JAX 后端且 jit_compile = True
以启用 sparsecore
放置时,才需要显式调用 preprocess
。对于所有其他情况和后端,显式使用 preprocess
是可选的。
在 JAX 中,sparsecore 的使用需要根据可用硬件属性进行特殊格式化的数据。当前此数据重新格式化不支持 jit-compilation,因此必须在将数据馈送到模型中之前应用。
示例用法如下:
# Create the embedding layer.
embedding_layer = DistributedEmbedding(feature_configs)
# Add preprocessing to a data input pipeline.
def training_dataset_generator():
for (inputs, weights), labels in iter(training_dataset):
yield embedding_layer.preprocess(
inputs, weights, training=True
), labels
preprocessed_training_dataset = training_dataset_generate()
# Construct, compile, and fit the model using the preprocessed data.
model = keras.Sequential(
[
embedding_layer,
keras.layers.Dense(2),
keras.layers.Dense(3),
keras.layers.Dense(4),
]
)
model.compile(optimizer="adam", loss="mse", jit_compile=True)
model.fit(preprocessed_training_dataset, epochs=10)
对于非 JAX 后端,预处理会将输入和权重捆绑在一起,并按设备放置分离输入。此步骤是完全可选的。
参数
返回
一组预处理后的输入,可以直接馈送到层的 inputs
参数中。