教你掌握分布式訓練PyTorch?DDP到Accelerate到Trainer
概述
本教程假定你已經(jīng)對于 PyToch 訓練一個簡單模型有一定的基礎理解。本教程將展示使用 3 種封裝層級不同的方法調(diào)用 DDP (DistributedDataParallel) 進程,在多個 GPU 上訓練同一個模型:
- 使用
pytorch.distributed模塊的原生 PyTorch DDP 模塊 - 使用 ?? Accelerate 對
pytorch.distributed的輕量封裝,確保程序可以在不修改代碼或者少量修改代碼的情況下在單個 GPU 或 TPU 下正常運行 - 使用 ?? Transformer 的高級 Trainer API ,該 API 抽象封裝了所有代碼模板并且支持不同設備和分布式場景。
什么是分布式訓練,為什么它很重要?
下面是一些非?;A的 PyTorch 訓練代碼,它基于 Pytorch 官方在 MNIST 上創(chuàng)建和訓練模型的示例。
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
class BasicNet(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
self.act = F.relu
def forward(self, x):
x = self.act(self.conv1(x))
x = self.act(self.conv2(x))
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.act(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
我們定義訓練設備 (cuda):
device = "cuda"
構建一些基本的 PyTorch DataLoaders:
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
把模型放入 CUDA 設備:
model = BasicNet().to(device)
構建 PyTorch optimizer (優(yōu)化器)
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
最終創(chuàng)建一個簡單的訓練和評估循環(huán),訓練循環(huán)會使用全部訓練數(shù)據(jù)集進行訓練,評估循環(huán)會計算訓練后模型在測試數(shù)據(jù)集上的準確度:
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
通常從這里開始,就可以將所有的代碼放入 Python 腳本或在 Jupyter Notebook 上運行它。
然而,只執(zhí)行 python myscript.py 只會使用單個 GPU 運行腳本。如果有多個 GPU 資源可用,您將如何讓這個腳本在兩個 GPU 或多臺機器上運行,通過分布式訓練提高訓練速度?這是 torch.distributed 發(fā)揮作用的地方。
PyTorch 分布式數(shù)據(jù)并行
顧名思義,torch.distributed 旨在配置分布式訓練。你可以使用它配置多個節(jié)點進行訓練,例如:多機器下的單個 GPU,或者單臺機器下的多個 GPU,或者兩者的任意組合。
為了將上述代碼轉換為分布式訓練,必須首先定義一些設置配置,具體細節(jié)請參閱 DDP 使用教程
首先必須聲明 setup 和 cleanup 函數(shù)。這將創(chuàng)建一個進程組,并且所有計算進程都可以通過這個進程組通信。
注意:在本教程的這一部分中,假定這些代碼是在 Python 腳本文件中啟動。稍后將討論使用 ?? Accelerate 的啟動器,就不必聲明 setup 和 cleanup 函數(shù)了
import os
import torch.distributed as dist
def setup(rank, world_size):
"Sets up the process group and configuration for PyTorch Distributed Data Parallelism"
os.environ["MASTER_ADDR"] = 'localhost'
os.environ["MASTER_PORT"] = "12355"
# Initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
"Cleans up the distributed environment"
dist.destroy_process_group()
最后一個疑問是,我怎樣把我的數(shù)據(jù)和模型發(fā)送到另一個 GPU 上?
這正是 DistributedDataParallel 模塊發(fā)揮作用的地方, 它將您的模型復制到每個 GPU 上 ,并且當 loss.backward() 被調(diào)用進行反向傳播的時候,所有這些模型副本的梯度將被同步地平均/下降 (reduce)。這確保每個設備在執(zhí)行優(yōu)化器步驟后具有相同的權重。
下面是我們的訓練設置示例,我們使用了 DistributedDataParallel 重構了訓練函數(shù):
注意:此處的 rank 是當前 GPU 與所有其他可用 GPU 相比的總體 rank,這意味著它們的 rank 為 0 -> n-1
from torch.nn.parallel import DistributedDataParallel as DDP
def train(model, rank, world_size):
setup(rank, world_size)
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
# Train for one epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
cleanup()
在上述的代碼中需要為每個副本設備上的模型 (因此在這里是ddp_model的參數(shù)而不是 model 的參數(shù)) 聲明優(yōu)化器,以便正確計算每個副本設備上的梯度。
最后,要運行腳本,PyTorch 有一個方便的 torchrun 命令行模塊可以提供幫助。只需傳入它應該使用的節(jié)點數(shù)以及要運行的腳本即可:
torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py
上面的代碼可以在在一臺機器上的兩個 GPU 上運行訓練腳本,這是使用 PyTorch 只進行分布式訓練的情況 (不可以在單機單卡上運行)。
現(xiàn)在讓我們談談 ?? Accelerate,一個旨在使并行化更加無縫并有助于一些最佳實踐的庫。
?? Accelerate
?? Accelerate 是一個庫,旨在無需大幅修改代碼的情況下完成并行化。除此之外,?? Accelerate 附帶的數(shù)據(jù) pipeline 還可以提高代碼的性能。
首先,讓我們將剛剛執(zhí)行的所有上述代碼封裝到一個函數(shù)中,以幫助我們直觀地看到差異:
def train_ddp(rank, world_size):
setup(rank, world_size)
# Build DataLoaders
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
# Build model
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
# Build optimizer
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
# Train for a single epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
# Evaluate
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
接下來讓我們談談 ?? Accelerate 如何便利地實現(xiàn)并行化的。上面的代碼有幾個問題:
- 該代碼有點低效,因為每個設備都會創(chuàng)建一個
dataloader。 - 這些代碼只能運行在多 GPU 下,當想讓這個代碼運行在單個 GPU 或 TPU 時,還需要額外進行一些修改。
?? Accelerate 通過 Accelerator 類解決上述問題。通過它,不論是單節(jié)點還是多節(jié)點,除了三行代碼外,其余代碼幾乎保持不變,如下所示:
def train_ddp_accelerate():
accelerator = Accelerator()
# Build DataLoaders
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
# Build model
model = BasicModel()
# Build optimizer
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
# Send everything through `accelerator.prepare`
train_loader, test_loader, model, optimizer = accelerator.prepare(
train_loader, test_loader, model, optimizer
)
# Train for a single epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
output = model(data)
loss = F.nll_loss(output, target)
accelerator.backward(loss)
optimizer.step()
optimizer.zero_grad()
# Evaluate
model.eval()
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
借助 Accelerator 對象,您的 PyTorch 訓練循環(huán)現(xiàn)在已配置為可以在任何分布式情況運行。使用 Accelerator 改造后的代碼仍然可以通過 torchrun CLI 或通過 ?? Accelerate 自己的 CLI 界面啟動(啟動你的?? Accelerate 腳本)。
因此,現(xiàn)在可以盡可能保持 PyTorch 原生代碼不變的前提下,使用 ?? Accelerate 執(zhí)行分布式訓練。
早些時候有人提到 ?? Accelerate 還可以使 DataLoaders 更高效。這是通過自定義采樣器實現(xiàn)的,它可以在訓練期間自動將部分批次發(fā)送到不同的設備,從而允許每個設備只需要儲存數(shù)據(jù)的一部分,而不是一次將數(shù)據(jù)復制四份存入內(nèi)存,具體取決于配置。因此,內(nèi)存總量中只有原始數(shù)據(jù)集的一個完整副本。該數(shù)據(jù)集 會拆分后分配到各個訓練節(jié)點上,從而允許在單個實例上訓練更大的數(shù)據(jù)集,而不會使內(nèi)存爆炸
使用 notebook_launcher
之前提到您可以直接從 Jupyter Notebook 運行分布式代碼。這來自 ?? Accelerate 的 notebook_launcher 模塊,它可以在 Jupyter Notebook 內(nèi)部的代碼啟動多 GPU 訓練。
使用它就像導入 launcher 一樣簡單:
from accelerate import notebook_launcher
接著傳遞我們之前聲明的訓練函數(shù)、要傳遞的任何參數(shù)以及要使用的進程數(shù)(例如 TPU 上的 8 個,或兩個 GPU 上的 2 個)。下面兩個訓練函數(shù)都可以運行,但請注意,啟動單次啟動后,實例需要重新啟動才能產(chǎn)生另一個:
notebook_launcher(train_ddp, args=(), num_processes=2)
或者:
notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)
使用 ?? Trainer
終于我們來到了最高級的 API——Hugging Face Trainer.
它涵蓋了盡可能多的訓練類型,同時仍然能夠在分布式系統(tǒng)上進行訓練,用戶根本不需要做任何事情。
首先我們需要導入 ?? Trainer:
from transformers import Trainer
然后我們定義一些 TrainingArguments 來控制所有常用的超參數(shù)。?? Trainer 需要的訓練數(shù)據(jù)是字典類型的,因此需要制作自定義整理功能。
最后,我們將訓練器子類化并編寫我們自己的 compute_loss.
之后,這段代碼也可以分布式運行,而無需修改任何訓練代碼!
from transformers import Trainer, TrainingArguments
model = BasicNet()
training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)
def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)
trainer.train()
***** Running training ***** Num examples = 60000 Num Epochs = 1 Instantaneous batch size per device = 64 Total train batch size (w. parallel, distributed & accumulation) = 64 Gradient Accumulation steps = 1 Total optimization steps = 938
| Epoch | 訓練損失 | 驗證損失 |
|---|---|---|
| 1 | 0.875700 | 0.282633 |
與上面的 notebook_launcher 示例類似,也可以將這個過程封裝成一個訓練函數(shù):
def train_trainer_ddp():
model = BasicNet()
training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)
def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}
class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss
trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)
trainer.train()
notebook_launcher(train_trainer_ddp, args=(), num_processes=2)
相關資源
- 要了解有關 PyTorch 分布式數(shù)據(jù)并行性的更多信息,請查看: pytorch.org/docs/stable…
- 要了解有關 ?? Accelerate 的更多信息,請查看: hf.co/docs/accele…
- 要了解有關 ?? Transformer 的更多信息,請查看: hf.co/docs/transf…
原文作者:Zachary Mueller
譯者:innovation64 (李洋)
審校:yaoqi (胡耀淇)
排版:zhongdongy (阿東)
以上就是教你掌握分布式訓練PyTorch DDP到Accelerate到Trainer的詳細內(nèi)容,更多關于分布式訓練PyTorch DDP Accelerate Trainer的資料請關注腳本之家其它相關文章!
相關文章
python 實現(xiàn)創(chuàng)建文件夾和創(chuàng)建日志文件的方法
這篇文章主要介紹了python 實現(xiàn)創(chuàng)建文件夾和創(chuàng)建日志文件的方法,文中給大家介紹了python 讀寫創(chuàng)建文件文件夾的方法 ,需要的朋友可以參考下2019-07-07
詳解如何用OpenCV + Python 實現(xiàn)人臉識別
這篇文章主要介紹了詳解如何用OpenCV + Python 實現(xiàn)人臉識別,非常具有實用價值,需要的朋友可以參考下2017-10-10
Python根據(jù)指定日期計算后n天,前n天是哪一天的方法
這篇文章主要介紹了Python根據(jù)指定日期計算后n天,前n天是哪一天的方法,涉及Python日期與時間計算相關操作技巧,需要的朋友可以參考下2018-05-05
深入理解python中pytest.ini的配置方法和參數(shù)
Pytest 是 Python 測試框架中最流行的一個,而 pytest.ini 文件則是 pytest 配置文件的核心,在本文中,將詳細介紹 pytest.ini 文件的配置方法和可能的參數(shù),幫助您更好地掌握 Pytest 的使用,需要的朋友可以參考下2024-10-10
Python __getattr__與__setattr__使用方法
__getattr__和__setattr__可以用來對屬性的設置和取值進行處理2008-09-09
Python中使用Queue和Condition進行線程同步的方法
這篇文章主要介紹了Python中使用Queue模塊和Condition對象進行線程同步的方法,配合threading模塊下的線程編程進行操作的實例,需要的朋友可以參考下2016-01-01

