A Paddle version of accelerate
I took a personal stab at implementing a Paddle version of accelerate. The functionality is still incomplete, but multi-GPU training more or less works.
Repo: https://github.com/JunnYu/paddle_accelerate

Dependencies:
- easydict
- paddlenlp
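
Not listed above, but obviously required, is a GPU build of PaddlePaddle itself. Before launching multi-GPU training it can help to confirm the local install actually sees the devices; this is a quick sanity check using Paddle's built-in checker (nothing from the repo):

```python
import paddle

# Verify that the GPU build of PaddlePaddle is installed and working.
paddle.utils.run_check()

# Number of visible CUDA devices; expect >= 2 when launching with --gpus "0,1".
print(paddle.device.cuda.device_count())
```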
Usage (fine-tuning BERT on the RTE task under FP16):
Code
run.py
```python
import logging
import random

import numpy as np
import paddle
import paddle.nn as nn
from easydict import EasyDict as Config
from paddle.metric import Accuracy
from paddle.optimizer import AdamW
from paddlenlp.transformers import BertForSequenceClassification, BertTokenizer

from paddle_accelerate import Accelerator
from utils import get_dev_dataloader, get_train_dataloader

logger = logging.getLogger(__name__)


def set_seed(args):
    random.seed(args.seed)
    np.random.seed(args.seed)
    paddle.seed(args.seed)


def main(args):
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        level=logging.INFO,
        handlers=[
            logging.FileHandler(
                "run.log",
                mode="w",
                encoding="utf-8",
            )
        ],
    )
    accelerator = Accelerator(fp16=args.fp16)
    logger.info(accelerator.state)
    set_seed(args)

    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
    model = BertForSequenceClassification.from_pretrained("bert-base-uncased")
    optimizer = AdamW(
        learning_rate=3e-5,
        beta1=0.9,
        beta2=0.999,
        parameters=model.parameters(),
        weight_decay=0.01,
    )
    train_dataloader = get_train_dataloader(tokenizer, args)
    dev_dataloader = get_dev_dataloader(tokenizer, args)
    model, optimizer = accelerator.prepare(model, optimizer)

    loss_fct = nn.CrossEntropyLoss()
    global_step = 0
    metric = Accuracy()
    for epoch in range(args.num_train_epochs):
        for batch in train_dataloader:
            model.train()
            input_ids, segment_ids, labels = batch
            logits = model(input_ids, segment_ids)
            loss = loss_fct(logits, labels)
            accelerator.backward(loss)
            optimizer.step()
            optimizer.zero_grad()
            global_step += 1

            if global_step % args.logging_steps == 0:
                accelerator.print(f"loss: {loss.item()}")

            if global_step % args.save_steps == 0:
                # Evaluate on the dev set, gathering logits/labels from all GPUs.
                model.eval()
                metric.reset()
                with paddle.no_grad():
                    for batch in dev_dataloader:
                        input_ids, segment_ids, labels = batch
                        logits = model(input_ids, segment_ids)
                        correct = metric.compute(
                            accelerator.gather(logits), accelerator.gather(labels)
                        )
                        metric.update(correct)
                res = metric.accumulate()
                accelerator.print(f"epoch {epoch}: acc = ", res)
                accelerator.print("=" * 50)

                # Save from the main process only, after all processes sync up.
                if args.output_dir is not None:
                    accelerator.wait_for_everyone()
                    if accelerator.is_local_main_process:
                        unwrapped_model = accelerator.unwrap_model(model)
                        unwrapped_model.save_pretrained(args.output_dir)
                        tokenizer.save_pretrained(args.output_dir)
                        accelerator.print(f"save into {args.output_dir}")


if __name__ == "__main__":
    args = Config(
        task_name="rte",
        num_train_epochs=20,
        train_batch_size=8,
        eval_batch_size=32,
        num_workers=0,
        is_test=False,
        max_seq_length=128,
        fp16=True,
        logging_steps=10,
        save_steps=50,
        seed=42,
        output_dir="outputs",
    )
    main(args)
```
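For readers curious what such a wrapper actually has to do, here is a minimal sketch of the Accelerator surface used above (`prepare`, `backward`, `gather`, `print`), built on standard Paddle distributed primitives. This is my own illustration of the general pattern, not the implementation in the linked repo (which also handles FP16, `unwrap_model`, `wait_for_everyone`, and so on):

```python
import paddle
import paddle.distributed as dist


class TinyAccelerator:
    """Illustrative stand-in for the Accelerator in run.py (hypothetical)."""

    def __init__(self):
        self.world_size = dist.get_world_size()
        if self.world_size > 1:
            dist.init_parallel_env()  # set up communication between ranks

    def prepare(self, model, optimizer):
        # Wrap the model so gradients are all-reduced across GPUs
        # automatically during backward().
        if self.world_size > 1:
            model = paddle.DataParallel(model)
        return model, optimizer

    def backward(self, loss):
        loss.backward()

    def gather(self, tensor):
        # Collect a tensor from every rank and concatenate along the
        # batch axis, so eval metrics cover the whole dev set.
        if self.world_size == 1:
            return tensor
        tensor_list = []
        dist.all_gather(tensor_list, tensor)
        return paddle.concat(tensor_list, axis=0)

    def print(self, *args, **kwargs):
        # Only rank 0 prints, to avoid one copy of each log line per process.
        if dist.get_rank() == 0:
            print(*args, **kwargs)
```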
utils.py
```python
import json
import os
import pickle
import random
from functools import partial

import numpy as np
import paddle
from paddle.io import DataLoader, DistributedBatchSampler
from paddlenlp.data import Pad, Stack, Tuple
from paddlenlp.datasets import load_dataset
from paddlenlp.transformers import (
    CosineDecayWithWarmup,
    LinearDecayWithWarmup,
    PolyDecayWithWarmup,
)

scheduler_type2cls = {
    "linear": LinearDecayWithWarmup,
    "cosine": CosineDecayWithWarmup,
    "poly": PolyDecayWithWarmup,
}


def set_seed(args):
    random.seed(args.seed)
    np.random.seed(args.seed)
    paddle.seed(args.seed)


def get_writer(args):
    if args.writer_type == "visualdl":
        from visualdl import LogWriter

        writer = LogWriter(logdir=args.logdir)
    elif args.writer_type == "tensorboard":
        from tensorboardX import SummaryWriter

        writer = SummaryWriter(logdir=args.logdir)
    else:
        raise ValueError("writer_type must be in ['visualdl', 'tensorboard']")
    return writer


def get_scheduler(
    learning_rate,
    scheduler_type,
    num_warmup_steps=None,
    num_training_steps=None,
    **scheduler_kwargs,
):
    if scheduler_type not in scheduler_type2cls.keys():
        data = " ".join(scheduler_type2cls.keys())
        raise ValueError(f"scheduler_type must be chosen from {data}")
    if num_warmup_steps is None:
        raise ValueError("requires `num_warmup_steps`, please provide that argument.")
    if num_training_steps is None:
        raise ValueError("requires `num_training_steps`, please provide that argument.")
    return scheduler_type2cls[scheduler_type](
        learning_rate=learning_rate,
        total_steps=num_training_steps,
        warmup=num_warmup_steps,
        **scheduler_kwargs,
    )


def save_json(data, file_name):
    with open(file_name, "w", encoding="utf-8") as w:
        w.write(json.dumps(data, ensure_ascii=False, indent=4) + "\n")


def save_pickle(data, file_path):
    with open(str(file_path), "wb") as f:
        pickle.dump(data, f)


def load_pickle(input_file):
    with open(str(input_file), "rb") as f:
        data = pickle.load(f)
    return data


def trans_func(example, tokenizer, label_list, max_seq_length=512, is_test=False):
    """Convert a GLUE example into model features."""
    if not is_test:
        # `label_list == None` is for regression tasks
        label_dtype = "int64" if label_list else "float32"
        # Get the label
        label = example["labels"]
        label = np.array([label], dtype=label_dtype)
    # Single-sentence tasks have one text field; sentence-pair tasks have two.
    if (int(is_test) + len(example)) == 2:
        example = tokenizer(example["sentence"], max_seq_len=max_seq_length)
    else:
        example = tokenizer(
            example["sentence1"],
            text_pair=example["sentence2"],
            max_seq_len=max_seq_length,
        )
    if not is_test:
        return example["input_ids"], example["token_type_ids"], label
    return example["input_ids"], example["token_type_ids"]


def get_train_dataloader(tokenizer, args):
    filename = os.path.join("caches", args.task_name + "_train" + ".pkl")
    if os.path.exists(filename):
        ds = load_pickle(filename)
    else:
        ds = load_dataset("glue", args.task_name, splits="train")
        ds.map(
            partial(
                trans_func,
                tokenizer=tokenizer,
                label_list=ds.label_list,
                max_seq_length=args.max_seq_length,
            ),
            batched=False,
            lazy=False,
        )
        os.makedirs("caches", exist_ok=True)  # ensure the cache dir exists
        save_pickle(ds, filename)
    batch_sampler = DistributedBatchSampler(
        ds, batch_size=args.train_batch_size, shuffle=True
    )
    batchify_fn = lambda samples, fn=Tuple(
        Pad(axis=0, pad_val=tokenizer.pad_token_id),  # input
        Pad(axis=0, pad_val=tokenizer.pad_token_type_id),  # segment
        Stack(dtype="int64" if ds.label_list else "float32"),  # label
    ): fn(samples)
    data_loader = DataLoader(
        dataset=ds,
        batch_sampler=batch_sampler,
        collate_fn=batchify_fn,
        num_workers=args.num_workers,
        return_list=True,
    )
    return data_loader


def get_dev_dataloader(tokenizer, args):
    filename = os.path.join("caches", args.task_name + "_dev" + ".pkl")
    if os.path.exists(filename):
        ds = load_pickle(filename)
    else:
        ds = load_dataset("glue", args.task_name, splits="dev")
        ds.map(
            partial(
                trans_func,
                tokenizer=tokenizer,
                label_list=ds.label_list,
                max_seq_length=args.max_seq_length,
            ),
            batched=False,
            lazy=False,
        )
        os.makedirs("caches", exist_ok=True)  # ensure the cache dir exists
        save_pickle(ds, filename)
    batch_sampler = DistributedBatchSampler(
        ds, batch_size=args.eval_batch_size, shuffle=False
    )
    batchify_fn = lambda samples, fn=Tuple(
        Pad(axis=0, pad_val=tokenizer.pad_token_id),  # input
        Pad(axis=0, pad_val=tokenizer.pad_token_type_id),  # segment
        Stack(dtype="int64" if ds.label_list else "float32"),  # label
    ): fn(samples)
    data_loader = DataLoader(
        dataset=ds,
        batch_sampler=batch_sampler,
        collate_fn=batchify_fn,
        num_workers=args.num_workers,
        return_list=True,
    )
    return data_loader
```
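One detail in utils.py worth unpacking is `batchify_fn`: `Tuple` splits each `(input_ids, token_type_ids, label)` sample into its fields and applies one collate function per field, so `Pad` handles the variable-length sequences while `Stack` batches the labels. A toy example with made-up values:

```python
from paddlenlp.data import Pad, Stack, Tuple

# Two (input_ids, token_type_ids, label) samples of different lengths.
samples = [
    ([1, 2, 3], [0, 0, 0], [1]),
    ([4, 5], [0, 0], [0]),
]

batchify_fn = Tuple(
    Pad(axis=0, pad_val=0),  # pad input_ids up to the longest sequence
    Pad(axis=0, pad_val=0),  # pad token_type_ids the same way
    Stack(dtype="int64"),    # stack labels into a (batch_size, 1) array
)

input_ids, token_type_ids, labels = batchify_fn(samples)
print(input_ids)  # [[1 2 3] [4 5 0]]
```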
Run
```bash
export LOCAL_RANK=0
python -m paddle.distributed.launch --gpus "0,1" run.py
```