    A Paddle version of accelerate

    183****0229 last edited by 183****0229

      I took a shot at implementing a Paddle version of accelerate myself. (The feature set is incomplete, but multi-GPU training mostly works.)
      Link: https://github.com/JunnYu/paddle_accelerate
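
      The core pattern is distilled in the sketch below. It only uses the Accelerator calls that the full run.py further down actually exercises (prepare, backward, print); the tiny model and random data are placeholders of mine, not part of the original post or repo.

      # Minimal, self-contained sketch of the paddle_accelerate usage pattern.
      # The toy model/data are illustrative placeholders; see run.py below for the real setup.
      import paddle
      import paddle.nn as nn
      from paddle_accelerate import Accelerator

      accelerator = Accelerator(fp16=False)
      model = nn.Linear(4, 2)
      optimizer = paddle.optimizer.AdamW(learning_rate=1e-3, parameters=model.parameters())
      loss_fct = nn.CrossEntropyLoss()
      # wrap the model and optimizer for distributed / fp16 training
      model, optimizer = accelerator.prepare(model, optimizer)

      x = paddle.randn([8, 4])
      y = paddle.randint(0, 2, [8])
      loss = loss_fct(model(x), y)
      accelerator.backward(loss)   # instead of loss.backward()
      optimizer.step()
      optimizer.zero_grad()
      accelerator.print(f"loss: {loss.item()}")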

      Dependencies:

      • easydict
      • paddlenlp

      Usage (fine-tuning BERT on the RTE task under FP16):

      Code

      run.py

      import logging
      import random
      
      import numpy as np
      import paddle
      import paddle.nn as nn
      from easydict import EasyDict as Config
      from paddle.metric import Accuracy
      from paddle.optimizer import AdamW
      from paddlenlp.transformers import BertForSequenceClassification, BertTokenizer
      
      from paddle_accelerate import Accelerator
      from utils import get_dev_dataloader, get_train_dataloader
      
      logger = logging.getLogger(__name__)
      
      
      def set_seed(args):
          random.seed(args.seed)
          np.random.seed(args.seed)
          paddle.seed(args.seed)
      
      
      def main(args):
          logging.basicConfig(
              format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
              datefmt="%m/%d/%Y %H:%M:%S",
              level=logging.INFO,
              handlers=[
                  logging.FileHandler(
                      "run.log",
                      mode="w",
                      encoding="utf-8",
                  )
              ],
          )
          accelerator = Accelerator(fp16=args.fp16)
          logger.info(accelerator.state)
          set_seed(args)
          tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
          model = BertForSequenceClassification.from_pretrained("bert-base-uncased")
          optimizer = AdamW(
              learning_rate=3e-5,
              beta1=0.9,
              beta2=0.999,
              parameters=model.parameters(),
              weight_decay=0.01,
          )
          train_dataloader = get_train_dataloader(tokenizer, args)
          dev_dataloader = get_dev_dataloader(tokenizer, args)
          # wrap the model and optimizer for distributed / fp16 training
          model, optimizer = accelerator.prepare(model, optimizer)
      
          loss_fct = nn.CrossEntropyLoss()
          global_step = 0
          metric = Accuracy()
      
          for epoch in range(args.num_train_epochs):
              for batch in train_dataloader:
                  model.train()
                  input_ids, segment_ids, labels = batch
                  logits = model(input_ids, segment_ids)
                  loss = loss_fct(logits, labels)
                  # run backward through the accelerator instead of calling loss.backward() directly
                  accelerator.backward(loss)
                  optimizer.step()
                  optimizer.zero_grad()
                  global_step += 1
      
                  if global_step % args.logging_steps == 0:
                      accelerator.print(f"loss: {loss.item()}")
      
                  if global_step % args.save_steps == 0:
                      model.eval()
                      metric.reset()
                      with paddle.no_grad():
                          for batch in dev_dataloader:
                              input_ids, segment_ids, labels = batch
                              logits = model(input_ids, segment_ids)
                              # gather logits/labels from all processes so the
                              # accuracy is computed over the full dev set
                              correct = metric.compute(
                                  accelerator.gather(logits), accelerator.gather(labels)
                              )
                              metric.update(correct)
                      res = metric.accumulate()
                      accelerator.print(f"epoch {epoch}: acc = ", res)
                      accelerator.print("=" * 50)
                      if args.output_dir is not None:
                          # wait for all processes, then save from the local main process only
                          accelerator.wait_for_everyone()
                          if accelerator.is_local_main_process:
                              # strip the distributed wrapper before saving
                              unwrapped_model = accelerator.unwrap_model(model)
                              unwrapped_model.save_pretrained(args.output_dir)
                              tokenizer.save_pretrained(args.output_dir)
                              accelerator.print(f"save into {args.output_dir}")
      
      
      if __name__ == "__main__":
          args = Config(
              task_name="rte",
              num_train_epochs=20,
              train_batch_size=8,
              eval_batch_size=32,
              num_workers=0,
              is_test=False,
              max_seq_length=128,
              fp16=True,
              logging_steps=10,
              save_steps=50,
              seed=42,
              output_dir="outputs",
          )
          main(args)
      

      utils.py

      import json
      import os
      import pickle
      import random
      from functools import partial
      
      import numpy as np
      import paddle
      from paddle.io import DataLoader, DistributedBatchSampler
      from paddlenlp.data import Pad, Stack, Tuple
      from paddlenlp.datasets import load_dataset
      from paddlenlp.transformers import (
          CosineDecayWithWarmup,
          LinearDecayWithWarmup,
          PolyDecayWithWarmup,
      )
      
      scheduler_type2cls = {
          "linear": LinearDecayWithWarmup,
          "cosine": CosineDecayWithWarmup,
          "poly": PolyDecayWithWarmup,
      }
      
      
      def set_seed(args):
          random.seed(args.seed)
          np.random.seed(args.seed)
          paddle.seed(args.seed)
      
      
      def get_writer(args):
          if args.writer_type == "visualdl":
              from visualdl import LogWriter
      
              writer = LogWriter(logdir=args.logdir)
          elif args.writer_type == "tensorboard":
              from tensorboardX import SummaryWriter
      
              writer = SummaryWriter(logdir=args.logdir)
          else:
              raise ValueError("writer_type must be in ['visualdl', 'tensorboard']")
          return writer
      
      
      def get_scheduler(
          learning_rate,
          scheduler_type,
          num_warmup_steps=None,
          num_training_steps=None,
          **scheduler_kwargs,
      ):
          if scheduler_type not in scheduler_type2cls:
              supported = " ".join(scheduler_type2cls.keys())
              raise ValueError(f"scheduler_type must be chosen from {supported}")

          if num_warmup_steps is None:
              raise ValueError("requires `num_warmup_steps`, please provide that argument.")

          if num_training_steps is None:
              raise ValueError(
                  "requires `num_training_steps`, please provide that argument."
              )
      
          return scheduler_type2cls[scheduler_type](
              learning_rate=learning_rate,
              total_steps=num_training_steps,
              warmup=num_warmup_steps,
              **scheduler_kwargs,
          )
      
      
      def save_json(data, file_name):
          with open(file_name, "w", encoding="utf-8") as w:
              w.write(json.dumps(data, ensure_ascii=False, indent=4) + "\n")
      
      
      def save_pickle(data, file_path):
          with open(str(file_path), "wb") as f:
              pickle.dump(data, f)
      
      
      def load_pickle(input_file):
          with open(str(input_file), "rb") as f:
              data = pickle.load(f)
          return data
      
      
      def trans_func(example, tokenizer, label_list, max_seq_length=512, is_test=False):
          """convert a glue example into necessary features"""
          if not is_test:
              # `label_list == None` is for regression task
              label_dtype = "int64" if label_list else "float32"
              # Get the label
              label = example["labels"]
              label = np.array([label], dtype=label_dtype)
          # Convert raw text to feature
          # a single-sentence example has one text field, so (is_test + number of keys) == 2;
          # sentence-pair examples fall through to the else branch below
          if (int(is_test) + len(example)) == 2:
              example = tokenizer(example["sentence"], max_seq_len=max_seq_length)
          else:
              example = tokenizer(
                  example["sentence1"],
                  text_pair=example["sentence2"],
                  max_seq_len=max_seq_length,
              )
      
          if not is_test:
              return example["input_ids"], example["token_type_ids"], label
          else:
              return example["input_ids"], example["token_type_ids"]
      
      
      def get_train_dataloader(tokenizer, args):
          filename = os.path.join("caches", args.task_name + "_train" + ".pkl")
      
          if os.path.exists(filename):
              ds = load_pickle(filename)
          else:
              ds = load_dataset("glue", args.task_name, splits="train")
      
              ds.map(
                  partial(
                      trans_func,
                      tokenizer=tokenizer,
                      label_list=ds.label_list,
                      max_seq_length=args.max_seq_length,
                  ),
                  batched=False,
                  lazy=False,
              )
              save_pickle(ds, filename)
      
          batch_sampler = DistributedBatchSampler(
              ds, batch_size=args.train_batch_size, shuffle=True
          )
      
          batchify_fn = lambda samples, fn=Tuple(
              Pad(axis=0, pad_val=tokenizer.pad_token_id),  # input
              Pad(axis=0, pad_val=tokenizer.pad_token_type_id),  # segment
              Stack(dtype="int64" if ds.label_list else "float32"),  # label
          ): fn(samples)
      
          data_loader = DataLoader(
              dataset=ds,
              batch_sampler=batch_sampler,
              collate_fn=batchify_fn,
              num_workers=args.num_workers,
              return_list=True,
          )
      
          return data_loader
      
      
      def get_dev_dataloader(tokenizer, args):
          filename = os.path.join("caches", args.task_name + "_dev" + ".pkl")
      
          if os.path.exists(filename):
              ds = load_pickle(filename)
          else:
              ds = load_dataset("glue", args.task_name, splits="dev")
              ds.map(
                  partial(
                      trans_func,
                      tokenizer=tokenizer,
                      label_list=ds.label_list,
                      max_seq_length=args.max_seq_length,
                  ),
                  batched=False,
                  lazy=False,
              )
              save_pickle(ds, filename)
      
          batch_sampler = DistributedBatchSampler(
              ds, batch_size=args.eval_batch_size, shuffle=False
          )
      
          batchify_fn = lambda samples, fn=Tuple(
              Pad(axis=0, pad_val=tokenizer.pad_token_id),  # input
              Pad(axis=0, pad_val=tokenizer.pad_token_type_id),  # segment
              Stack(dtype="int64" if ds.label_list else "float32"),  # label
          ): fn(samples)
      
          data_loader = DataLoader(
              dataset=ds,
              batch_sampler=batch_sampler,
              collate_fn=batchify_fn,
              num_workers=args.num_workers,
              return_list=True,
          )
      
          return data_loader
      

      Run

      export LOCAL_RANK=0
      python -m paddle.distributed.launch --gpus "0,1" run.py
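
      As a quick sanity check that paddle.distributed.launch actually spawned one process per GPU, you can query Paddle's standard distributed API from inside the script (plain Paddle, not something provided by paddle_accelerate); with --gpus "0,1" each process should report a world size of 2. run.py also logs accelerator.state to run.log, which should carry similar information.

      # optional check snippet, using Paddle's built-in distributed API
      import paddle.distributed as dist

      print(f"rank {dist.get_rank()} of {dist.get_world_size()} processes")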
      