• 异步数据读取
    • 创建PyReader对象
    • 设置PyReader对象的数据源
    • 使用PyReader进行模型训练和测试

    异步数据读取

    除同步Feed方式外,我们提供了PyReader。PyReader的性能比 同步数据读取 更好,因为PyReader的数据读取和模型训练过程是异步进行的,且能与 double_buffer_reader 配合以进一步提高数据读取性能。此外, double_buffer_reader 负责异步完成CPU Tensor到GPU Tensor的转换,一定程度上提升了数据读取效率。

    创建PyReader对象

    创建PyReader对象的方式为:

    1. import paddle.fluid as fluid
    2.  
    3. image = fluid.layers.data(name='image', dtype='float32', shape=[784])
    4. label = fluid.layers.data(name='label', dtype='int64', shape=[1])
    5.  
    6. ITERABLE = True
    7.  
    8. py_reader = fluid.io.PyReader(feed_list=[image, label], capacity=64, use_double_buffer=True, iterable=ITERABLE)

    其中,

    • feed_list为需要输入的数据层变量列表;
    • capacity为PyReader对象的缓存区大小;
    • use_double_buffer默认为True,表示使用 double_buffer_reader 。建议开启,可提升数据读取速度;
    • iterable默认为True,表示该PyReader对象是可For-Range迭代的。当iterable=True时,PyReader与Program解耦,定义PyReader对象不会改变Program;当iterable=False时,PyReader会在Program中插入数据读取相关的op。需要注意的是:Program.clone() (参见 cn_api_fluid_Program_clone )不能实现PyReader对象的复制。如果您要创建多个不同PyReader对象(例如训练和预测阶段需创建两个不同的PyReader),则需重定义两个PyReader对象。若需要共享训练阶段和测试阶段的模型参数,您可以通过 fluid.unique_name.guard() 的方式来实现。注:Paddle采用变量名区分不同变量,且变量名是根据 unique_name 模块中的计数器自动生成的,每生成一个变量名计数值加1。 fluid.unique_name.guard() 的作用是重置 unique_name 模块中的计数器,保证多次调用 fluid.unique_name.guard() 配置网络时对应变量的变量名相同,从而实现参数共享。

    下面是一个使用PyReader配置训练阶段和测试阶段网络的例子:

    1. import paddle
    2. import paddle.fluid as fluid
    3. import paddle.dataset.mnist as mnist
    4.  
    5. def network():
    6. image = fluid.layers.data(name='image', dtype='float32', shape=[784])
    7. label = fluid.layers.data(name='label', dtype='int64', shape=[1])
    8. reader = fluid.io.PyReader(feed_list=[image, label], capacity=64)
    9.  
    10. # Here, we omitted the definition of loss of the model
    11. return loss , reader
    12.  
    13. # Create main program and startup program for training
    14. train_prog = fluid.Program()
    15. train_startup = fluid.Program()
    16.  
    17. with fluid.program_guard(train_prog, train_startup):
    18. # Use fluid.unique_name.guard() to share parameters with test network
    19. with fluid.unique_name.guard():
    20. train_loss, train_reader = network()
    21. adam = fluid.optimizer.Adam(learning_rate=0.01)
    22. adam.minimize(train_loss)
    23.  
    24. # Create main program and startup program for testing
    25. test_prog = fluid.Program()
    26. test_startup = fluid.Program()
    27. with fluid.program_guard(test_prog, test_startup):
    28. # Use fluid.unique_name.guard() to share parameters with train network
    29. with fluid.unique_name.guard():
    30. test_loss, test_reader = network()

    设置PyReader对象的数据源

    PyReader对象通过 decorate_sample_generator()decorate_sample_list_generatordecorate_batch_generator() 方法设置其数据源。这三个方法均接收Python生成器 generator 作为参数,其区别在于:

    • decorate_sample_generator() 要求 generator 返回的数据格式为[img_1, label_1],其中img_1和label_1为单个样本的Numpy Array类型数据。
    • decorate_sample_list_generator() 要求 generator 返回的数据格式为[(img_1, label_1), (img_2, label_2), …, (img_n, label_n)],其中img_i和label_i均为每个样本的Numpy Array类型数据,n为batch size。
    • decorate_batch_generator() 要求 generator 返回的数据的数据格式为[batched_imgs, batched_labels],其中batched_imgs和batched_labels为batch级的Numpy Array或LoDTensor类型数据。当PyReader的iterable=True(默认)时,必须给这三个方法传 places 参数,指定将读取的数据转换为CPU Tensor还是GPU Tensor。当PyReader的iterable=False时,不需传places参数。

    例如,假设我们有两个reader,其中fake_sample_reader每次返回一个sample的数据,fake_batch_reader每次返回一个batch的数据。

    1. import paddle.fluid as fluid
    2. import numpy as np
    3.  
    4. # sample级reader
    5. def fake_sample_reader():
    6. for _ in range(100):
    7. sample_image = np.random.random(size=(784, )).astype('float32')
    8. sample_label = np.random.random_integers(size=(1, ), low=0, high=9).astype('int64')
    9. yield sample_image, sample_label
    10.  
    11.  
    12. # batch级reader
    13. def fake_batch_reader():
    14. batch_size = 32
    15. for _ in range(100):
    16. batch_image = np.random.random(size=(batch_size, 784)).astype('float32')
    17. batch_label = np.random.random_integers(size=(batch_size, 1), low=0, high=9).astype('int64')
    18. yield batch_image, batch_label
    19.  
    20. image1 = fluid.layers.data(name='image1', dtype='float32', shape=[784])
    21. label1 = fluid.layers.data(name='label1', dtype='int64', shape=[1])
    22.  
    23. image2 = fluid.layers.data(name='image2', dtype='float32', shape=[784])
    24. label2 = fluid.layers.data(name='label2', dtype='int64', shape=[1])
    25.  
    26. image3 = fluid.layers.data(name='image3', dtype='float32', shape=[784])
    27. label3 = fluid.layers.data(name='label3', dtype='int64', shape=[1])

    对应的PyReader设置如下:

    1. import paddle
    2. import paddle.fluid as fluid
    3.  
    4. ITERABLE = True
    5. USE_CUDA = True
    6. USE_DATA_PARALLEL = True
    7.  
    8. if ITERABLE:
    9. # 若PyReader可迭代,则必须设置places参数
    10. if USE_DATA_PARALLEL:
    11. # 若进行多GPU卡训练,则取所有的CUDAPlace
    12. # 若进行多CPU核训练,则取多个CPUPlace,本例中取了8个CPUPlace
    13. places = fluid.cuda_places() if USE_CUDA else fluid.cpu_places(8)
    14. else:
    15. # 若进行单GPU卡训练,则取单个CUDAPlace,本例中0代表0号GPU卡
    16. # 若进行单CPU核训练,则取单个CPUPlace,本例中1代表1个CPUPlace
    17. places = fluid.cuda_places(0) if USE_CUDA else fluid.cpu_places(1)
    18. else:
    19. # 若PyReader不可迭代,则不需要设置places参数
    20. places = None
    21.  
    22. # 使用sample级的reader作为PyReader的数据源
    23. py_reader1 = fluid.io.PyReader(feed_list=[image1, label1], capacity=10, iterable=ITERABLE)
    24. py_reader1.decorate_sample_generator(fake_sample_reader, batch_size=32, places=places)
    25.  
    26. # 使用sample级的reader + paddle.batch设置PyReader的数据源
    27. py_reader2 = fluid.io.PyReader(feed_list=[image2, label2], capacity=10, iterable=ITERABLE)
    28. sample_list_reader = paddle.batch(fake_sample_reader, batch_size=32)
    29. sample_list_reader = paddle.reader.shuffle(sample_list_reader, buf_size=64) # 还可以进行适当的shuffle
    30. py_reader2.decorate_sample_list_generator(sample_list_reader, places=places)
    31.  
    32. # 使用batch级的reader作为PyReader的数据源
    33. py_reader3 = fluid.io.PyReader(feed_list=[image3, label3], capacity=10, iterable=ITERABLE)
    34. py_reader3.decorate_batch_generator(fake_batch_reader, places=places)

    使用PyReader进行模型训练和测试

    使用PyReader进行模型训练和测试的例程如下。

    • 第一步,我们需组建训练网络和预测网络,并定义相应的PyReader对象,设置好PyReader对象的数据源。
    1. import paddle
    2. import paddle.fluid as fluid
    3. import paddle.dataset.mnist as mnist
    4. import six
    5.  
    6. ITERABLE = True
    7.  
    8. def network():
    9. # 创建数据层对象
    10. image = fluid.layers.data(name='image', dtype='float32', shape=[784])
    11. label = fluid.layers.data(name='label', dtype='int64', shape=[1])
    12.  
    13. # 创建PyReader对象
    14. reader = fluid.io.PyReader(feed_list=[image, label], capacity=64, iterable=ITERABLE)
    15.  
    16. # Here, we omitted the definition of loss of the model
    17. return loss , reader
    18.  
    19. # 创建训练的main_program和startup_program
    20. train_prog = fluid.Program()
    21. train_startup = fluid.Program()
    22.  
    23. # 定义训练网络
    24. with fluid.program_guard(train_prog, train_startup):
    25. # fluid.unique_name.guard() to share parameters with test network
    26. with fluid.unique_name.guard():
    27. train_loss, train_reader = network()
    28. adam = fluid.optimizer.Adam(learning_rate=0.01)
    29. adam.minimize(train_loss)
    30.  
    31. # 创建预测的main_program和startup_program
    32. test_prog = fluid.Program()
    33. test_startup = fluid.Program()
    34.  
    35. # 定义预测网络
    36. with fluid.program_guard(test_prog, test_startup):
    37. # Use fluid.unique_name.guard() to share parameters with train network
    38. with fluid.unique_name.guard():
    39. test_loss, test_reader = network()
    40.  
    41. place = fluid.CUDAPlace(0)
    42. exe = fluid.Executor(place)
    43.  
    44. # 运行startup_program进行初始化
    45. exe.run(train_startup)
    46. exe.run(test_startup)
    47.  
    48. # Compile programs
    49. train_prog = fluid.CompiledProgram(train_prog).with_data_parallel(loss_name=train_loss.name)
    50. test_prog = fluid.CompiledProgram(test_prog).with_data_parallel(share_vars_from=train_prog)
    51.  
    52. # 设置PyReader的数据源
    53. places = fluid.cuda_places() if ITERABLE else None
    54.  
    55. train_reader.decorate_sample_list_generator(
    56. paddle.reader.shuffle(paddle.batch(mnist.train(), 512), buf_size=1024), places=places)
    57.  
    58. test_reader.decorate_sample_list_generator(paddle.batch(mnist.test(), 512), places=places)
    • 第二步:根据PyReader对象是否iterable,选用不同的方式运行网络。若iterable=True,则PyReader对象是一个Python的生成器,可直接for-range迭代。for-range返回的结果通过exe.run的feed参数传入执行器。
    1. def run_iterable(program, exe, loss, py_reader):
    2. for data in py_reader():
    3. loss_value = exe.run(program=program, feed=data, fetch_list=[loss])
    4. print('loss is {}'.format(loss_value))
    5.  
    6. for epoch_id in six.moves.range(10):
    7. run_iterable(train_prog, exe, train_loss, train_reader)
    8. run_iterable(test_prog, exe, test_loss, test_reader)

    若iterable=False,则需在每个epoch开始前,调用 start() 方法启动PyReader对象;并在每个epoch结束时,exe.run会抛出 fluid.core.EOFException 异常,在捕获异常后调用 reset() 方法重置PyReader对象的状态,以便启动下一轮的epoch。iterable=False时无需给exe.run传入feed参数。具体方式为:

    1. def run_non_iterable(program, exe, loss, py_reader):
    2. py_reader.start()
    3. try:
    4. while True:
    5. loss_value = exe.run(program=program, fetch_list=[loss])
    6. print('loss is {}'.format(loss_value))
    7. except fluid.core.EOFException:
    8. print('End of epoch')
    9. py_reader.reset()
    10.  
    11. for epoch_id in six.moves.range(10):
    12. run_non_iterable(train_prog, exe, train_loss, train_reader)
    13. run_non_iterable(test_prog, exe, test_loss, test_reader)