Parallel Executor

ParallelExecutor is an executor that executes Program separately on multiple nodes in a data-parallelism manner. Users can use the Python script to run ParallelExecutor. The execution process of ParallelExecutor is as follows:

  • First it builds SSA Graph and a thread pool based on Program, the number of GPU cards (or CPU cores) and BuildStrategy ;
  • During execution, it executes the Op depending on whether the input of Op is ready, so that multiple Ops that do not depend on each other can be executed in parallel in the thread pool;

When constructing ParallelExecutor, you need to specify the device type of the current Program, namely GPU or CPU :

  • execution on GPU : ParallelExecutor will automatically detect the number of currently available GPU s, and execute Program on each GPU . The user can also specify the GPU that the executor can use by setting the CUDA_VISIBLE_DEVICES environment variable;
  • execution on multi-threaded CPU : ParallelExecutor will automatically detect the number of currently available CPU s, and take it as the number of threads in the executor . Each thread executes Program separately. The user can also specify the number of threads currently used for training by setting the CPU_NUM environment variable.

ParallelExecutor supports model training and model prediction:

  • Model training: ParallelExecutor aggregates the parameter gradients on multiple nodes during the execution process, and then updates the parameters;
  • Model prediction: during the execution of ParallelExecutor, each node runs the current Program independently;

ParallelExecutor supports two modes of gradient aggregation during model training, AllReduce and Reduce :

  • In AllReduce mode, ParallelExecutor calls AllReduce operation to make the parameter gradients on multiple nodes completely equal, and then each node independently updates the parameters;
  • In Reduce mode, ParallelExecutor will pre-allocate updates of all parameters to different nodes. During the execution ParallelExecutor calls Reduce operation to aggregate parameter gradients on the pre-specified node, and the parameters are updated. Finally, the Broadcast operation is called to send the updated parameters to other nodes.

These two modes are specified by build_strategy. For how to use them, please refer to BuildStrategy .

Note: If you use CPU to execute Program in multi-thread in Reduce mode, the parameters of Program will be shared among multiple threads. On some models , Reduce mode can save a lot of memory.

Since the execution speed of the model is related to the model structure and the execution strategy of the executor, ParallelExecutor allows you to modify the relevant parameters of the executor, such as the size of thread pool ( num_threads ), how many iterations should be done to clean up temporary variables num_iteration_per_drop_scope . For more information, please refer to ExecutionStrategy.

# Note:
#   - If you want to specify the GPU cards which are used to run
#     in ParallelExecutor, you should define the CUDA_VISIBLE_DEVICES
#     in environment.
#   - If you want to use multi CPU to run the program in ParallelExecutor,
#     you should define the CPU_NUM in the environment.

# First create the Executor.
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)

# Run the startup program once and only once.
exe.run(fluid.default_startup_program())

# Define train_exe and test_exe
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.num_threads = dev_count * 4 # the size of thread pool.
build_strategy = fluid.BuildStrategy()
build_strategy.memory_optimize = True if memory_opt else False

train_exe = fluid.ParallelExecutor(use_cuda=use_cuda,
                                   main_program=train_program,
                                   build_strategy=build_strategy,
                                   exec_strategy=exec_strategy,
                                   loss_name=loss.name)
# NOTE: loss_name is unnecessary for test_exe.
test_exe = fluid.ParallelExecutor(use_cuda=True,
                                  main_program=test_program,
                                  build_strategy=build_strategy,
                                  exec_strategy=exec_strategy,
                                  share_vars_from=train_exe)

train_loss, = train_exe.run(fetch_list=[loss.name], feed=feed_dict)
test_loss, = test_exe.run(fetch_list=[loss.name], feed=feed_dict)
  • Related API :