    Gpushare.com

    PyTorch Data Loading and Preprocessing: DALI Acceleration

    Tech Sharing 📚 (Rewarded)
    189****7698 (last edited by 189****7698)

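      Invited by our lovely and enthusiastic customer-service rep Linlin, I'm back to share another hands-on post. Last time I shared an article on speeding up the torchvision-based DataLoader, but because it is bottlenecked by CPU computation speed, a torchvision-based DataLoader cannot be accelerated much further.
      So is there a way to move preprocessing off the CPU and onto the faster, higher-throughput GPU? Yes! NVIDIA DALI can make your dream of efficient research come true: it moves complex data preprocessing onto the GPU, and even data loading can use hybrid CPU+GPU acceleration. After more than a week of repeatedly studying the official documentation and trying things out myself, I'm sharing what I believe is the most up-to-date and easiest-to-follow DALI guide available right now (most of the DALI code you can find online is either outdated or full of bugs). I use image classification as the example below, but DALI supports many data domains, including images, video, audio, and volumetric data, so tasks such as image classification, object detection, and speech recognition can all be accelerated; you can look up the details yourself.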
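      DALI acceleration: the code
      Normally we load and preprocess images with torch.utils.data.DataLoader and then move the CPU tensors to the GPU for training and testing. DALI instead builds its own data loader, which can be much faster than the native PyTorch one.
      Let's first look at torch.utils.data.DataLoader. To cover a more general case, I added a custom CUTOUT transform for data augmentation.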

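      import torch
      import numpy as np
      from torchvision import transforms, datasets
      from torch.utils.data import DataLoader
      
      # Settings used below (example values only; the paths are hypothetical, point them at your own data)
      data_root_train = './data/train'  # ImageFolder layout: one sub-folder per class
      data_root_test = './data/test'
      img_size = 224
      n_holes, length = 1, 16  # CUTOUT: number of patches, patch side length in pixels
      batch_size = 64
      num_workers = 8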
      
      
      class CUTOUT(object):
          """Randomly mask out one or more patches from an image.
          Args:
              n_holes (int): Number of patches to cut out of each image.
              length (int): The length (in pixels) of each square patch.
          """
      
          def __init__(self, n_holes, length):
              self.n_holes = n_holes
              self.length = length
      
          def __call__(self, img):
              """
              Args:
                  img (Tensor): Tensor image of size (C, H, W).
              Returns:
                  Tensor: Image with n_holes of dimension length x length cut out of it.
              """
              h = img.size(1)
              w = img.size(2)
      
              mask = np.ones((h, w), np.float32)
      
              for n in range(self.n_holes):
                  y = np.random.randint(h)
                  x = np.random.randint(w)
                  y1 = np.clip(y - self.length // 2, 0, h)
                  y2 = np.clip(y + self.length // 2, 0, h)
                  x1 = np.clip(x - self.length // 2, 0, w)
                  x2 = np.clip(x + self.length // 2, 0, w)
                  mask[y1: y2, x1: x2] = 0.
              mask = torch.from_numpy(mask)
              mask = mask.expand_as(img)
              img = img * mask
      
              return img
      
      
      # Data preprocessing
      transform_train = transforms.Compose([
          transforms.Resize(int(img_size*1.2)),
          transforms.RandomRotation(20),
          transforms.RandomCrop(img_size),
          transforms.RandomHorizontalFlip(),
          transforms.ColorJitter(
              brightness=0.5,
              contrast=0.5,
              saturation=0.5,
              hue=0.3
          ),
          transforms.ToTensor(),
          transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
          CUTOUT(n_holes, length),
      ])
      transform_test = transforms.Compose([
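          transforms.Resize((img_size, img_size)),  # fixed H x W so images of any aspect ratio can be batched, as in the DALI test pipeline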
          transforms.ToTensor(),
          transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
      ])
      
      # Load the datasets
      train = datasets.ImageFolder(data_root_train, transform=transform_train)
      test = datasets.ImageFolder(data_root_test, transform=transform_test)
      
      # Wrap the datasets in iterators
      train_loader = DataLoader(
          dataset=train,
          batch_size=batch_size,
          shuffle=True,
          pin_memory=True,
          num_workers=num_workers,
          persistent_workers=True
      )
      test_loader = DataLoader(
          dataset=test,
          batch_size=batch_size,
          shuffle=False,
          pin_memory=True,
          num_workers=num_workers,
          persistent_workers=True
      )
      

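      Next, here is the DALI dataloader code that preprocesses on the GPU. One thing to note: DALI can either load and preprocess purely on the CPU, or use hybrid CPU&GPU loading plus GPU preprocessing. If the custom preprocessing runs on the GPU, the np calls below must be replaced with their GPU counterpart, cupy (pip install cupy-cuda***, where *** is your CUDA version). However, I found that once a custom processing class (i.e. CUTOUT) is used, the GPU version is even slower than the CPU version, and I haven't figured out why yet. One likely factor is that ops.PythonFunction calls back into Python and requires exec_async=False and exec_pipelined=False (as set in the code below), which turns off DALI's asynchronous, pipelined execution.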

      import numpy as np
      # import cupy as cp  # only needed if CUTOUT runs on the GPU (pip install cupy-cuda***)
      from nvidia.dali.pipeline import Pipeline
      import nvidia.dali.ops as ops
      from nvidia.dali.plugin.pytorch import DALIClassificationIterator
      from nvidia.dali.plugin.base_iterator import LastBatchPolicy
      
      
      class CUTOUT(object):
          """
          Randomly mask out one or more patches from an image.
          Args:
              n_holes (int): Number of patches to cut out of each image.
              length (int): The length (in pixels) of each square patch.
              imgs: one decoded image of shape (H, W, C); ops.PythonFunction passes samples
                  in DALI's HWC layout (CropMirrorNormalize only converts to CHW later).
          Returns:
              Image with n_holes of dimension length x length cut out of it, same dtype as the input.
          P.S. If you set device='gpu', you need to replace numpy with cupy (pip install cupy-cuda***)
          """
          def __init__(self, n_holes, length):
              self.n_holes = n_holes
              self.length = length
      
          def __call__(self, imgs):
              h, w = imgs.shape[:2]  # HWC layout, not CHW
              mask = np.ones((h, w), np.float32)
              for n in range(self.n_holes):
                  y = np.random.randint(h)
                  x = np.random.randint(w)
                  # Keep the mask inside the image
                  y1 = np.clip(y - self.length // 2, 0, h)
                  y2 = np.clip(y + self.length // 2, 0, h)
                  x1 = np.clip(x - self.length // 2, 0, w)
                  x2 = np.clip(x + self.length // 2, 0, w)
                  mask[y1: y2, x1: x2] = 0.
              mask = np.expand_dims(mask, -1)  # (H, W, 1): broadcasts over the channel axis
              imgs = (imgs * mask).astype(imgs.dtype)
      
              return imgs
      
      
      class TrainPipeline(Pipeline):
          def __init__(self, batch_size, num_threads, device_id, data_root, img_size, n_holes, length, custom_cutout=False):
              if custom_cutout:
                  super(TrainPipeline, self).__init__(batch_size, num_threads, device_id,
                                                      exec_async=False,
                                                      exec_pipelined=False)
                  mode = 'cpu'
                  self.decode = ops.decoders.Image(device='cpu')  # image decoder in the pipeline; outputs RGB images
              else:
                  super(TrainPipeline, self).__init__(batch_size, num_threads, device_id, prefetch_queue_depth=4)
                  mode = 'gpu'
                  self.decode = ops.decoders.Image(device='mixed')
      
              self.img_size = img_size
              # readers.File is similar to torchvision.datasets.ImageFolder; DALI has other high-level readers you can explore
              self.input = ops.readers.File(file_root=data_root, random_shuffle=True)
              # Resize
              self.resize = ops.Resize(device=mode, resize_x=int(img_size*1.2), resize_y=int(img_size*1.2))
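              # RandomResizedCrop: crops a random 30-100% of the area and resizes it to img_size (closer to torchvision.transforms.RandomResizedCrop than to RandomCrop)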
              self.randomcrop = ops.RandomResizedCrop(device=mode, size=img_size, random_area=[0.3, 1.0])
              # CropMirrorNormalize normalizes and (via mirror) randomly flips horizontally, like torchvision.transforms.Normalize & RandomHorizontalFlip; it also outputs CHW
              self.normalize = ops.CropMirrorNormalize(device=mode, mean=[0.5*255, 0.5*255, 0.5*255],
                                                       std=[0.5*255, 0.5*255, 0.5*255])
              # Random number generators
              self.rng1 = ops.random.Uniform()
              self.rng2 = ops.random.CoinFlip()
              # Color adjustment, similar to torchvision.transforms.ColorJitter
              self.colortwist = ops.ColorTwist(device=mode)
              # Rotation, similar to torchvision.transforms.RandomRotation
              self.rotate = ops.Rotate(device=mode, fill_value=0)
              # GridMask: random block masking, similar in spirit to CUTOUT
              self.gridmask = ops.GridMask(device=mode)
              # Custom CUTOUT via PythonFunction. Drawback: it must run entirely on the CPU,
              # and it requires exec_async=False and exec_pipelined=False (set above)
              self.custom_cutout = custom_cutout
              if custom_cutout:
                  self.mask = ops.PythonFunction(device=mode, function=CUTOUT(n_holes, length), num_outputs=1)
      
          # Defines what the pipeline actually does to the data when it runs; think of it as a PyTorch module's forward()
          def define_graph(self):
              rng1 = self.rng1(range=(0.5, 1.5))  # brightness or contrast
              rng2 = self.rng1(range=(0.5, 1.0))  # saturation
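              rng3 = self.rng1(range=(-108, 108))  # hue: ColorTwist expects degrees; torchvision hue=0.3 means a shift of up to ±0.3 * 360 = ±108 degrees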
              rng4 = self.rng1(range=(-20, 20))  # rotation
              rng5 = self.rng2(probability=0.5)  # horizontal flip
              rng6 = self.rng1(range=(0.2, 0.6))  # gridmask: ratio between black square width and tile width.
              rng7 = self.rng1(range=(-50, 50))  # gridmask: offset of mask
      
              jpegs, labels = self.input(name='Reader')
              images = self.decode(jpegs)
              images = self.resize(images)
              images = self.rotate(images, angle=rng4)
              images = self.randomcrop(images)
              images = self.colortwist(images, brightness=rng1, contrast=rng1, saturation=rng2, hue=rng3)
              if self.custom_cutout:
                  images = self.mask(images)
              else:
                  images = self.gridmask(images, ratio=rng6, shift_x=rng7, shift_y=rng7, tile=int(self.img_size*0.25))
              images = self.normalize(images, mirror=rng5)
      
              return images, labels
      
      
      class TestPipeline(Pipeline):
          def __init__(self, batch_size, num_threads, device_id, data_root, img_size):
              super(TestPipeline, self).__init__(batch_size, num_threads, device_id, prefetch_queue_depth=4)
      
              self.decode = ops.decoders.Image(device='mixed')
              self.input = ops.readers.File(file_root=data_root, random_shuffle=False)
              self.resize = ops.Resize(device='gpu', resize_x=img_size, resize_y=img_size)
              self.normalize = ops.CropMirrorNormalize(device='gpu', mean=[0.5 * 255, 0.5 * 255, 0.5 * 255],
                                                       std=[0.5 * 255, 0.5 * 255, 0.5 * 255])
      
          def define_graph(self):
              jpegs, labels = self.input(name='Reader')
              images = self.decode(jpegs)
              images = self.resize(images)
              images = self.normalize(images)
      
              return images, labels
      
      
      def get_dali_iter(mode, batch_size, num_threads, device_id, data_root, img_size, n_holes, length, custom_cutout):
          if mode == 'train':
              pipe_train = TrainPipeline(batch_size, num_threads, device_id, data_root, img_size, n_holes, length,
                                         custom_cutout=custom_cutout)
              pipe_train.build()
              # DALIClassificationIterator returns 2 outputs (data and label) as PyTorch tensors, i.e. it plays the role of the DataLoader
              train_loader = DALIClassificationIterator(pipe_train, size=pipe_train.epoch_size('Reader'),
                                                        last_batch_policy=LastBatchPolicy.PARTIAL, auto_reset=True)
              return train_loader
      
          elif mode == 'test':
              pipe_test = TestPipeline(batch_size, num_threads, device_id, data_root, img_size)
              pipe_test.build()
              """
              LastBatchPolicy.PARTIAL的作用等同于drop_last=False,保留最后一个batch的样本(该batch的样本数<batch size)
              用于训练或测试,测试的话一定要用这个,不然得到的测试结果会有不准确
              """
              test_loader = DALIClassificationIterator(pipe_test, size=pipe_test.epoch_size('Reader'),
                                                       last_batch_policy=LastBatchPolicy.PARTIAL, auto_reset=True)
              return test_loader
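
As mentioned above, the np calls in CUTOUT have to become cupy calls when the PythonFunction runs on the GPU. One way to avoid keeping two copies is to look up the array module from the input itself. The sketch below assumes the function receives one HWC image per call (the per-sample mode used in the pipeline above) and that cupy is optional; `CutoutHWC` and `get_xp` are hypothetical names, not DALI APIs, and this is an illustration rather than the code behind the benchmarks below.

```python
import numpy as np

try:
    import cupy as cp  # optional: only needed when the PythonFunction runs on the GPU
except ImportError:
    cp = None


def get_xp(arr):
    """Return the array module (numpy or cupy) that owns `arr`."""
    if cp is not None:
        return cp.get_array_module(arr)
    return np


class CutoutHWC:
    """Cutout for a single HWC image; works on numpy or cupy arrays."""

    def __init__(self, n_holes, length, seed=None):
        self.n_holes = n_holes
        self.length = length
        self.rng = np.random.default_rng(seed)  # host-side RNG for the hole centres

    def __call__(self, img):
        xp = get_xp(img)
        h, w = img.shape[0], img.shape[1]
        mask = xp.ones((h, w, 1), dtype=img.dtype)  # (H, W, 1) broadcasts over channels
        for _ in range(self.n_holes):
            y = int(self.rng.integers(h))
            x = int(self.rng.integers(w))
            # Clip the square so it stays inside the image
            y1, y2 = max(y - self.length // 2, 0), min(y + self.length // 2, h)
            x1, x2 = max(x - self.length // 2, 0), min(x + self.length // 2, w)
            mask[y1:y2, x1:x2] = 0
        return img * mask  # same dtype as the input, zeros inside the holes


img = np.full((32, 32, 3), 200, dtype=np.uint8)
out = CutoutHWC(n_holes=1, length=8, seed=0)(img)
print(out.shape, out.dtype)  # (32, 32, 3) uint8
```

In the pipeline, an instance would go where `CUTOUT(n_holes, length)` is passed as `function=` to `ops.PythonFunction`.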
      

      The function get_dali_iter returns the dataloader for the given mode ('train' or 'test').
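
Both DALI pipelines normalize with `mean` and `std` of `0.5 * 255` on raw 0-255 pixel values, while the torchvision pipelines use `ToTensor()` (which divides by 255) followed by `Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])`. A quick numeric check, assuming 8-bit input images, shows the two produce the same values, because (x/255 - 0.5)/0.5 = (x - 127.5)/127.5:

```python
import numpy as np

# Every possible 8-bit pixel value
pixels = np.arange(256, dtype=np.float64)

# torchvision: ToTensor() scales to [0, 1], then Normalize(mean=0.5, std=0.5)
torchvision_out = (pixels / 255.0 - 0.5) / 0.5

# DALI: CropMirrorNormalize on raw 0-255 values with mean = std = 0.5 * 255
dali_out = (pixels - 0.5 * 255) / (0.5 * 255)

print(np.allclose(torchvision_out, dali_out))  # True
print(dali_out.min(), dali_out.max())          # -1.0 1.0
```

Any remaining differences between the two test pipelines would come from decoding and resizing, not from normalization.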

      import torch
      import torch.nn as nn
      from time import time
      from torchvision import models
      
      
      # Define the model
      device = torch.device(f'cuda:{device_id}' if torch.cuda.is_available() else 'cpu')  # device_id is the int passed to DALI
      class Resnet18(nn.Module):
          def __init__(self, n_class):
              super(Resnet18, self).__init__()
              model = models.resnet18(pretrained=True)
              self.backbone = nn.Sequential(*list(model.children())[:-1])
              self.fc = nn.Linear(512, n_class)
      
          def forward(self, x):
              x = self.backbone(x)
              feature = torch.flatten(x, 1)
              out = self.fc(feature)
              return feature, out
      
      
      # Move the model to the device
      model = Resnet18(num_classes).to(device)
      # Define the loss function and optimizer
      criterion = nn.CrossEntropyLoss()
      optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)
      
      
      # torchvision training + testing code
      def test_cpu(model, eval_loader):
          model.eval()
          correct = 0
          total = 0
          with torch.no_grad():
              for (img, label) in eval_loader:
                  img, label = img.to(device), label.to(device)
                  _, output = model(img)
                  predicted = torch.argmax(output, dim=1)
                  total += label.size(0)
                  correct += (predicted == label).sum().item()
              accu = correct / total
          return accu
      
      
      start = time()
      for epoch in range(epochs):
          model.train()
          for batch_idx, (img, label) in enumerate(train_loader):
              img, label = img.to(device), label.to(device)
              _, output = model(img)
              loss = criterion(output, label)
              optimizer.zero_grad()
              loss.backward()
              optimizer.step()
          # Test the model
          if validation:
              t_accu = test_cpu(model, test_loader)
      print('Running time of torchvision: {}'.format(time() - start))
      
      
      # DALI with GPU: training + testing code
      def test_dali(model, eval_loader):
          model.eval()
          correct = 0
          total = 0
          with torch.no_grad():
              for data in eval_loader:
                  img = data[0]["data"]
                  label = data[0]["label"].squeeze(-1).long().to(device)
                  _, output = model(img)
                  predicted = torch.argmax(output, dim=1)
                  total += label.size(0)
                  correct += (predicted == label).sum().item()
              accu = correct / total
          return accu
      
      
      start = time()
      for _ in range(epochs):
          model.train()
          for batch_idx, data in enumerate(train_loader):
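              # Only the image/label unpacking differs from torchvision; everything else is the same
              img = data[0]["data"].to(device)  # CPU pipelines (custom_cutout=True) return CPU tensors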
              label = data[0]["label"].squeeze(-1).long().to(device)
      
              _, output = model(img)
              loss = criterion(output, label)
              optimizer.zero_grad()
              loss.backward()
              optimizer.step()
      
          # Test the model
          if validation:
              t_accu = test_dali(model, test_loader)
      print('Running time of DALI: {}'.format(time() - start))
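
The comment in get_dali_iter insists on LastBatchPolicy.PARTIAL for testing. test_dali (like test_cpu) adds `label.size(0)` to `total` for every batch, so which samples actually reach the loop decides what the accuracy means. This pure-Python sketch (hypothetical helpers `batch_indices` and `accuracy`, no DALI involved) compares keeping the partial last batch with dropping it, for 10 test images, a batch size of 4, and the last two images misclassified:

```python
from typing import List, Tuple


def batch_indices(n: int, batch_size: int, drop_last: bool) -> List[List[int]]:
    """Split sample indices 0..n-1 into batches, like DataLoader(drop_last=...)."""
    batches = [list(range(i, min(i + batch_size, n))) for i in range(0, n, batch_size)]
    if drop_last and batches and len(batches[-1]) < batch_size:
        batches.pop()  # discard the incomplete last batch
    return batches


def accuracy(correct_flags: List[bool], batches: List[List[int]]) -> Tuple[int, int, float]:
    """Accumulate correct/total batch by batch, mirroring the test loops above."""
    correct = total = 0
    for batch in batches:
        total += len(batch)  # plays the role of label.size(0)
        correct += sum(correct_flags[i] for i in batch)
    return correct, total, correct / total


flags = [True] * 8 + [False] * 2  # the last two test images are misclassified
print(accuracy(flags, batch_indices(10, 4, drop_last=False)))  # (8, 10, 0.8)
print(accuracy(flags, batch_indices(10, 4, drop_last=True)))   # (8, 8, 1.0)
```

Dropping the last batch silently evaluates only 8 of the 10 images and reports 100% accuracy; keeping the partial batch counts all 10.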
      

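      Below are two sets of experiments demonstrating DALI's speedup. Model: ResNet18; training images: 2000+; test images (in validation mode): 700+; 100 training epochs; in validation mode, the model is tested once after every training epoch.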
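
The timing code above wraps the whole loop in `time()`. When comparing loaders this way, two details help keep per-epoch numbers fair: CUDA kernels run asynchronously, so synchronizing (for example with `torch.cuda.synchronize`) before reading the clock makes sure queued GPU work is counted, and optional warm-up epochs keep one-off start-up costs out of the average. This is a minimal sketch of that idea; `time_epochs` and `run_epoch` are hypothetical names, with `run_epoch` standing for one epoch of either training loop above.

```python
import time
from typing import Callable, Optional


def time_epochs(run_epoch: Callable[[], None], epochs: int,
                sync: Optional[Callable[[], None]] = None,
                warmup: int = 0) -> float:
    """Return the average wall-clock seconds per timed epoch.

    sync:   e.g. torch.cuda.synchronize, called before each clock reading
    warmup: untimed epochs run first (excludes one-off start-up costs)
    """
    for _ in range(warmup):
        run_epoch()
    if sync is not None:
        sync()  # wait for warm-up GPU work before starting the clock
    start = time.perf_counter()
    for _ in range(epochs):
        run_epoch()
    if sync is not None:
        sync()  # make sure all queued GPU work is included
    return (time.perf_counter() - start) / epochs
```

Calling it once per num_workers / num_threads and batch-size setting, with the same `epochs` and `warmup`, gives directly comparable per-epoch times.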

      • DALI acceleration: comparison experiment 1
        CPU: Intel® Core™ i9-10980XE CPU @ 3.00GHz
        GPU: Tesla T4 16G, Turing architecture
        [Figure: experiment 1 (training only), torchvision vs. DALI by number of threads]
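        The figure above shows that with few threads, plain torchvision is faster; but as the number of threads increases, torchvision's speed barely changes while DALI speeds up markedly (note: for a uniform comparison, I treat torchvision's num_workers and DALI's num_threads as the same variable). In the third panel on the right, DALI is still faster than torchvision when more threads are used, even running only on the CPU (note: here DALI uses the same CUTOUT as torchvision).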
        [Figure: experiment 1 (training only), DALI with GPU vs. torchvision]
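        The figure above shows that DALI with GPU is faster than torchvision in every setting, especially at larger batch sizes (note: DALI's native API has no CUTOUT augmentation, so I used GridMask instead; both randomly black out patches of the image).
        Both experiments above measured training only. I then ran a supplementary experiment to measure how much faster it is to train with torchvision and test with DALI, compared with using torchvision throughout. Why? Some people worry that switching to DALI changes their data augmentation (in my tests the difference was almost negligible), or that their own augmentation is hard to implement in DALI (in fact it can be done with a custom augmentation plus DALI on the CPU). In that case you can still use torchvision for training and DALI for testing! At test time the images only go through Resize, normalization, and ToTensor, which DALI and torchvision implement essentially the same way, so when the test set is large, DALI can speed up testing noticeably. Why not use it?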
        [Figure: torchvision for training and DALI for testing vs. torchvision for both]
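        The figure above shows that using DALI instead of torchvision for testing gives a moderate speedup, and the gap grows with the batch size. When there is spare GPU memory, we can simply max out the validation batch size to maximize speed.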

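      • DALI acceleration: comparison experiment 2
        I ran this experiment on a Hengyuan Cloud (恒源云) server (time is money, and the time saved by acceleration is 💴). The server configuration is:
        CPU: AMD EPYC 7302, 8 cores allocated
        GPU: NVIDIA GeForce RTX 3090, Ampere architecture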
        [Figure: experiment 2 on the RTX 3090 server, torchvision vs. DALI]
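        The figure above shows that with torchvision, more num_workers is not always better: on this server, num_workers=8 gives a good speedup. DALI with GPU, on the other hand, is largely unaffected by batch size and num_threads. So when you need to keep the batch size fixed but still want faster experiments (changing the batch size means re-tuning the learning rate and other hyperparameters, which costs extra validation runs and is rarely worth it), DALI is the way to go. In addition, on the flagship RTX 3090 DALI's speedup is much larger than on the T4, roughly 2x or more over torchvision. This agrees with NVIDIA's official note that DALI achieves larger speedups on Volta and Ampere GPUs, so those of you with such cards can smile. I expect DALI's advantage to grow with the number of images used for training or testing.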
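
To compare how much of an image GridMask and CUTOUT hide, here is a simplified NumPy model of the GridMask parameters used in TrainPipeline (`tile`, `ratio` as the ratio between black-square width and tile width, `shift_x`, `shift_y`, no rotation). It assumes each `tile x tile` cell has its black square in the cell's top-left corner; DALI's exact placement and rotation handling may differ, so treat `grid_mask` and `cutout_mask` as hypothetical illustrations, not DALI's implementation.

```python
import numpy as np


def grid_mask(h, w, tile, ratio, shift_x=0, shift_y=0):
    """Simplified GridMask with angle = 0. Returns an (h, w) mask: 1 = keep, 0 = masked.

    Each tile x tile cell gets a black square of side ratio * tile in its
    top-left corner (an assumption); the whole grid is shifted by (shift_x, shift_y).
    """
    black = int(round(ratio * tile))
    ys = (np.arange(h) - shift_y) % tile  # row position inside its cell
    xs = (np.arange(w) - shift_x) % tile  # column position inside its cell
    masked = (ys[:, None] < black) & (xs[None, :] < black)
    return np.where(masked, 0, 1).astype(np.uint8)


def cutout_mask(h, w, centers, length):
    """CUTOUT-style mask with square holes of side `length` at the given (y, x) centers."""
    mask = np.ones((h, w), dtype=np.uint8)
    for y, x in centers:
        mask[max(y - length // 2, 0):min(y + length // 2, h),
             max(x - length // 2, 0):min(x + length // 2, w)] = 0
    return mask


# A 100 x 100 image, i.e. img_size = 100, so tile = int(100 * 0.25) = 25
gm = grid_mask(100, 100, tile=25, ratio=0.4)
cm = cutout_mask(100, 100, centers=[(50, 50)], length=16)
print(int((gm == 0).sum()), int((cm == 0).sum()))  # 1600 256
```

With `tile = int(img_size * 0.25)` and `ratio` drawn from [0.2, 0.6] as in `define_graph`, this model masks roughly ratio² of the image, about 4% to 36%, whereas CUTOUT hides at most `n_holes * length**2` pixels.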

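      Some common NVIDIA GPUs and their architectures are listed below; look up other models yourself.
      Volta: TITAN V, Tesla V100, etc.
      Ampere: GeForce RTX 30 series
      Turing: GeForce RTX 20 / GTX 16 series, Tesla T4, Quadro RTX 5000, RTX 6000, etc.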
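
To check which of these architectures your own GPU belongs to, you can map its CUDA compute capability to an architecture name. The table in this sketch only covers the three architectures listed above; `architecture` is a hypothetical helper, and on a machine with PyTorch and CUDA the capability tuple can come from `torch.cuda.get_device_capability(0)`.

```python
from typing import Tuple

# CUDA compute capability (major, minor) -> architecture, for the families listed above
ARCH_BY_CAPABILITY = {
    (7, 0): "Volta",   # TITAN V, Tesla V100
    (7, 5): "Turing",  # GeForce RTX 20 / GTX 16, Tesla T4, Quadro RTX 5000/6000
    (8, 0): "Ampere",  # A100 (data-center Ampere)
    (8, 6): "Ampere",  # GeForce RTX 30 series
}


def architecture(capability: Tuple[int, int]) -> str:
    """Return the architecture name, or 'other' for capabilities not in the table."""
    return ARCH_BY_CAPABILITY.get(tuple(capability), "other")


print(architecture((8, 6)))  # Ampere (e.g. RTX 3090)
print(architecture((7, 5)))  # Turing (e.g. Tesla T4)
```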

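      GPU utilization before and after switching to DALI:
      [Animated figure: GPU utilization over time, torchvision vs. DALI]
      With DALI, GPU utilization stays close to 100% throughout, whereas with torchvision it swings up and down, so the GPU is used inefficiently. DALI does use more GPU memory; that is the price of the speedup.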
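
To record a GPU-utilization trace like this for your own runs, you can poll `nvidia-smi` while training. The sketch below assumes the NVIDIA driver's `nvidia-smi` is on the PATH and uses its `--query-gpu` CSV mode; `parse_smi` and `sample_gpu` are hypothetical helper names.

```python
import subprocess
import time
from typing import List, Tuple

# One CSV line per GPU: "<utilization %>, <memory used in MiB>"
QUERY = ["nvidia-smi", "--query-gpu=utilization.gpu,memory.used",
         "--format=csv,noheader,nounits"]


def parse_smi(output: str) -> List[Tuple[int, int]]:
    """Parse nvidia-smi CSV output into (utilization %, memory MiB) per GPU."""
    rows = []
    for line in output.strip().splitlines():
        util, mem = (int(field) for field in line.split(","))
        rows.append((util, mem))
    return rows


def sample_gpu(seconds: float, interval: float = 1.0) -> List[List[Tuple[int, int]]]:
    """Take one reading per interval for `seconds` seconds (needs a GPU and nvidia-smi)."""
    samples = []
    end = time.monotonic() + seconds
    while time.monotonic() < end:
        out = subprocess.run(QUERY, capture_output=True, text=True, check=True).stdout
        samples.append(parse_smi(out))
        time.sleep(interval)
    return samples


print(parse_smi("97, 10240\n35, 9830\n"))  # [(97, 10240), (35, 9830)]
```

Run `sample_gpu` in a separate process while training; consistently high utilization suggests the data pipeline is keeping up, while frequent dips suggest the GPU is waiting for data.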

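      For more details, such as how to install DALI, see my Zhihu article 数据加载预处理DALI加速-保姆级教程 (DALI-accelerated data loading and preprocessing: a step-by-step tutorial).
      Writing original content takes effort, so please leave a like and a follow before you go~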

      • 189****7698 (reply to @131****7225)

        @131-7225 👏 👏 👏

        • 131****7225 (reply to @189****7698)

          @189-7698 👯 The warm and lovely Linlin is here
