作者: fchollet
创建日期 2023/06/29
最后修改日期 2023/06/29
描述: 使用 PyTorch 为 Keras 模型进行多 GPU 训练的指南。
通常有两种方法可以将计算分布到多个设备上
数据并行,其中单个模型被复制到多个设备或多个机器上。每个设备处理不同的数据批次,然后合并它们的结果。这种设置有很多变体,它们在不同的模型副本如何合并结果、它们是否在每个批次都保持同步、或者它们是否更加松散耦合等方面有所不同。
模型并行,其中单个模型的不同部分在不同的设备上运行,共同处理一批数据。这对于具有自然并行架构的模型效果最好,例如具有多个分支的模型。
本指南侧重于数据并行,特别是同步数据并行,其中模型的不同副本在处理完每个批次后保持同步。同步性使得模型收敛行为与您在单设备训练中看到的一致。
具体来说,本指南教您如何使用 PyTorch 的 DistributedDataParallel
模块包装器在单个机器(单主机,多设备训练)上安装的多个 GPU(通常是 2 到 16 个)上训练 Keras,只需对代码进行最少的更改。这是研究人员和小规模工业工作流程中最常见的设置。
让我们首先定义创建我们将要训练的模型的函数,以及创建我们将要训练的数据集(在本例中为 MNIST)的函数。
import os
os.environ["KERAS_BACKEND"] = "torch"
import torch
import numpy as np
import keras
def get_model():
# Make a simple convnet with batch normalization and dropout.
inputs = keras.Input(shape=(28, 28, 1))
x = keras.layers.Rescaling(1.0 / 255.0)(inputs)
x = keras.layers.Conv2D(filters=12, kernel_size=3, padding="same", use_bias=False)(
x
)
x = keras.layers.BatchNormalization(scale=False, center=True)(x)
x = keras.layers.ReLU()(x)
x = keras.layers.Conv2D(
filters=24,
kernel_size=6,
use_bias=False,
strides=2,
)(x)
x = keras.layers.BatchNormalization(scale=False, center=True)(x)
x = keras.layers.ReLU()(x)
x = keras.layers.Conv2D(
filters=32,
kernel_size=6,
padding="same",
strides=2,
name="large_k",
)(x)
x = keras.layers.BatchNormalization(scale=False, center=True)(x)
x = keras.layers.ReLU()(x)
x = keras.layers.GlobalAveragePooling2D()(x)
x = keras.layers.Dense(256, activation="relu")(x)
x = keras.layers.Dropout(0.5)(x)
outputs = keras.layers.Dense(10)(x)
model = keras.Model(inputs, outputs)
return model
def get_dataset():
# Load the data and split it between train and test sets
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# Scale images to the [0, 1] range
x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
# Make sure images have shape (28, 28, 1)
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
print("x_train shape:", x_train.shape)
# Create a TensorDataset
dataset = torch.utils.data.TensorDataset(
torch.from_numpy(x_train), torch.from_numpy(y_train)
)
return dataset
接下来,让我们定义一个简单的 PyTorch 训练循环,该循环针对 GPU(注意对 .cuda()
的调用)。
def train_model(model, dataloader, num_epochs, optimizer, loss_fn):
for epoch in range(num_epochs):
running_loss = 0.0
running_loss_count = 0
for batch_idx, (inputs, targets) in enumerate(dataloader):
inputs = inputs.cuda(non_blocking=True)
targets = targets.cuda(non_blocking=True)
# Forward pass
outputs = model(inputs)
loss = loss_fn(outputs, targets)
# Backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
running_loss += loss.item()
running_loss_count += 1
# Print loss statistics
print(
f"Epoch {epoch + 1}/{num_epochs}, "
f"Loss: {running_loss / running_loss_count}"
)
在这种设置中,您有一台机器,上面有多个 GPU(通常是 2 到 16 个)。每个设备将运行您的模型的副本(称为副本)。为简单起见,在下文中,我们将假设我们正在处理 8 个 GPU,这不会影响普遍性。
它是如何工作的
在训练的每一步
在实践中,同步更新模型副本权重的过程是在每个单独的权重变量级别处理的。这是通过一个镜像变量对象完成的。
如何使用它
要使用 Keras 模型进行单主机、多设备同步训练,您可以使用 torch.nn.parallel.DistributedDataParallel
模块包装器。以下是它的工作方式
torch.multiprocessing.start_processes
来启动多个 Python 进程,每个设备一个进程。每个进程将运行 per_device_launch_fn
函数。per_device_launch_fn
函数执行以下操作: - 它使用 torch.distributed.init_process_group
和 torch.cuda.set_device
来配置该进程要使用的设备。 - 它使用 torch.utils.data.distributed.DistributedSampler
和 torch.utils.data.DataLoader
将我们的数据转换为分布式数据加载器。 - 它还使用 torch.nn.parallel.DistributedDataParallel
将我们的模型转换为分布式 PyTorch 模块。 - 然后它调用 train_model
函数。train_model
函数将在每个进程中运行,模型在每个进程中使用单独的设备。这是流程,其中每个步骤都拆分为自己的实用程序函数
# Config
num_gpu = torch.cuda.device_count()
num_epochs = 2
batch_size = 64
print(f"Running on {num_gpu} GPUs")
def setup_device(current_gpu_index, num_gpus):
# Device setup
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "56492"
device = torch.device("cuda:{}".format(current_gpu_index))
torch.distributed.init_process_group(
backend="nccl",
init_method="env://",
world_size=num_gpus,
rank=current_gpu_index,
)
torch.cuda.set_device(device)
def cleanup():
torch.distributed.destroy_process_group()
def prepare_dataloader(dataset, current_gpu_index, num_gpus, batch_size):
sampler = torch.utils.data.distributed.DistributedSampler(
dataset,
num_replicas=num_gpus,
rank=current_gpu_index,
shuffle=False,
)
dataloader = torch.utils.data.DataLoader(
dataset,
sampler=sampler,
batch_size=batch_size,
shuffle=False,
)
return dataloader
def per_device_launch_fn(current_gpu_index, num_gpu):
# Setup the process groups
setup_device(current_gpu_index, num_gpu)
dataset = get_dataset()
model = get_model()
# prepare the dataloader
dataloader = prepare_dataloader(dataset, current_gpu_index, num_gpu, batch_size)
# Instantiate the torch optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Instantiate the torch loss function
loss_fn = torch.nn.CrossEntropyLoss()
# Put model on device
model = model.to(current_gpu_index)
ddp_model = torch.nn.parallel.DistributedDataParallel(
model, device_ids=[current_gpu_index], output_device=current_gpu_index
)
train_model(ddp_model, dataloader, num_epochs, optimizer, loss_fn)
cleanup()
Running on 0 GPUs
/opt/conda/envs/keras-torch/lib/python3.10/site-packages/torch/cuda/__init__.py:611: UserWarning: Can't initialize NVML
warnings.warn("Can't initialize NVML")
开始多个进程的时间到了
if __name__ == "__main__":
# We use the "fork" method rather than "spawn" to support notebooks
torch.multiprocessing.start_processes(
per_device_launch_fn,
args=(num_gpu,),
nprocs=num_gpu,
join=True,
start_method="fork",
)
就是这样!