Source code for paddle.fluid.transpiler.memory_optimization_transpiler

#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import six
import sys
from collections import defaultdict, MutableSet
from .. import core
from ... import compat as cpt
from ..framework import Program, default_main_program, Parameter, Variable, core
from ..backward import _rename_arg_
from functools import reduce
from six.moves import range

dtype_to_size = {
    core.VarDesc.VarType.FP16: 2,
    core.VarDesc.VarType.FP32: 4,
    core.VarDesc.VarType.FP64: 8,
    core.VarDesc.VarType.INT16: 2,
    core.VarDesc.VarType.INT32: 4,
    core.VarDesc.VarType.INT64: 8,
    core.VarDesc.VarType.BOOL: 1,
    core.VarDesc.VarType.UINT8: 1,
}

SUB_BLOCK_OPS = [
    "while", "while_grad", "conditional_block", "conditional_block_grad"
]

SUB_BLOCK_PAIR = [("while", "while_grad"),
                  ("conditional_block", "conditional_block_grad")]

PRINT_LOG = False
FLAGS_memory_optimize = ""

class OrderedSet(MutableSet):
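    # An insertion-ordered set: keys live in a circular doubly linked list
    # rooted at the ``self.end`` sentinel, while ``self.map`` maps each key to
    # its [key, prev, next] cell for O(1) membership tests and removal.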
    def __init__(self, iterable=None):
        self.end = end = []
        end += [None, end, end]  # sentinel node for doubly linked list
        self.map = {}  # key --> [key, prev, next]
        if iterable is not None:
            self |= iterable

    def __len__(self):
        return len(self.map)

    def __contains__(self, key):
        return key in self.map

    def add(self, key):
        if key not in self.map:
            end = self.end
            curr = end[1]
            curr[2] = end[1] = self.map[key] = [key, curr, end]

    def update(self, other):
        for e in other:
            self.add(e)

    def discard(self, key):
        if key in self.map:
            key, prev, next = self.map.pop(key)
            prev[2] = next
            next[1] = prev

    def remove(self, key):
        self.discard(key)

    def __iter__(self):
        end = self.end
        curr = end[2]
        while curr is not end:
            yield curr[0]
            curr = curr[2]

    def __reversed__(self):
        end = self.end
        curr = end[1]
        while curr is not end:
            yield curr[0]
            curr = curr[1]

    def pop(self, last=True):
        if not self:
            raise KeyError('set is empty')
        key = self.end[1][0] if last else self.end[2][0]
        self.discard(key)
        return key

    def __repr__(self):
        if not self:
            return '%s()' % (self.__class__.__name__, )
        return '%s(%r)' % (self.__class__.__name__, list(self))

    def __eq__(self, other):
        if isinstance(other, OrderedSet):
            return len(self) == len(other) and list(self) == list(other)
        return set(self) == set(other)

class ControlFlowGraph(object):
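    # Per-block liveness analysis over a flat op sequence: each op index is a
    # node, _uses/_defs hold the input/output variable names of each op, and
    # _live_in/_live_out are computed by _dataflow_analyze(). self.pool keeps
    # (var_name, shape) pairs of dead variables that are available for reuse.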
    def __init__(self, program, ops, forward_num, skip_opt):
        self._program = program
        self._ops = ops
        self._forward_num = forward_num
        self._successors = defaultdict(OrderedSet)
        self._presuccessors = defaultdict(OrderedSet)
        self._uses = defaultdict(OrderedSet)
        self._defs = defaultdict(OrderedSet)
        self._live_in = defaultdict(OrderedSet)
        self._live_out = defaultdict(OrderedSet)

        self._skip_opt = skip_opt
        self.pool = []

    def _add_connections(self, connections):
        """Populates _successors and _presuccessors for two neighbor nodes."""
        for node1, node2 in connections:
            self._add(node1, node2)

    def _add(self, node1, node2):
        self._successors[node1].add(node2)
        self._presuccessors[node2].add(node1)

    # TODO(panyx0718): We need to have a unified way of building intermediate
    # representation.
    def _build_graph(self):
        """Build a graph based on op sequence.
        self.op_size = len(self._ops)
        op_node_connections = [(i, i + 1) for i in range(self.op_size - 1)]
        for i in range(self.op_size):

    def _update_graph(self, old_name, new_name, begin_idx=0):
        for i in range(begin_idx, self.op_size):
            if old_name in self._uses[i]:
                self._uses[i].remove(old_name)
                self._uses[i].add(new_name)
            if old_name in self._defs[i]:
                self._defs[i].remove(old_name)
                self._defs[i].add(new_name)
            if old_name in self._live_in[i]:
                self._live_in[i].remove(old_name)
                self._live_in[i].add(new_name)
            if old_name in self._live_out[i]:
                self._live_out[i].remove(old_name)
                self._live_out[i].add(new_name)

    def _dataflow_analyze(self):
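        # Standard backward liveness analysis, iterated with a worklist until
        # a fixed point is reached:
        #   live_out[i] = union of live_in[s] over all successors s of op i
        #   live_in[i]  = uses[i] | (live_out[i] - defs[i])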
        self._build_graph()
        live_in = defaultdict(set)
        worklist = list(range(len(self._ops) - 1, -1, -1))
        while worklist:
            i = worklist.pop(0)
            live_in[i] = set(self._live_in[i])
            for s in self._successors[i]:
                self._live_out[i] |= self._live_in[s]
            self._live_in[i] = self._uses[i] | (
                self._live_out[i] - self._defs[i])
            if live_in[i] != set(self._live_in[i]):
                for d in self._presuccessors[i]:
                    worklist.append(d)

    def _fill_pool(self, i, is_forward):
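        # Variables that are live into op i but not out of it are dead after
        # this op; they are added to self.pool as (name, shape) pairs, keeping
        # entries with a -1 (batch) leading dimension ahead of fixed-shape
        # ones and each group in nondecreasing size order.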
        def comparator(x, cache):
            x_shape = x[1]
            cache_shape = cache[1]
            x_size = abs(reduce(lambda x, y: x * y, x_shape))
            cache_size = abs(reduce(lambda x, y: x * y, cache_shape))
            if (x_shape[0] == -1 and cache_shape[0] == -1) or \
               (x_shape[0] != -1 and cache_shape[0] != -1) :
                return x_size <= cache_size
            else:
                return False

        def find_var_in_block(x):
            known_vars = set()
            for op in self._ops:
                known_vars.update(op.output_arg_names())
            return x in known_vars

        block_desc = self._ops[i].block()
        in_diff, _ = self._get_diff(self._live_in[i], self._live_out[i])
        # NOTE: must sort the in_diff set for cases that get different cache var.
        # FIXME(typhoonzero): maybe use a "sorted set" is better than this.
        can_optimize = [
            x for x in sorted(in_diff)
            if self._check_var_validity(block_desc, x, is_forward)
        ]
        if can_optimize:
            for var_name in can_optimize:
                cache = (var_name, self._find_var(block_desc, var_name,
                                                  is_forward).shape())
                if cache not in self.pool and find_var_in_block(var_name):
                    i = 0
                    while i < len(self.pool):
                        mycache = self.pool[i]
                        mysize = mycache[1][0]
                        cache_size = cache[1][0]
                        if (mysize == -1 and cache_size == -1) or \
                           (mysize != -1 and cache_size != -1):
                            if comparator(mycache, cache):
                                i += 1
                            else:
                                break
                        elif mysize == -1 and cache_size != -1:
                            i += 1
                        elif mysize != -1 and cache_size == -1:
                            break
                    self.pool.insert(i, cache)

    def _get_diff(self, a, b):
        u = a & b
        return a - u, b - u

    def _has_var(self, block_desc, var_name, is_forward):
        if is_forward:
            return block_desc.has_var(cpt.to_bytes(var_name))
        else:
            return block_desc.has_var_recursive(cpt.to_bytes(var_name))

    def _find_var(self, block_desc, var_name, is_forward):
        if is_forward:
            return block_desc.find_var(cpt.to_bytes(var_name))
        else:
            return block_desc.find_var_recursive(cpt.to_bytes(var_name))

    def _check_var_validity(self, block_desc, x, is_forward):
        if str(x) == "@EMPTY@":
            return False
        if not self._has_var(block_desc, x, is_forward):
            return False
        if self._find_var(block_desc, x, is_forward).persistable():
            return False
        if self._find_var(block_desc, x,
                          is_forward).type() != core.VarDesc.VarType.LOD_TENSOR:
            return False
        if x in self._skip_opt:
            return False
        if not self._find_var(block_desc, x, is_forward).shape():
            return False
        return True

    # TODO(panyx0718): This needs to be less hacky. It seems memory optimization
    # doesn't consider vars copied between cpu and gpu.
    def _update_skip_opt_set(self):
        for i in range(self.op_size):
            op = self._ops[i]
            if op.has_attr("force_cpu") and op.attr("force_cpu") == True:

    def release_memory(self, skip_opt_set=None):
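        # Insert a "delete_var" op right after each op whose inputs become
        # dead (present in live_in but not in live_out) so their memory can
        # be released as early as possible at runtime.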
        self._dataflow_analyze()
        self._update_skip_opt_set()
        if skip_opt_set:
            self._skip_opt.update(skip_opt_set)
        fwd_id = 0
        bwd_id = 0
        for i in range(self.op_size):
            op = self._ops[i]
            if op.type() in SUB_BLOCK_OPS:
                continue
            block_desc = op.block()
            is_forward = i < self._forward_num
            in_diff, out_diff = self._get_diff(self._live_in[i],
                                               self._live_out[i])
            can_optimize = [
                x for x in in_diff
                if self._check_var_validity(block_desc, x, is_forward)
            ]
            if can_optimize:
                index = i + fwd_id + 1 if is_forward else i - self._forward_num + bwd_id + 1
                delete_op = block_desc._insert_op(index)
                delete_op.set_type("delete_var")
                delete_op.set_input("X", can_optimize)
                if is_forward:
                    fwd_id += 1
                else:
                    bwd_id += 1

    def memory_optimize(self, skip_opt_set=None, level=0):
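        # For every variable defined by op i, look for a dead variable in the
        # pool with the same dtype and a compatible shape (exact match at
        # level 0, size comparison at level 1) and rename the new output to
        # reuse that variable's memory.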
        def compare_shape(x_shape, cache_shape, opt_level):
            if opt_level == 0:
                return x_shape == cache_shape
            elif opt_level == 1:
                if (x_shape[0] == -1) ^ (cache_shape[0] == -1):
                    return False
                x_size = abs(reduce(lambda x, y: x * y, x_shape))
                cache_size = abs(reduce(lambda x, y: x * y, cache_shape))
                if x_size <= cache_size:
                    return True
                raise ValueError("only support opt_level 0 or 1.")
            return False

        self._dataflow_analyze()
        self._update_skip_opt_set()
        # update skip set to meet users' demand
        if skip_opt_set:
            self._skip_opt.update(skip_opt_set)
        counter = 0
        for i in range(self.op_size):
            op = self._ops[i]
            if op.type() in SUB_BLOCK_OPS:
                continue
            block_desc = op.block()
            is_forward = i < self._forward_num
            if self.pool:
                # NOTE: must sort the in_diff set for cases that get different cache var.
                defs_can_optimize = [
                    x for x in self._defs[i]
                    if self._check_var_validity(block_desc, x, is_forward)
                ]
                out_pair = [
                    (x, self._find_var(block_desc, x, is_forward).shape())
                    for x in defs_can_optimize
                ]
                for x, x_shape in out_pair:
                    # If x is both in uses and defs, it can not be optimized!
                    if x in self._uses[i]:
                        continue
                    if x == FLAGS_memory_optimize:
                        print("start match var ", x, " of op ", op.type())
                    for index, cache_pair in enumerate(self.pool):
                        cache_var = cache_pair[0]
                        cache_shape = cache_pair[1]
                        if not self._has_var(block_desc, cache_var, is_forward):
                            if PRINT_LOG:
                                print("cache %s not exists!" %
                        if x == cache_var:
                            if PRINT_LOG:
                                print("x : ", cpt.to_text(x), " cache : ",
                                      cpt.to_text(cache_var), " is same var!")

                        x_dtype = self._find_var(block_desc, x,
                                                 is_forward).dtype()
                        cache_dtype = self._find_var(block_desc, cache_var,
                                                     is_forward).dtype()
                        if x_dtype != cache_dtype:
                            if PRINT_LOG:
                                print("x_dtype and cache_dtype are different")
                            continue

                        if not compare_shape(x_shape, cache_shape, level):
                            continue
                        # TODO(qijun): dtype_to_size[x_dtype] and dtype_to_size[cache_dtype]
                        if PRINT_LOG:
                            print(
                                ("!!! %d,  %s => %s, cache idx %d, pool size %d"
                                 % (counter, x + str(x_shape),
                                    cache_var + str(cache_shape), index,
                                    len(self.pool))))
                        counter += 1
                        self.pool.pop(index)
                        # Rename the var to the cache var already with
                        # memory allocated in order to reuse the memory.
                        _rename_arg_(self._ops, x, cache_var, begin_idx=i)
                        self._program.block(block_desc.id).var(cpt.to_text(
                            x)).desc = self._find_var(block_desc, cache_var,
                                                      is_forward)
                        self._program.block(block_desc.id).vars[cpt.to_text(x)] = \
                            Variable(
                                self._program.block(block_desc.id),
                                name=cpt.to_text(x))
                        self._update_graph(x, cache_var, begin_idx=i)
                        break
            self._fill_pool(i, is_forward)

def _process_sub_block_pair(pdesc, sub_block_pair):
    """Creates a list of tuple each of which tracks info of a subblock.

      Note: this function doesn't handle nested subblocks yet.
      TODO(panyx0718): assert if case nested subblocks happen.

    :param pdesc: ProgramDesc.
    :param sub_block_pair: A list of op pairs. Each op pair is the forward
        op and backward op. The ops in the list are special in that they
        contain a subblock of ops.
    :return: A list of tuples, each tuple is (all ops in a subblock pair
        including forward and backward, number of forward ops,
        all output args names of the ops in the subblock pairs).
    """
    ops_list = []
    block_desc = pdesc.block(0)
    op_size = block_desc.op_size()
    for fwd_op, bwd_op in sub_block_pair:
        sub_block_ids = []
        grad_sub_block_ids = []
        sub_block_id_pair = []
        sub_op_dict = {}
        for i in range(op_size):
            op = block_desc.op(i)
            if op.type() == fwd_op:
                sub_block_ids.append(op.attr("sub_block").id)
                sub_op_dict[op.attr("sub_block").id] = op
            elif op.type() == bwd_op:
                grad_sub_block_ids.append(op.attr("sub_block").id)
                sub_op_dict[op.attr("sub_block").id] = op

        # Find fwd_op/bwd_op block pair
        for grad_id in grad_sub_block_ids:
            fwd_id = pdesc.block(grad_id).get_forward_block_idx()
            if fwd_id in sub_block_ids:
                sub_block_id_pair.append((fwd_id, grad_id))
                sub_block_ids.remove(fwd_id)

        # Get fwd_op/bwd_op block ops
        for fwd_id, grad_id in sub_block_id_pair:
            sub_block_ops = []
            sub_block = pdesc.block(fwd_id)
            block_op_size = sub_block.op_size()
            for i in range(block_op_size):
                sub_block_ops.append(sub_block.op(i))

            grad_sub_block = pdesc.block(grad_id)
            grad_sub_block_op_size = grad_sub_block.op_size()
            for i in range(grad_sub_block_op_size):
                sub_block_ops.append(grad_sub_block.op(i))

            sub_op_output = set()
            sub_op_output.update(sub_op_dict[fwd_id].output_arg_names())
            sub_op_output.update(sub_op_dict[grad_id].output_arg_names())
            ops_list.append((sub_block_ops, block_op_size, sub_op_output))

        # Process rest fwd_op block ops
        for fwd_id in sub_block_ids:
            sub_block_ops = []
            sub_block = pdesc.block(fwd_id)
            sub_block_op_size = sub_block.op_size()
            for i in range(sub_block_op_size):
                sub_block_ops.append(sub_block.op(i))
            sub_op_output = set()
            sub_op_output.update(sub_op_dict[fwd_id].output_arg_names())
            ops_list.append((sub_block_ops, sub_block_op_size, sub_op_output))
    return ops_list

def _get_cfgs(input_program):
    """Process each block and create ControlFlowGraph for each of them.

    :param input_program: Program object.
    :return: A list of ControlFlowGraph, each corresponds to a block.
    """
    ops_list = []
    pdesc = input_program._get_desc()
    block_desc = pdesc.block(0)
    op_size = block_desc.op_size()

    # Only process one level of nested subblock.
    ops_list.extend(_process_sub_block_pair(pdesc, SUB_BLOCK_PAIR))

    skip_opt_set = set()
    for _, _, skip_opt in ops_list:
        skip_opt_set.update(skip_opt)

    # Get global block ops
    ops_list.insert(
        0, ([block_desc.op(i) for i in range(op_size)], op_size, skip_opt_set))
    cfgs = [
        ControlFlowGraph(input_program, ops, forward_num, skip_opt)
        for ops, forward_num, skip_opt in ops_list
    ]
    return cfgs

def _is_opt_role_op(op):
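    # True when the op carries the Optimize op-role attribute, i.e. it is an
    # optimizer op; memory_optimize uses this to find gradient variables that
    # should be skipped when skip_grads is True.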
    op_maker = core.op_proto_and_checker_maker
    optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize
    if op_maker.kOpRoleAttrName() in op.attr_names and \
            int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize_role):
        return True

def memory_optimize(input_program,
                    skip_opt_set=None,
                    print_log=False,
                    level=0,
                    skip_grads=True):
    """
    | Legacy memory optimization strategy, reduce total memory consumption by reuse variable memory between different operators.
    | Simple sample to explain the algorithm:

    .. code-block:: python

        c = a + b  # assume this is the last time a is used
        d = b * c

    | Since **a** will not be used anymore after **"c = a + b"**, and the size of **a** and **d** are the same,
      we can use variable **a** to replace variable **d**, so actually we can optimize the above code to below:

    .. code-block:: python

        c = a + b
        a = b * c

    | Please notice that, in this legacy design, we are using variable **a** to replace **d** directly, which means
      after you call this API, some variables may disappear, and some variables may hold unexpected values, like
      the above case, actually **a** holds the value of **d** after execution.

    | So to protect important variables from being reused/removed in the optimization, we provide skip_opt_set
      to allow you to specify a variable whitelist.
      The variables in the skip_opt_set will not be affected by the memory_optimize API.

    Note:
        | **This API is deprecated, please avoid using it in your new code.**
        | Does not support operators which create sub-blocks, like While, IfElse, etc.

    Args:
        input_program(str): Input Program
        skip_opt_set(set): vars will be skipped in memory optimize
        print_log(bool): whether to print debug log.
        level(int): 0 or 1, 0 means we replace a with b only when a.size == b.size,
            1 means we can replace a with b if a.size <= b.size

    Returns:
        None

    Examples:
        .. code-block:: python

            import paddle.fluid as fluid

            main_prog = fluid.Program()
            startup_prog = fluid.Program()

            place = fluid.CPUPlace()
            exe = fluid.Executor(place)

            fluid.memory_optimize(main_prog)

    """
    sys.stderr.write('memory_optimize is deprecated. '
                     'Use CompiledProgram and Executor\n')

    def to_name_str(var):
        if isinstance(var, Variable):
            return var.desc.name()
        elif isinstance(var, str):
            return var
        elif isinstance(var, six.string_types):
            return str(var)
        else:
            raise TypeError(str(var) + " should be Variable or str")

    if level != 0 and level != 1:
        raise ValueError("only support opt_level 0 or 1.")
    if skip_opt_set is not None:
        if isinstance(skip_opt_set, set) or isinstance(skip_opt_set, list):
            skip_opt_set = set(skip_opt_set)
        else:
            raise ValueError("only support skip_opt_set as set.")
    global PRINT_LOG
    PRINT_LOG = print_log
    if skip_grads:
        grad_set = set()
        OP_ROLE_VAR = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
        for op in input_program.global_block().ops:
            if _is_opt_role_op(op):
                if op.attr(OP_ROLE_VAR):
                    grad_name = op.attr(OP_ROLE_VAR)[1]
                    grad_set.add(grad_name)
        if not skip_opt_set:
            skip_opt_set = grad_set
        else:
            skip_opt_set.update(grad_set)
    if skip_opt_set is not None:
        skip_opt_set = set(map(to_name_str, skip_opt_set))

    cfgs = _get_cfgs(input_program)
    input_program._is_mem_optimized = True
    for cfg in cfgs:
        cfg.memory_optimize(skip_opt_set=skip_opt_set, level=level)

def release_memory(input_program, skip_opt_set=None):
    """
    Modify the input program and insert :code:`delete_op` to early drop
    variables that are no longer used. The modification will be performed
    inplace.

    Notes: This is an experimental API and could be removed in the next few
    releases. Users should not use this API.

    Args:
        input_program(Program): The program into which :code:`delete_op` will be inserted.
        skip_opt_set(set): vars will be skipped in memory optimize
    Returns:
        None

    Examples:
        .. code-block:: python

            import paddle.fluid as fluid

            # build network
            # ...

            # deprecated API
            fluid.release_memory(fluid.default_main_program())

    """
    cfgs = _get_cfgs(input_program)
    input_program._is_mem_optimized = True
    for cfg in cfgs:
        cfg.release_memory(skip_opt_set=skip_opt_set)