Overview
🤗 Accelerate is a library that enables the same PyTorch code to be run across any distributed configuration by adding just four lines of code! In short, training and inference at scale made simple, efficient and adaptable.
Demo
# + 代表使用accelerate的增加语句;- 代表去掉 + from accelerate import Accelerator from transformers import AdamW, AutoModelForSequenceClassification, get_scheduler + accelerator = Accelerator() model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=2) optimizer = AdamW(model.parameters(), lr=3e-5) - device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") - model.to(device) + train_dataloader, eval_dataloader, model, optimizer = accelerator.prepare( + train_dataloader, eval_dataloader, model, optimizer + ) num_epochs = 3 num_training_steps = num_epochs * len(train_dataloader) lr_scheduler = get_scheduler( "linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=num_training_steps ) progress_bar = tqdm(range(num_training_steps)) model.train() for epoch in range(num_epochs): for batch in train_dataloader: - batch = {k: v.to(device) for k, v in batch.items()} outputs = model(**batch) loss = outputs.loss - loss.backward() + accelerator.backward(loss) optimizer.step() lr_scheduler.step() optimizer.zero_grad() progress_bar.update(1)
如果简单来说,就是添加了一个
accelerate
来控制分布式训练,其中了loss的backward变成了accelerate.backward(loss)
。
Installation & Configuration
安装和配置参考官网即可,其中配置的过程是需要在终端Terminal上通过回答一系列问题,然后自动生成一个名为default_config
的yaml文件,并保存在根目录.catch/huggingface/accelerate
目录下。
配置完成之后可以使用accelerate env [--config_file] [config_file_name]
来验证配置文件是否是Valid。
默认配置文件内容:
- `Accelerate` version: 0.11.0.dev0
- Platform: Linux-5.10.0-15-cloud-amd64-x86_64-with-debian-11.3
- Python version: 3.7.12
- Numpy version: 1.19.5
- PyTorch version (GPU?): 1.12.0+cu102 (True)
- `Accelerate` default config:
- compute_environment: LOCAL_MACHINE
- distributed_type: MULTI_GPU
- mixed_precision: no
- use_cpu: False
- num_processes: 2
- machine_rank: 0
- num_machines: 1
- main_process_ip: None
- main_process_port: None
- main_training_function: main
- deepspeed_config: {}
- fsdp_config: {}
下面就是主要的四种command:
- accelerate-config:通过在终端中回答一系列问题生成配置文件
- accelerate-env:验证配置文件的合法性
- accelerate-launch:运行自己的python文件(主要)
- accelerate-test:运行accelerate默认的神经网络模型来测试环境是否可以。
Quicktour
Main use
-
首先是先导入accelerate的包:
from accelerate import Accelerator accelerator = Accelerator()
这一个配置需要写在整个training script的前面,因为这是对于distributed training十分重要。
-
如果原先的代码中有
.to(device)
或.cuda()
,那么就去掉,accelerator是可以自动处理的。如果非要使用.to(device)
,那么就需要用accelerator.device
来代替。如果想要完全手动配置device的情况,可以在初始化模型的时候,仅仅需要传入
device_placement=False
参数即可。如果在初始化的时候使用了自动化配置,后面还想用到
.to(device)
或.cuda()
的话,也是可以的(就跟原来的使用方法一样),只不过此时的方法是根据通过prepare()方法之后的model来获取;或者通过Accelerator的属性来获取。 -
将所有与训练有关的object(包括optimizer、model、dataloader、scheduler)传入
prepare()
方法中。需要说明的有以下几点:
- 在训练过程中dataloader是共享在GPUs/TPUs cores上,也就是说,每一个device是获得了整个dataloader中的不相同的一部分。而且 The random states of all processes will be synchronized at the beginning of each iteration through your dataloader, to make sure the data is shuffled the same way (if you decided to use
shuffle=True
or any kind of random sampler).【也就是说如果在pytorch的Dataloader中设置shuffle=True
或设置其他的random sample,那么在每一次迭代的时候,所有进程的随机状态会以相同的shuffled形式进行同步】。 - 实际处理的batch_size大小实际上是在script中设置的batch_size * 设备数量(这其实与pytorch的 DDP分布式训练方法一样)。举个例子,比如在script中设置的batch_size是16,其中有四块GPU可以使用,那么此时整个训练过程中共有16 * 4 个batch参与训练,但是还是以大小为16的batch参与训练的,只不过有16 * 4个batch数据被加入到内存中来。
- 当然,如果在初始化accelerate的过程中,给定参数
split_batches=True
就可以保证相同整个训练过程中的总得batch size大小是script中设置的。对于这个参数,官网是这样定义的:split_batches (bool
, optional, defaults toFalse
) — Whether or not the accelerator should split the batches yielded by the dataloaders across the devices. IfTrue
the actual batch size used will be the same on any kind of distributed processes, but it must be a round multiple of thenum_processes
you are using. IfFalse
, actual batch size used will be the one set in your script multiplied by the number of processes.
- 当然,如果在初始化accelerate的过程中,给定参数
- 如果想要记录或者查看数据集的情况,要在
prepare()
方法之后。- 当然,
prepare()
方法也是可以在不同的需求下使用的,比如在验证数据集上。如果想要分布式验证(distrubuted evaluation)的话,可以把valuation dataloader也加入到prepare()
方法中来。
- 当然,
- 在训练过程中dataloader是共享在GPUs/TPUs cores上,也就是说,每一个device是获得了整个dataloader中的不相同的一部分。而且 The random states of all processes will be synchronized at the beginning of each iteration through your dataloader, to make sure the data is shuffled the same way (if you decided to use
-
用accelerate.backward(loss)代替loss.backward()即可。
至此,就可以依托于accelerate来通过使用不同的分布式训练工具(比如pytorch的torchrun或者accelerate的launch等)来实现训练了。
Distributed evaluation
这里主要讲一讲如何分布式评估在验证集上的效果。方法也很简单,只需要单独将validation的dataloader传入prepare()
方法中即可:
validation_dataloader = accelerator.prepare(validation_dataloader)
因为是分布式训练,每一个device只会看到数据集中的一部分,因此需要平均最后的结果,也就是说需要汇总所有的结果然后求一个均值。这里就可以使用 gather_for_metrics()方法:
for inputs, targets in validation_dataloader:
predictions = model(inputs)
# Gather all predictions and targets
all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets))
# Example of use with a *Datasets.Metric*
metric.add_batch(all_predictions, all_targets)
这里还有几点需要强调:
- 在实际过程中,数据可能在分发的时候,会出现重复的现象(Some data at the end of the dataset may be duplicated so the batch can be divided equally among all workers. )。为此 gather_for_metrics()方法可以在汇总结果的时候,自动将重复的数据结果删除。
- 当然,这里还有手动的办法不去删除重复的结果,即用 gather() 方法,我们可以看到两者的不同点:
-
gather_for_metrics():Gathers
tensor
and potentially drops duplicates in the last batch if on a distributed system. Should be used for gathering the inputs and targets for metric calculation. - gather():The gathered tensor(s). Note that the first dimension of the result is num_processes multiplied by the first dimension of the input tensors. Gather the values in tensor across all processes and concatenate them on the first dimension. Useful to regroup the predictions from all processes when doing evaluation. Note: This gather happens in all processes.
- 在处理NLP任务中需要注意:gather() 和 gather_for_metrics() 方法都是需要处理相同tensor,即在每一个进程中size of tensor都必须是相同的。 如果在padding的过程中是这样动态策略来padding的——取一个min-batch中最长的作为整个batch中的长度,那么就需要调用pad_across_processes() 方法来padding所有的tensor达到所有tensor中的统一长度。
Launching your distributed script
分布式训练的方法就不赘述了,训练方法很简单,需要详细了解的是一些参数:Check out the Launch tutorial for more information about launching your scripts.
在Launch tutorial中主要有下面的三个部分内容:
-
Using accelerate launch
-
If you are familiar with launching scripts in PyTorch yourself such as with
torchrun
, you can still do this. It is not required to useaccelerate launch
. -
默认的运行时需要用到配置文件的,但是也是可以不使用配置文件的形式来运行,我们可以通过命令行的形式来运行,比如多GPU训练的demo:
accelerate launch --multi_gpu {script_name.py} {--arg1} {--arg2} ...
所有的CLI参数说明都在这里了
-
-
Why you should always use
accelerate config
-
Custom Configurations
其他问题
如何只在某一进程中处理
针对某些任务(在单服务上),比如下载数据或者记录日志等需要在某一个进程中进行即可。那么此时就可以用下面的方法:
if accelerator.is_local_main_process:
# Is executed once per server(Once on a single server)
如果是使用了tqdm
库的话,也是需要特殊处理的:
from tqdm.auto import tqdm
progress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)
其中,如果对于某些任务(在多服务上),比如上传模型到model hub中,那么就需要用下面的方法:
if accelerator.is_main_process:
# Is executed once only(Only ever once across all servers)
is_local_main_process和is_main_process就跟Pytorch的分布式训练中的LOCAL_RANK和RANK的区别,这里就不展开讨论了。
简单说就是,如果是单物理主机,那么就用 is_local_main_process 方法就够用的了。
还有一个可能会用到的小trick就是:For printing statements you only want executed once per machine, you can just replace the print function by accelerator.print.(也就是仅主进程为0的进程才输出)。
保存或加载模型
保存训练好的模型需要分两步完成:
-
使用
accelerator.wait_for_everyone()
方法; -
使用
accelerator.unwrap_model(model)
方法:总结来说如下:
accelerator.wait_for_everyone() unwrapped_model = accelerator.unwrap_model(model) accelerator.save(unwrapped_model.state_dict(), filename)
那么这个unwrap_model到底在干嘛呢,我查看了他们的源码,其中有下面这一段:
def extract_model_from_parallel(model): """ Extract a model from its distributed containers. Args: model (`torch.nn.Module`): The model to extract. Returns: `torch.nn.Module`: The extracted model. """ options = (torch.nn.parallel.DistributedDataParallel, torch.nn.DataParallel) if is_deepspeed_available(): options += (DeepSpeedEngine,) while isinstance(model, options): model = model.module return model
考虑一个简单的情况,如果我们没有使用DeepSpeed的方法,那么就是执行while循环的那一句,也就是
model=model.module
;熟悉Pytorch DDP的都知道,分布式训练的时候,模型的保存总是在key中多一个
module
字段(具体的原因也不清楚)。因此这一步就是将这个module
字段给去掉,要不然对于不熟悉DDP的同学来说,在将权重load进模型的时候就会出现字段不匹配的情况。
模型加载也是一样的处理方法,这里就直接放代码了:
unwrapped_model = accelerator.unwrap_model(model) # 如果是先调用了prepare()方法的话,这一步必须加
unwrapped_model.load_state_dict(torch.load(filename))
⚠️:如果你是使用prepare()
方法之后的话,那么模型加载权重的时候是需要用accelerator.unwrap_model
方法的。其他情况下,问题不大。
保存或加载训练的整个状态
这里说的整个训练过程状态是指保存/加载 训练模型过程中的model、optimizer、random generators和LR schedulers。详情参考文档吧,一般情况也不常用到(PS:主要是懒)
同步等待
我们知道,在pytoch中,有torch.distributed.barrier
这个方法来实现多卡训练的进程等待。那么应用在什么地方呢?在比如我们想要在单进程中进行预测或者验证,那么其他的进程就必须要等待执行任务的进程结束之后,才可以进行下一轮迭代,那么为了实现这个等待,就需要写下面的代码:
accelerator.wait_for_everyone()
这个方法跟torch.distributed.barrier
的含义是一样的,都是需要所有的进程执行了这句话之后才可以进行下一步。
这里我遇到了一个大问题,就是比如我们想要做单一设备上的预测,不通过分布式的方法来预测,那么通过
accelerator.wait_for_everyone()
或者torch.distributed.barrier
两个方法时,就会出现所有进程卡住,虽然此时的GPU占用率是100%,但是没有任何程序在正常运行,不过在我找了好多博客之后,终于在Github上发现了类似的问题Using torch.distributed.barrier() makes the whole code hang。这里的方法简单来说,就是将原来的model(**input)用model.module(**input)来代替即可。至于为什么咱也不清楚😭。
同样还有其他方法——使用
no_grad()
方法:# validate the model if gpu==0 : with torch.no_grad(): model.eval() for data, target in valid_loader: if torch.cuda.is_available: data, target = data.cuda(), target.cuda() output = model(data) loss = criterion(output, target) valid_loss += loss.item()*data.size(0)
那么知道了这个之后,我们就可以用
accelerator.unwrap_model(model)
方法就可以完成正常的barrier了。
梯度裁剪
在Pytorch中使用到的梯度裁剪方法是torch.nn.utils.clip_grad_norm_
or torch.nn.utils.clip_grad_value_
,那么这里就对应使用 clipgrad_norm() and clipgrad_value() 两个方法就可以了。
其实跟用原生的Pytorch方法也差不多,这里以torch.nn.utils.clip_grad_norm_
为例,可以看到其源码的实现也就是用了torch.nn.utils.clip_grad_norm_
:
def clip_grad_norm_(self, parameters, max_norm, norm_type=2):
"""
Should be used in place of `torch.nn.utils.clip_grad_norm_`.
Example:
```python
>>> from accelerate import Accelerator
>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> for (input, target) in dataloader:
... optimizer.zero_grad()
... output = model(input)
... loss = loss_func(output, target)
... accelerator.backward(loss)
... if accelerator.sync_gradients:
... accelerator.clip_grad_norm_(model.parameters(), max_grad_norm)
... optimizer.step()
```
"""
if self.distributed_type == DistributedType.FSDP:
self.unscale_gradients()
parameters = [p for p in parameters]
for model in self._models:
if parameters == [p for p in model.parameters()]:
model.clip_grad_norm_(max_norm, norm_type)
return
elif self.distributed_type == DistributedType.DEEPSPEED:
# `accelerator.backward(loss)` is doing that automatically. Therefore, it's implementation is not needed
return
self.unscale_gradients()
torch.nn.utils.clip_grad_norm_(parameters, max_norm, norm_type=norm_type)
混合精度训练
混合精度训练在 🤗 Accelerate框架的加持下使用起来也是非常简单的:
with accelerator.autocast():
loss = complex_loss_function(outputs, target):
这里还有一个情况需要说明:混合精度训练的时候,在训练的开始或者其他的不确定的时候,会发生gradient skip。这是因为 “because of the dynamic loss scaling strategy, there are points during training where the gradients have overflown, and the loss scaling factor is reduced to avoid this happening again at the next step.”
如果发生了这一情况,那么就需要手动更新LR scheduler。一般情况下不更新也是可以的,但是如果训练集很小或者说模型对scheduler的初始化的LR很敏感,那么就需要手动更新LR scheduler了:
if not accelerator.optimizer_step_was_skipped:
lr_scheduler.step()
梯度累计
使用 accumulate()方法指定 gradient_accumulation_steps
参数即可实现梯度累计。
accelerator = Accelerator(gradient_accumulation_steps=2)
model, optimizer, training_dataloader = accelerator.prepare(model, optimizer, training_dataloader)
for input, label in training_dataloader:
with accelerator.accumulate(model):
predictions = model(input)
loss = loss_function(predictions, label)
accelerator.backward(loss)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
随机种子设定
使用 utils.set_seed()方法固定种子
from accelerate import set_seed
set_seed(42)
内部机制
以上的内容就足够用于日常的炼丹了,但是这里还是想简单得聊聊Accelerate到底从头到尾做了什么呢?
-
首先Accelerate会先去分析给定的参数情况,所有的信息都被存储在了AcceleratorState中。
-
然后就是调用prepare()方法,该方法会做下面的三件事情:
-
wraps your model(s) in the container adapted for the distributed setup;
-
wraps your optimizer(s) in a AcceleratedOptimizer
-
creates a new version of your dataloader(s) in a DataLoaderShard.
当model和optimizer被wrap的时候,dataloader会重新构建。这是因为Pytorch的问题,因为Pytorch要根据
num_processes
来确定新的batch_size的大小是多少。这里随便举一个Pytorch的使用便可以知道上面在说什么:
# 分布式数据集 train_sampler = DistributedSampler(train_dataset) train_loader = torch.utils.data.DataLoader(train_dataset, sampler=train_sampler, batch_size=batch_size) # 注意这里的batch_size是每个GPU上batch_size # 分布式模型 model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank, find_unused_parameters=True)
-