Github - NVIDIA/DALI

docs - NVIDIA DALI documentation

NVIDIA/DALI(NVIDIA Data Loading Library) 库包含了用于加速深度学习应用的数据预处理的高度优化的构建模块(highly optimized building blocks)和执行引擎(execution engine).

深度学习应用中,往往需要复杂的、多阶段的数据预处理管道. 这些数据管道主要是在CPU上执行的数据密集型操作(compute-intensive operation),b 比如,从磁盘加载数据,解码,裁剪,随机resize,颜色和空间增强,格式转换,等等,都主要是在 CPUs 上完成的,其限制了训练和推断时的性能和可扩展性.

另外,深度学习框架有很多不同的数据预处理实现,这就为网络训练和推断工作流的可移植性和代码维护带来了挑战.

DALI 是高度优化后的构建模块的集合,是用于加速输入数据预处理的执行引擎.

DALI 作为单独的库,为不同数据管道的加速,提供了更优的性能和灵活性. DALI 库可以很方便的整合仅不同深度学习训练和推断应用中.

1. DALI Highlights

[1] - 全数据管道加速 - 从磁盘读取到用于训练和推断的数据处理.

[2] - 灵活性高 - 可配置图(configurable graphs)和定制操作子(custom operators).

[3] - 支持图像分类和分割的数据加载.

[4] - 易于整合 - 直接的框架插件和开源绑定(open source bindings).

[5] - 训练工作流的便携性,支持多种输入格式:JPEG,
PNG (fallback to CPU), TIFF (fallback to CPU), BMP (fallback to CPU), raw formats, LMDB, RecordIO, TFRecord.

[6] - 定制化的可扩展性.

2. DALI 安装

NVIDIA GPU Cloud 中的 TensorFlow, PyTorch 和 MXNet containers(18.07 以后版本) 已经预安装了 DALI 库.

2.1. prebuilt DALI 安装

依赖项

[1] - Linux x64.

[2] - NVIDIA Driver supporting CUDA 9.0 or later (i.e., 384.xx or later driver releases).

[3] - 深度框架:

  • MXNet 1.3 mxnet-cu90 or later.
  • PyTorch 0.4 or later.
  • TensorFlow 1.7 or later.

CUDA9.0 安装

pip install --extra-index-url https://developer.download.nvidia.com/compute/redist/cuda/9.0 nvidia-dali

CUDA10.0 安装(DALI 0.80 以后版本):

pip install --extra-index-url https://developer.download.nvidia.com/compute/redist/cuda/10.0 nvidia-dali

DALI 0.6.1 版本以后,nvidia-dali 不再包含 DALI TensorFlow 插件的 prebuilt 版本,因此需要采用如下方式安装:

pip install --extra-index-url https://developer.download.nvidia.com/compute/redist/cuda/9.0 nvidia-dali-tf-plugin

DALI 0.8.0 版本以后,支持 CUDA10.0:

pip install --extra-index-url https://developer.download.nvidia.com/compute/redist/cuda/10.0 nvidia-dali-tf-plugin

此时,会自动安装 nvidia-dali 及其依赖项.

安装指定版本:

pip install --extra-index-url https://developer.download.nvidia.com/compute/redist nvidia-dali-tf-plugin==0.6.1

### 2.2. Nightly builds DALI 安装

在安装每日更新的 DALI 库前,建议先卸载常规 DALI.

CUDA9:

pip install --extra-index-url https://developer.download.nvidia.com/compute/redist/nightly/cuda/9.0 nvidia-dali-nightly
pip install --extra-index-url https://developer.download.nvidia.com/compute/redist/nightly/cuda/9.0 nvidia-dali-tf-plugin-nightly

CUDA10:

pip install --extra-index-url https://developer.download.nvidia.com/compute/redist/nightly/cuda/10.0 nvidia-dali-nightly
pip install --extra-index-url https://developer.download.nvidia.com/compute/redist/nightly/cuda/10.0 nvidia-dali-tf-plugin-nightly

2.3. Weekly builds DALI 安装

每周更新版本

CUDA10

pip install --extra-index-url https://developer.download.nvidia.com/compute/redist/weekly/cuda/10.0 nvidia-dali-weekly
pip install --extra-index-url https://developer.download.nvidia.com/compute/redist/weekly/cuda/10.0 nvidia-dali-tf-plugin-weekly

2.4. DALI 源码安装

Compiling DALI from source

3. 简单使用

深度学习框架提供的数据输入和数据增强管道,一般可分为如下两类:

[1] - 快速,但不灵活 - C++ 编码,它对外保留为单一的 python 对象,仅提供了特定的操作集和顺序.

[2] - 慢,但灵活 - 基于 C++ 或 Python 编码的构建模块集合,其可以任意组合数据管道,但速度变慢. 一个最大的问题是,这种数据管道会遇到 Python 的 GIL(Global Interpreter Lock) 问题. 这就需要开发者采用多进程、有效的输入管道需要复杂的设计.

3.1. DALI Pipeline

DALI 中最重要的类型,其包含了所有必要信息以及与定义、构建、运行等相关的函数.

from nvidia.dali.pipeline import Pipeline
help(Pipeline)

输出如:

Help on class Pipeline in module nvidia.dali.pipeline:

class Pipeline(__builtin__.object)
 |  Pipeline class encapsulates all data required to define and run
 |  DALI input pipeline.
 |
 |  Parameters
 |  ----------
 |  `batch_size` : int, optional, default = -1
 |      Batch size of the pipeline. Negative values for this parameter
 |      are invalid - the default value may only be used with
 |      serialized pipeline (the value stored in serialized pipeline
 |      is used instead).
 |  `num_threads` : int, optional, default = -1  # CPU 线程数
 |      Number of CPU threads used by the pipeline.
 |      Negative values for this parameter are invalid - the default
 |      value may only be used with serialized pipeline (the value
 |      stored in serialized pipeline is used instead).
 |  `device_id` : int, optional, default = -1  # GPU Id
 |      Id of GPU used by the pipeline.
 |      Negative values for this parameter are invalid - the default
 |      value may only be used with serialized pipeline (the value
 |      stored in serialized pipeline is used instead).
 |  `seed` : int, optional, default = -1  # 随机数生成
 |      Seed used for random number generation. Leaving the default value
 |      for this parameter results in random seed.
 |  `exec_pipelined` : bool, optional, default = True  
 |      Whether to execute the pipeline in a way that enables
 |      overlapping CPU and GPU computation, typically resulting
 |      in faster execution speed, but larger memory consumption.
 |  `exec_async` : bool, optional, default = True
 |      Whether to execute the pipeline asynchronously.
 |      This makes :meth:`nvidia.dali.pipeline.Pipeline.run` method
 |      run asynchronously with respect to the calling Python thread.
 |      In order to synchronize with the pipeline one needs to call
 |      :meth:`nvidia.dali.pipeline.Pipeline.outputs` method.
 |  `bytes_per_sample` : int, optional, default = 0
 |      A hint for DALI for how much memory to use for its tensors.
 |  `set_affinity` : bool, optional, default = False
 |      Whether to set CPU core affinity to the one closest to the
 |      GPU being used.
 |  `max_streams` : int, optional, default = -1 # CUDA 流数
 |      Limit the number of CUDA streams used by the executor.
 |      Value of -1 does not impose a limit.
 |      This parameter is currently unused (and behavior of
 |      unrestricted number of streams is assumed).
 |  `prefetch_queue_depth` : int or {"cpu_size": int, "gpu_size": int}, optional, default = 2
 |      Depth of the executor pipeline. Deeper pipeline makes DALI
 |      more resistant to uneven execution time of each batch, but it
 |      also consumes more memory for internal buffers.
 |      Specifying a dict:
 |      ``{ "cpu_size": x, "gpu_size": y }``
 |      instead of integer will cause the pipeline to use separated
 |      queues executor, with buffer queue size `x` for cpu stage
 |      and `y` for mixed and gpu stages. It is not supported when both `exec_async`
 |      and `exec_pipelined` are set to `False`.
 |      Executor will buffer cpu and gpu stages separatelly,
 |      and will fill the buffer queues when the first :meth:`nvidia.dali.pipeline.Pipeline.run`
 |      is issued.
 |
 |  Methods defined here:
 |
 |  __init__(self, batch_size=-1, num_threads=-1, device_id=-1, seed=-1, exec_pipelined=True, prefetch_queue_depth=2, exec_async=True, bytes_per_sample=0, set_affinity=False, max_streams=-1, default_cuda_stream_priority=0)
 |
 |  build(self)
 |      Build the pipeline.
 |
 |      Pipeline needs to be built in order to run it standalone.
 |      Framework-specific plugins handle this step automatically.
 |
 |  define_graph(self)
 |      This function is defined by the user to construct the
 |      graph of operations for their pipeline.
 |
 |      It returns a list of outputs created by calling DALI Operators.
 |
 |  deserialize_and_build(self, serialized_pipeline)
 |      Deserialize and build the pipeline given in serialized form.
 |
 |      Parameters
 |      ----------
 |      serialized_pipeline : str
 |                            Serialized pipeline.
 |
 |  epoch_size(self, name=None)
 |      Epoch size of a pipeline.
 |
 |      If the `name` parameter is `None`, returns a dictionary of pairs
 |      `(reader name, epoch size for that reader)`.
 |      If the `name` parameter is not `None`, returns epoch size for that
 |      reader.
 |
 |      Parameters
 |      ----------
 |      name : str, optional, default = None
 |          The reader which should be used to obtain epoch size.
 |
 |  feed_input(self, ref, data, layout=TensorLayout.NHWC)
 |      Bind the NumPy array to a tensor produced by ExternalSource
 |      operator. It is worth mentioning that `ref` should not be overriden
 |      with other operator outputs.
 |
 |  iter_setup(self)
 |      This function can be overriden by user-defined
 |      pipeline to perform any needed setup for each iteration.
 |      For example, one can use this function to feed the input
 |      data from NumPy arrays.
 |
 |  outputs(self)
 |      Returns the outputs of the pipeline and releases previous buffer.
 |
 |      If the pipeline is executed asynchronously, this function blocks
 |      until the results become available. It rises StopIteration if data set
 |      reached its end - usually when iter_setup cannot produce any more data
 |
 |  reset(self)
 |      Resets pipeline iterator
 |
 |      If pipeline iterator reached the end then reset its state to the beginning.
 |
 |  run(self)
 |      Run the pipeline and return the result.
 |
 |      If the pipeline was created with `exec_pipelined` option set to `True`,
 |      this function will also start prefetching the next iteration for
 |      faster execution.
 |
 |  save_graph_to_dot_file(self, filename)
 |      Saves the pipeline graph to a file.
 |
 |      Parameters
 |      ----------
 |      filename : str
 |                 Name of the file to which the graph is written.
 |
 |  serialize(self)
 |      Serialize the pipeline to a Protobuf string.
 |
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |
 |  __dict__
 |      dictionary for instance variables (if defined)
 |
 |  __weakref__
 |      list of weak references to the object (if defined)
 |
 |  batch_size
 |      Batch size.
 |
 |  device_id
 |      Id of the GPU used by the pipeline.
 |
 |  num_threads
 |      Number of CPU threads used by the pipeline.

3.2. 定义 DALI pipeline

以猫狗分类为例.

数据集路径结构如下:

from __future__ import print_function
import os.path
import fnmatch

for root, dir, files in os.walk("data/images"):
        depth = root.count('/')
        ret = ""
        if depth > 0:
            ret += "  " * (depth - 1) + "|-"
        print (ret + root)
        for items in fnmatch.filter(files, "*"):
                print (" " * len(ret) + "|-" + items)

如:

images
|-file_list.txt
|-images/dog
  |-dog_4.jpg
  |-dog_5.jpg
  |-dog_9.jpg
  |-dog_6.jpg
  |-dog_3.jpg
  |-dog_7.jpg
  |-dog_10.jpg
  |-dog_2.jpg
  |-dog_8.jpg
  |-dog_1.jpg
  |-dog_11.jpg
|-images/kitten
  |-cat_10.jpg
  |-cat_5.jpg
  |-cat_9.jpg
  |-cat_8.jpg
  |-cat_1.jpg
  |-cat_7.jpg
  |-cat_6.jpg
  |-cat_3.jpg
  |-cat_2.jpg
  |-cat_4.jpg

这里,创建的 pipeline 将从数据集路径中读取图片,解码图片,并返回 (image, labels) 对.

import nvidia.dali.ops as ops
import nvidia.dali.types as types

image_dir = "data/images"
batch_size = 8

class SimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(SimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir)
        # instead of path to file directory file with pairs image_name image_label_value can be provided
        # self.input = ops.FileReader(file_root = image_dir, file_list = image_dir + '/file_list.txt')
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        return (images, labels)

这里,SimplePipeline 类是 dali.pipeline.Pipeline 的子类.

此处,仅需要实现两个方法: 构建类 constructor 和 define_graph 函数.

构建的 pipeline 的全局参数有:

[1] - batch size

[2] - 计算所用的 CPU 线程数

[3] - GPU 设备Id.(SimplePipeline 没有采用到 GPU 进行计算)

[4] - 随机数生成种子

此外,还包含两个成员变量,采用 dali.ops 模块的操作子:

[1] - FileReader - 遍历路径,并返回 (image, label) 对.

[2] - ImageDecoder - 采用编码的图像作为输入,并输出解码后的 RGB 图像.

3.3. 构建 DALI pipeline

pipe = SimplePipeline(batch_size, 1, 0)
pipe.build()

3.4. 运行 DALI pipeline

pipe_out = pipe.run()
print(pipe_out)

输出如:

[<nvidia.dali.backend_impl.TensorListCPU object at 0x7ff6080bf180>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7ff6080bf5e0>]

数据格式为 DALI Tensor. 其可以转换为 Numpy array,但并不是所有的 DALI Tensor 都能够进行转换.

images, labels = pipe_out
print("Images is_dense_tensor: " + str(images.is_dense_tensor()))
print("Labels is_dense_tensor: " + str(labels.is_dense_tensor()))

输出如:

Images is_dense_tensor: False
Labels is_dense_tensor: True

labels 的值如:

import numpy as np

labels_tensor = labels.as_tensor()
print (labels_tensor.shape())
print (np.array(labels_tensor))

输出如:

[8L, 1L]
[[0]
 [0]
 [0]
 [0]
 [0]
 [0]
 [0]
 [0]]

images 的值需要对 TensorList 中的所有 tensors 进行循环,如:

from __future__ import division
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt

def show_images(image_batch):
    columns = 4
    rows = (batch_size + 1) // (columns)
    fig = plt.figure(figsize = (32,(32 // columns) * rows))
    gs = gridspec.GridSpec(rows, columns)
    for j in range(rows*columns):
        plt.subplot(gs[j])
        plt.axis("off")
        plt.imshow(image_batch.at(j))
    plt.show()    

#
show_images(images)

3.5. 添加数据增强 DALI pipline

3.5.1. Random shuffle

随机打乱数据顺序.

class ShuffledSimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(ShuffledSimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        return (images, labels)
    
#
pipe = ShuffledSimplePipeline(batch_size, 1, 0)
pipe.build()

pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)    

输出如:

3.5.2. Rotate

DALI 不仅可以从磁盘读取图片,并 batch 化为 tensors;还可以对图片进行各种数据增强.

图片固定角度旋转.

class RotatedSimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(RotatedSimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)
        self.rotate = ops.Rotate(angle = 10.0)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        rotated_images = self.rotate(images)
        return (rotated_images, labels)
#
pipe = RotatedSimplePipeline(batch_size, 1, 0)
pipe.build()

pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)

输出如:

3.5.3. RandomRotated

随机旋转一定角度.

class RandomRotatedSimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(RandomRotatedSimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)
        self.rotate = ops.Rotate()
        self.rng = ops.Uniform(range = (-10.0, 10.0))

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        angle = self.rng()
        rotated_images = self.rotate(images, angle = angle)
        return (rotated_images, labels)

#
pipe = RandomRotatedSimplePipeline(batch_size, 1, 0)
pipe.build()

pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)

输出如:

3.6. GPU 加速

DALI 库提供了 GPU 加速操作子,用于提升数据输入和数据增强的速度,而且易于扩展到 multi-GPU 系统.

3.6.1. tensors 复制到 GPU

RandomRotatedSimplePipeline Pipeline 为例:

class RandomRotatedGPUPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(RandomRotatedGPUPipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)
        self.rotate = ops.Rotate(device = "gpu") #
        self.rng = ops.Uniform(range = (-10.0, 10.0))

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        angle = self.rng()
        rotated_images = self.rotate(images.gpu(), angle = angle)  #
        return (rotated_images, labels)
#
pipe = RandomRotatedGPUPipeline(batch_size, 1, 0)
pipe.build()

pipe_out = pipe.run()
print(pipe_out)

输出如:

[<nvidia.dali.backend_impl.TensorListGPU object at 0x7ff565380ed8>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7ff564060148>]

TensorListGPU 即为在 GPU 上的 tensors,采用 as_cpu 可以将 GPU 数据复制到 CPU:

images, labels = pipe_out
show_images(images.as_cpu())

输出如:

注意事项: DALI 库不支持将数据在 pipeline 内部从 GPU 移动到 CPU. 在所有执行路径中,CPU运算符不能跟在GPU运算符后面.

3.6.2. 混合解码(Hybrid decoding)

在某些场景下,尤其是,高分辨率图片,JPEG格式存储的图片的解码是一个瓶颈. 对此,nvJPEG 库将比较有帮助,其将解码过程分为 CPU 和 GPU,显著降低解码时间.

指定 ImageDecodermixed 参数,以开启 nvJPEG 支持. 其它格式的文件仍在 CPU 上进行解码.

nvJPEG

class HybridPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(HybridPipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        # images are on the GPU
        return (images, labels)
    
#
pipe = HybridPipeline(batch_size, 1, 0)
pipe.build()


pipe_out = pipe.run()
images, labels = pipe_out
show_images(images.as_cpu())

ImageDecoder 设置 device=mixed 参数,将使用混合方法同时在 CPU 和 GPU 上进行计算. 也就是说,其接收 CPU 输入,但返回 GPU 输出.

输出如:

ImageDecoder 参数设置为 cpumixed 的速度对比:

from timeit import default_timer as timer

test_batch_size = 64

def speedtest(pipeclass, batch, n_threads):
    pipe = pipeclass(batch, n_threads, 0)
    pipe.build()
    # warmup
    for i in range(5):
        pipe.run()
    # test
    n_test = 20
    t_start = timer()
    for i in range(n_test):
        pipe.run()
    t = timer() - t_start
    print("Speed: {} imgs/s".format((n_test * batch)/t))
#
speedtest(ShuffledSimplePipeline, test_batch_size, 4)
#Speed: 2905.71010277 imgs/s

#
speedtest(HybridPipeline, test_batch_size, 4)
#Speed: 5714.61475087 imgs/s

4. 示例

From: nvidia-dali GPU加速预处理 - 2019.08.17

import nvidia.dali.ops as ops
import nvidia.dali.types as types
from nvidia.dali.pipeline import Pipeline
from nvidia.dali.plugin.pytorch import DALIClassificationIterator, DALIGenericIterator
 
 
class HybridTrainPipe(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, data_dir, crop, dali_cpu=False, local_rank=0, world_size=1):
        super(HybridTrainPipe, self).__init__(
            batch_size, num_threads, device_id, seed=12 + device_id)
        dali_device = "gpu"
        self.input = ops.FileReader(file_root=data_dir, 
                                    shard_id=local_rank, 
                                    num_shards=world_size, 
                                    random_shuffle=True)
        self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)
        self.res = ops.RandomResizedCrop(
            device="gpu", 
            size=crop, 
            random_area=[0.08, 1.25])
        self.cmnp = ops.CropMirrorNormalize(
            device="gpu",
            output_dtype=types.FLOAT,
            output_layout=types.NCHW,
            image_type=types.RGB,
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255])
        self.coin = ops.CoinFlip(probability=0.5)
        print('DALI "{0}" variant'.format(dali_device))
 
    def define_graph(self):
        rng = self.coin()
        self.jpegs, self.labels = self.input(name="Reader")
        images = self.decode(self.jpegs)
        images = self.res(images)
        output = self.cmnp(images, mirror=rng)
        return [output, self.labels]
 
 
class HybridValPipe(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, data_dir, crop, size, local_rank=0, world_size=1):
        super(HybridValPipe, self).__init__(
            batch_size, num_threads, device_id, seed=12 + device_id)
        self.input = ops.FileReader(file_root=data_dir, 
                                    shard_id=local_rank, 
                                    num_shards=world_size,
                                    random_shuffle=False)
        self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)
        self.res = ops.Resize(
            device="gpu", 
            resize_shorter=size, 
            interp_type=types.INTERP_TRIANGULAR)
        self.cmnp = ops.CropMirrorNormalize(
            device="gpu",
            output_dtype=types.FLOAT,
            output_layout=types.NCHW,
            crop=(crop, crop),
            image_type=types.RGB,
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255])
 
    def define_graph(self):
        self.jpegs, self.labels = self.input(name="Reader")
        images = self.decode(self.jpegs)
        images = self.res(images)
        output = self.cmnp(images)
        return [output, self.labels]
 
 
def get_imagenet_iter_dali(type, image_dir, batch_size, 
                           num_threads, device_id, num_gpus, 
                           crop, val_size=256,
                           world_size=1,
                           local_rank=0):
    if type == 'train':
        pip_train = HybridTrainPipe(
            batch_size=batch_size, 
            num_threads=num_threads, 
            device_id=local_rank,
            data_dir=image_dir + '/train',
            crop=crop, 
            world_size=world_size, local_rank=local_rank)
        
        pip_train.build()
        dali_iter_train = DALIClassificationIterator(
            pip_train, 
            size=pip_train.epoch_size("Reader") // world_size)
        
        return dali_iter_train
    
    elif type == 'val':
        pip_val = HybridValPipe(
            batch_size=batch_size, 
            num_threads=num_threads, 
            device_id=local_rank,
            data_dir=image_dir + '/val',
            crop=crop, 
            size=val_size, 
            world_size=world_size, local_rank=local_rank)
        pip_val.build()
        dali_iter_val = DALIClassificationIterator(
            pip_val, 
            size=pip_val.epoch_size("Reader") // world_size)
        
        return dali_iter_val
    
if __name__ == '__main__':
    train_loader = get_imagenet_iter_dali(
        type='train', image_dir='/path/to/imagenet', 
        batch_size=256,
        num_threads=4, crop=224, device_id=0, num_gpus=1)
    print('[INFO]start iterate')
    start = time.time()
    for i, data in enumerate(train_loader):
        images = data[0]["data"].cuda(non_blocking=True)
        labels = data[0]["label"].squeeze().long().cuda(non_blocking=True)
    end = time.time()
    print('[INFO]end iterate')
    print('[INFO]dali iterate time: %fs' % (end - start))

5. 相关

[1] - 英伟达DALI加速技巧:让数据预处理速度比原生PyTorch快4倍 - 2020.02.04

[2] - Nvidia dali speeding up pytorch

Last modification:May 9th, 2021 at 04:06 pm