# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import dill
import numpy as np
import oneflow as flow
from omegaconf import OmegaConf
from libai.config import try_get_key
logger = logging.getLogger(__name__)
_DIST_UTIL = None
def _merge_devices(devices):
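# Map (node_id, device_id) pairs to flat global ranks.
# e.g., with 2 nodes x 4 GPUs per node, (0, 1) -> rank 1 and (1, 2) -> rank 6.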
num_gpus_per_node = get_world_size() // get_num_nodes()
node_devices = [node_id * num_gpus_per_node + device_id for node_id, device_id in devices]
return node_devices
class _DistributeUtil(object):
def __init__(self, cfg):
self._init_distributed_env(cfg)
self._init_parallel_size(cfg)
self._init_placement_group(cfg)
self._init_parallel_hierarchy()
def _init_distributed_env(self, cfg):
"""Initialize the distributed environment."""
num_nodes = get_num_nodes()
num_gpus_per_node = get_world_size() // num_nodes
if try_get_key(cfg, "num_gpus_per_node", default=num_gpus_per_node) != num_gpus_per_node:
# The num_gpus_per_node saved in the config does not match the environment
# variable, so warn the user that the environment may not reproduce
# the original configuration.
logger.warning(
"'train.dist.num_gpus_per_node' is not equal to the environment variable. "
f"{cfg.num_gpus_per_node} != {num_gpus_per_node}"
)
if try_get_key(cfg, "num_nodes", default=num_nodes) != num_nodes:
logger.warning(
"'train.dist.num_nodes' are not equal to"
f"environment variable. {cfg.num_nodes} != {num_nodes}"
)
# Write the actual values back to the config
cfg.num_nodes = num_nodes
cfg.num_gpus_per_node = num_gpus_per_node
self._num_nodes = num_nodes
self._num_gpus_per_node = num_gpus_per_node
self._world_size = num_gpus_per_node * num_nodes
# Set the device type (defaults to "cuda")
self._device_type = try_get_key(cfg, "device_type", default="cuda")
def _init_parallel_size(self, cfg):
# tensor parallel size
self._tensor_parallel_size = min(cfg.tensor_parallel_size, self.world_size)
assert self.world_size % self._tensor_parallel_size == 0, (
f"world size ({self.world_size}) is not divisible by"
f" tensor parallel size ({self._tensor_parallel_size})"
)
# Set the actual tensor parallel size to cfg
cfg.tensor_parallel_size = self._tensor_parallel_size
# pipeline parallel size
self._pipeline_parallel_size = min(
cfg.pipeline_parallel_size, self.world_size // cfg.tensor_parallel_size
)
# Set the actual pipeline parallel size to cfg
cfg.pipeline_parallel_size = self._pipeline_parallel_size
if cfg.pipeline_parallel_size > 1:
assert (
try_get_key(cfg, "pipeline_num_layers") is not None
), "cfg.train.dist.pipeline_num_layers must be set when run pipeline parallel"
assert cfg.pipeline_num_layers >= self._pipeline_parallel_size, (
f"number of layers ({cfg.pipeline_num_layers}) is less than"
f" pipeline model parallel size ({self._pipeline_parallel_size})"
)
if try_get_key(cfg, "custom_pipeline_stage_id") is not None:
assert OmegaConf.is_list(
cfg.custom_pipeline_stage_id
), "type of cfg.train.dist.custom_pipeline_stage_id must be list"
cfg.custom_pipeline_stage_id = list(cfg.custom_pipeline_stage_id)
assert max(cfg.custom_pipeline_stage_id) < self._world_size, (
f"the element {max(cfg.custom_pipeline_stage_id)} in"
" cfg.train.dist.custom_pipeline_stage_id is out of range"
f" for total rank {self._world_size}"
)
assert len(cfg.custom_pipeline_stage_id) == cfg.pipeline_num_layers, (
"the length of cfg.train.dist.custom_pipeline_stage_id"
f" {len(cfg.custom_pipeline_stage_id)} must be equal to"
" cfg.train.dist.pipeline_num_layers"
f" {cfg.train.dist.pipeline_num_layers}"
)
else:
# no pipeline parallelism, so just set a large default so any layer index is valid
if try_get_key(cfg, "pipeline_num_layers") is None:
cfg.pipeline_num_layers = 10000
self._model_parallel_size = self._pipeline_parallel_size * self._tensor_parallel_size
assert self.world_size % self._model_parallel_size == 0, (
f"world size ({self.world_size}) is not divisible by"
f" tensor model parallel size ({self._tensor_parallel_size}) times"
f" pipeline model parallel size ({self._pipeline_parallel_size})"
)
# data parallel size
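# e.g., world_size=16 with tensor_parallel_size=4 and pipeline_parallel_size=2
# gives data_parallel_size = 16 // (4 * 2) = 2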
self._data_parallel_size = self.world_size // self._model_parallel_size
# Set the actual data parallel size to cfg
cfg.data_parallel_size = self._data_parallel_size
def _init_placement_group(self, cfg):
node_ids = [i // self.num_gpus_per_node for i in range(self.world_size)]
device_ids = list(range(self.num_gpus_per_node)) * self.num_nodes
# [(0, 0), (0, 1), (0, 2), (0, 3), (1, 0), (1, 1), (1, 2), (1, 3)]
devices = [(n, d) for n, d in zip(node_ids, device_ids)]
num_devices_per_stage = self.world_size // self._pipeline_parallel_size
stages_devices = [
_merge_devices(devices[i : (i + num_devices_per_stage)])
for i in range(0, self.world_size, num_devices_per_stage)
]
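# e.g., 2 nodes x 4 GPUs with pipeline_parallel_size=2 gives
# stages_devices = [[0, 1, 2, 3], [4, 5, 6, 7]]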
# change pipeline_num_layers to make the middle stages contain more layers
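# e.g., 16 layers over 4 stages are assigned as [4, 5, 5, 2] instead of [4, 4, 4, 4]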
if (
self._pipeline_parallel_size >= 4
and cfg.pipeline_num_layers >= 8
and cfg.pipeline_num_layers % self._pipeline_parallel_size == 0
):
temp_num_layers_per_stage = cfg.pipeline_num_layers // self._pipeline_parallel_size
actual_pipeline_num_layers = cfg.pipeline_num_layers + min(
self._pipeline_parallel_size - 1, temp_num_layers_per_stage
)
else:
actual_pipeline_num_layers = cfg.pipeline_num_layers
num_layers_per_stage = actual_pipeline_num_layers // self._pipeline_parallel_size
stage_offset = actual_pipeline_num_layers % self._pipeline_parallel_size
# stage_offset can make the later stages contain more layers when pipeline_num_layers
# cannot be divided by pipeline_parallel_size.
# This can make pipeline parallel more memory efficient.
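# e.g., 9 layers over 4 stages yield num_layers_per_stage=2, stage_offset=1 and
# the stage assignment [0, 0, 1, 1, 2, 2, 3, 3, 3]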
self._layer_stage_ids = []
for i in range(0, actual_pipeline_num_layers - stage_offset, num_layers_per_stage):
stage_id = i // num_layers_per_stage
if stage_id >= (self._pipeline_parallel_size - stage_offset):
self._layer_stage_ids.append(stage_id)
self._layer_stage_ids.extend([stage_id] * num_layers_per_stage)
self._layer_stage_ids = self._layer_stage_ids[: cfg.pipeline_num_layers]
# when pipeline_parallel_size > 1, we add pipeline_stage_id information into cfg
if cfg.pipeline_parallel_size > 1:
cfg.auto_pipeline_stage_id = self._layer_stage_ids
# override pipeline_stage_id with the user's setting
if try_get_key(cfg, "custom_pipeline_stage_id") is not None:
self._layer_stage_ids = cfg.custom_pipeline_stage_id
cfg.actual_pipeline_stage_id = self._layer_stage_ids
self._layer_ranks = [stages_devices[stage_id] for stage_id in self._layer_stage_ids]
def _init_parallel_hierarchy(self):
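# When both data and tensor parallelism are enabled, each layer's ranks form a
# 2D mesh of shape (data_parallel_size, tensor_parallel_size); otherwise a flat
# 1D rank list is used.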
if self.is_data_model_parallel():
self._parallel_hierarchy = (
self._data_parallel_size,
self._tensor_parallel_size,
)
else:
self._parallel_hierarchy = None
@property
def num_nodes(self):
return self._num_nodes
@property
def num_gpus_per_node(self):
return self._num_gpus_per_node
@property
def world_size(self):
return self._world_size
@property
def parallel_hierarchy(self):
return self._parallel_hierarchy
@property
def tensor_parallel_size(self):
return self._tensor_parallel_size
@property
def pipeline_parallel_size(self):
return self._pipeline_parallel_size
@property
def model_parallel_size(self):
return self._tensor_parallel_size
@property
def data_parallel_size(self):
return self._data_parallel_size
@property
def device_type(self):
return self._device_type
def set_device_type(self, device_type):
assert device_type in ["cpu", "cuda"], f"not supported for {device_type}"
self._device_type = device_type
def get_layer_ranks(self, layer_idx):
layer_ranks = self._layer_ranks[layer_idx]
if self._parallel_hierarchy is None:
return layer_ranks
else:
assert len(self._parallel_hierarchy) == 2
return np.asarray(layer_ranks).reshape(self._parallel_hierarchy).tolist()
def get_layer_stage_id(self, layer_idx):
return self._layer_stage_ids[layer_idx]
def is_tensor_model_parallel(self):
return self._tensor_parallel_size > 1
def is_data_parallel(self):
return self._data_parallel_size > 1
def is_pipeline_model_parallel(self):
return self._pipeline_parallel_size > 1
def is_data_model_parallel(self):
return self.is_tensor_model_parallel() and self.is_data_parallel()
def setup_dist_util(cfg):
"""Initialize the distributed environment with the given configuration.
Example:
.. code-block:: python
from omegaconf import DictConfig
# set up a hybrid parallel distributed environment with a 2D mesh of GPUs
setup_dist_util(
DictConfig(
dict(
data_parallel_size=2,
tensor_parallel_size=2,
pipeline_parallel_size=1,
)
)
)
"""
global _DIST_UTIL
_DIST_UTIL = _DistributeUtil(cfg)
def get_dist_util():
"""Get the distributed util if it has been set up. Otherwise, initialize it
with a single-node, single-GPU environment."""
global _DIST_UTIL
if _DIST_UTIL is None:
logger.warning(
"Distributed env is not set up, configure it by default (single node, single gpu)."
)
from omegaconf import DictConfig
setup_dist_util(
DictConfig(
dict(
data_parallel_size=1,
tensor_parallel_size=1,
pipeline_parallel_size=1,
)
)
)
return _DIST_UTIL
def get_layer_placement(layer_idx, device_type=None):
"""
Get the ``flow.placement`` object for ``layer_idx`` under the initialized
distributed environment.
Args:
layer_idx (int): layer index indicating the rank groups. This is very useful for pipeline
parallelism training where different layers are on different ranks.
device_type (str, optional): device type. Defaults to the device type of the
initialized distributed environment (``cuda`` unless configured otherwise).
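Example:
.. code-block:: python
# A minimal usage sketch: place a tensor on the ranks that own layer 0
# (assumes the distributed environment has already been set up).
placement = get_layer_placement(0)
x = flow.zeros(4, 4, placement=placement, sbp=flow.sbp.broadcast)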
"""
dist_util = get_dist_util()
device_type = dist_util.device_type if device_type is None else device_type
if not flow.cuda.is_available() and device_type == "cuda":
device_type = "cpu"
return flow.placement(
device_type,
dist_util.get_layer_ranks(layer_idx),
)
def get_nd_sbp(sbp_list):
"""Get the nd-sbp signature list consistent with the initialized 1D/2D device mesh.
Args:
sbp_list (list): an sbp list for a 2D mesh.
Returns:
A modified sbp list according to the initialized distributed environment.
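Example:
.. code-block:: python
# A minimal sketch: under pure data parallelism only the first sbp is kept.
get_nd_sbp([flow.sbp.split(0), flow.sbp.broadcast])  # -> [flow.sbp.split(0)]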
"""
assert isinstance(sbp_list, list)
assert len(sbp_list) == 2
assert all(isinstance(sbp, flow.sbp.sbp) for sbp in sbp_list)
dist_util = get_dist_util()
if dist_util.is_data_model_parallel():
return sbp_list
elif dist_util.is_data_parallel():
return sbp_list[:1]
elif dist_util.is_tensor_model_parallel():
return sbp_list[1:]
else:
return [flow.sbp.broadcast]
def get_hidden_sbp():
"""Get the sbp signature for hidden states: split along the batch dimension under
data parallelism and broadcast under tensor parallelism."""
return get_nd_sbp([flow.sbp.split(0), flow.sbp.broadcast])
def get_data_parallel_rank():
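# Ranks within a pipeline stage are laid out as (data, tensor), so dividing the
# global rank by the (tensor) model parallel size and taking the result modulo
# data_parallel_size gives the data parallel index.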
dist_util = get_dist_util()
return (flow.env.get_rank() // dist_util.model_parallel_size) % dist_util.data_parallel_size
def get_data_parallel_size():
dist_util = get_dist_util()
return dist_util.data_parallel_size
def get_tensor_parallel_size():
dist_util = get_dist_util()
return dist_util.tensor_parallel_size
def get_pipeline_parallel_size():
dist_util = get_dist_util()
return dist_util.pipeline_parallel_size
def same_sbp(lhs_sbp, rhs_sbp):
"""Determine if two sbp signatures are the same."""
assert len(lhs_sbp) == len(rhs_sbp)
for i in range(len(lhs_sbp)):
if lhs_sbp[i] != rhs_sbp[i]:
return False
return True
def get_rank() -> int:
return flow.env.get_rank()
def get_local_rank() -> int:
return flow.env.get_local_rank()
def is_main_process() -> bool:
return get_rank() == 0
def is_last_process() -> bool:
return get_rank() == get_world_size() - 1
def get_world_size():
return flow.env.get_world_size()
def get_num_nodes():
return flow.env.get_node_size()
def set_device_type(device_type):
dist_util = get_dist_util()
dist_util.set_device_type(device_type)
def broadcast_py_object(obj, src: int = 0):
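# Serialize an arbitrary picklable Python object with dill on rank `src` and
# broadcast the bytes to all ranks through OneFlow's internal CPU broadcast.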
rank = flow.env.get_rank()
if src == rank:
obj_bytes = dill.dumps(obj)
return dill.loads(flow._oneflow_internal.cpu_broadcast(obj_bytes, src))
else:
return dill.loads(flow._oneflow_internal.cpu_broadcast(None, src))
def convert_to_distributed_default_setting(t):
"""
Helper function to convert an eager local tensor in the model's :attr:`nn.Module`
to a global tensor, with data parallelism as the default setting.
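Example:
.. code-block:: python
# A minimal sketch: a local tensor becomes a global tensor broadcast over
# the default placement (layer 0 ranks).
x = flow.ones(2, 2)
x_global = convert_to_distributed_default_setting(x)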
"""
if not t.is_global:
return t.to_global(
sbp=get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]),
placement=get_layer_placement(0),
)
else:
dist_util = get_dist_util()
device_type = dist_util.device_type
return t.to_global(placement=flow.placement(device_type, ranks=t.placement.ranks))
def ttol(tensor, pure_local=False, ranks=None):
"""Global tensor to local tensor."""
if tensor.is_global:
placement = tensor.placement if not ranks else flow.placement("cuda", ranks)
if pure_local:
tensor = tensor.to_global(placement=placement).to_local()
else:
tensor = tensor.to_global(
sbp=get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]), placement=placement
).to_local()
return tensor
def tton(tensor, local_only=False, ranks=None):
"""Global tensor to numpy ndarray."""
if tensor.is_global:
tensor = ttol(tensor, local_only, ranks)
return tensor.numpy()
def tensor_to_rank0(tensor, device="cuda", to_local=False):
"""Global tensor to rank0."""
assert device in ["cpu", "cuda"], f"not supported for device:{device}"
if tensor.is_global:
# For a 2D mesh, ranks should be [[0]] instead of [0]
placement = flow.placement(device, ranks=[0] if tensor.placement.ranks.ndim == 1 else [[0]])
tensor = tensor.to_global(
sbp=get_nd_sbp([flow.sbp.broadcast, flow.sbp.broadcast]), placement=placement
)
if to_local:
tensor = ttol(tensor)
return tensor
def synchronize():
"""
Helper function to synchronize (barrier) among all processes when
using distributed training.
"""
world_size = get_world_size()
if world_size == 1:
return
flow.comm.barrier()