Detailed instructions for using distributed inference in LiBai
If you want to use distributed inference in LiBai with a pretrained PyTorch model, you can refer to the DALLE2 inference doc. A Chinese doc for distributed inference is also available.
Here we introduce how to use distributed inference in LiBai:
Check model.py
Check your model.py first: make sure the layers used in your model.py come from libai.layers:

# NOTE: you don't need to import all layers from libai; if you only use libai.layers.Linear
# in your `model.py`, your model will run model/pipeline parallel only in the `Linear` layers
from libai.layers import (
    Linear,
    LayerNorm,
    ...
)
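For instance, a minimal model.py built only from libai.layers might look like the sketch below (MyMLP and the hidden sizes are made up for illustration; they are not part of LiBai):

import oneflow.nn as nn

from libai.layers import LayerNorm, Linear


class MyMLP(nn.Module):
    # Linear and LayerNorm come from libai.layers, so these layers can be
    # sliced across devices when tensor/pipeline parallelism is enabled
    def __init__(self, hidden_size=1024):
        super().__init__()
        self.dense_in = Linear(hidden_size, 4 * hidden_size)
        self.activation = nn.GELU()
        self.dense_out = Linear(4 * hidden_size, hidden_size)
        self.norm = LayerNorm(hidden_size)

    def forward(self, x):
        return self.norm(self.dense_out(self.activation(self.dense_in(x))))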
If you want to run pipeline parallel in LiBai, you should additionally insert

x = x.to_global(placement=target_tensor.placement)

in your model.forward(). It is the equivalent of the torch code x.to(cuda_device), which moves a tensor from GPU A to GPU B. There are many examples in LiBai: example1, example2 …
If you don't know where to insert the code, you can run your model first; an error will be raised at the line that needs to_global, for example:

File "libai/libai/layers/layer_norm.py", line 129, in forward
    return flow._C.rms_layer_norm(hidden_states, self.weight, self.l2norm_epsilon)
RuntimeError: Expected all tensors to be on the same placement, but found at least two placements, oneflow.placement(type="cuda", ranks=[0, 1]) (positional 0) and oneflow.placement(type="cuda", ranks=[2, 3]) (positional 1)!
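As a rough sketch of where such a call goes (the MyBlock module below is made up for illustration; any global tensor that already lives on the current stage, such as a layer's weight, can serve as target_tensor):

import oneflow.nn as nn

from libai.layers import Linear


class MyBlock(nn.Module):
    def __init__(self, hidden_size=1024):
        super().__init__()
        self.dense = Linear(hidden_size, hidden_size)

    def forward(self, x):
        # with pipeline parallelism, x may still be placed on the ranks of the
        # previous stage; move it to the placement of this stage's parameters
        x = x.to_global(placement=self.dense.weight.placement)
        return self.dense(x)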
Build config.py
If your model was trained with LiBai, you can use the same config.py as in training; refer to Couplets for more details.
If your model was trained with another framework, you should build your own inference_config.py; you can refer to dalle2_config.py and t5_inference_config.py. A minimal sketch is given below.
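The sketch below only shows the general LazyCall pattern; MyModel and the cfg fields are placeholders, and the full set of entries your pipeline actually reads (e.g. tokenizer or train settings) should be copied from dalle2_config.py or t5_inference_config.py:

from omegaconf import DictConfig

from libai.config import LazyCall

from mymodel import MyModel  # hypothetical: the model.py checked above

# hyperparameters consumed by MyModel; the names are placeholders
cfg = DictConfig(
    dict(
        hidden_size=1024,
        num_layers=24,
        vocab_size=32000,
    )
)

# the inference pipeline builds its model from this entry
model = LazyCall(MyModel)(cfg=cfg)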
Refine pipeline_inference.py
The base class libai/inference/basic.py is already provided in LiBai; users only need to overload the functions they need. Refer to text_generation.py.
If your model was trained with LiBai, it is easy to use; you can refer to distribute_infer.py in Couplets.
If your model was trained with another framework, you need to build your own model_loader to load your model weights into LiBai; refer to model_loader for more details. A rough sketch of such a loader is given after this list.
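The sketch below only illustrates the interface that the MyPipeline example further down expects from a loader (a constructor taking the model config, its cfg node and a weight path, plus a load() method returning the model); the class name, the weight format and the name mapping are all hypothetical and must be adapted to your checkpoint:

import oneflow as flow

from libai.config import instantiate  # assumption: LazyConfig-style instantiate


class MyLoader:
    """Hypothetical loader: build the LiBai model, then fill it with weights
    converted from a checkpoint produced by another framework."""

    def __init__(self, libai_cfg_model, libai_cfg, pretrained_model_path):
        self.libai_cfg_model = libai_cfg_model        # LazyCall'ed model description
        self.libai_cfg = libai_cfg                    # its cfg node (hyperparameters)
        self.pretrained_model_path = pretrained_model_path

    def _convert_state_dict(self, state_dict):
        # map parameter names from the source framework to the names used in
        # your LiBai model.py; this mapping is model-specific
        return {k.replace("transformer.", "model."): v for k, v in state_dict.items()}

    def load(self):
        model = instantiate(self.libai_cfg_model)            # build the LiBai model
        state_dict = flow.load(self.pretrained_model_path)   # assumes weights already converted to OneFlow format
        model.load_state_dict(self._convert_state_dict(state_dict), strict=False)
        return model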
Here is a simple example of the functions overloaded in LiBai:
from libai.inference.basic import BasePipeline
from libai.utils import distributed as dist
class MyPipeline(BasePipeline):
def _parse_parameters(self, **pipeline_parameters):
        # By overloading this function, the input parameters of MyPipeline.__call__()
        # are dispatched to the preprocess/forward/postprocess stages of inference.
preprocess_params = {
"preprocess_param1": pipeline_parameters["preprocess_param1"],
"preprocess_param2": pipeline_parameters["preprocess_param2"],
}
forward_params = {
"forward_param": pipeline_parameters["forward_param"]
}
postprocess_params = {
"postprocess_param": pipeline_parameters["postprocess_param"]
}
return preprocess_params, forward_params, postprocess_params
def load_pretrain_weight(self, libai_cfg_model, model_path, mode="myloader"):
        # load your pretrained weights in this function
        # use your own "MyLoader" if your model was pretrained with another framework
        # set mode to "libai" if your model was pretrained with LiBai
if mode == "myloader":
import MyLoader
model_loader = MyLoader(
libai_cfg_model,
libai_cfg_model.cfg,
model_path,
...,
)
return model_loader.load()
else:
return super().load_pretrain_weight(
libai_cfg_model,
model_path,
mode=mode,
)
def preprocess(self, inputs, preprocess_param1, preprocess_param2, **kwargs):
...
# model_input_dict: {"key1": flow.Tensor1, ...}
return model_input_dict
def forward(self, model_input_dict, forward_param, **kwargs):
...
model_output_dict = self.model(**model_input_dict)
return model_output_dict
def postprocess(self, model_output_dict, postprocess_param, **kwargs):
...
return out_dict
if __name__ == "__main__":
pipeline = MyPipeline(
"path/to/myconfig.py",
data_parallel=1,
tensor_parallel=...,
pipeline_parallel=...,
pipeline_stage_id=...,
pipeline_num_layers=...,
model_path=...,
mode=...,
)
out = pipeline(
input_text=...,
preprocess_param1=...,
preprocess_param2=...,
forward_param=...,
postprocess_param=...,
)
if dist.is_main_process():
print(out)
Distributed run of pipeline_inference.py
To run the model on 2 nodes with 4 GPUs in total, on node0, run:

NODE=2 NODE_RANK=0 ADDR=192.168.0.0 PORT=12345 bash tools/infer.sh pipeline_inference.py 2

NODE=2 means the total number of nodes
NODE_RANK=0 means the current node is node0
ADDR=192.168.0.0 means the IP address of node0
PORT=12345 means the port of node0
On node1, run:

NODE=2 NODE_RANK=1 ADDR=192.168.0.0 PORT=12345 bash tools/infer.sh pipeline_inference.py 2

NODE=2 means the total number of nodes
NODE_RANK=1 means the current node is node1
ADDR=192.168.0.0 means the IP address of node0
PORT=12345 means the port of node0