libai.utils

libai.utils.distributed module

libai.utils.distributed.convert_to_distributed_default_setting(t)[source]

Helper function to convert all eager local tensors in an nn.Module to global tensors, using data parallelism as the default layout.
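
A minimal usage sketch (the nn.Linear module is illustrative, and the in-place conversion is an assumption based on the signature above):

import oneflow as flow
from libai.utils import distributed as dist

model = flow.nn.Linear(16, 16)  # any eager, locally built nn.Module
# Convert every local parameter to a global tensor, with data
# parallelism as the default layout (assumed to happen in place).
dist.convert_to_distributed_default_setting(model)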

libai.utils.distributed.get_dist_util()[source]

Get the distributed utils object if it has been set up; otherwise, initialize it for a single-node, single-GPU environment.

libai.utils.distributed.get_hidden_sbp()[source]

Get the sbp signature used for hidden states.

libai.utils.distributed.get_layer_placement(layer_idx, device_type=None)[source]

Get a flow.placement object from the initialized distributed environment according to layer_idx (see the sketch after the parameter list).

Parameters
  • layer_idx (int) – layer index indicating the rank group. This is useful for pipeline-parallel training, where different layers live on different ranks.

  • device_type (str, optional) – device type. Defaults to “cuda”.
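
A sketch of placing tensors per pipeline stage (the two-stage split below is illustrative):

import oneflow as flow
from libai.utils import distributed as dist

# Under pipeline parallelism, layer 0 and layer 1 may map to
# different rank groups.
placement_stage0 = dist.get_layer_placement(0)                     # device_type defaults to "cuda"
placement_stage1 = dist.get_layer_placement(1, device_type="cpu")  # place this stage on CPU instead

x = flow.ones(4, 4, placement=placement_stage0, sbp=flow.sbp.broadcast)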

libai.utils.distributed.get_nd_sbp(sbp_list)[source]

Get an nd-sbp signature list consistent with the 1D/2D GPU mesh.

Parameters

sbp_list (list) – an sbp list for a 2D mesh.

Returns

A modified sbp list according to the initialized distributed environment.
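
A sketch of calling get_nd_sbp with a 2D-mesh signature (split the batch dimension across data-parallel ranks, broadcast across tensor-parallel ranks):

import oneflow as flow
from libai.utils import distributed as dist

# On a 1D mesh, the helper returns the signature reduced to match
# the actual number of mesh dimensions.
nd_sbp = dist.get_nd_sbp([flow.sbp.split(0), flow.sbp.broadcast])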

libai.utils.distributed.same_sbp(lhs_sbp, rhs_sbp)[source]

Determine if two sbp signatures are the same.
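
For example (a sketch; both tensors are built with the same signature, so the check passes):

import oneflow as flow
from libai.utils import distributed as dist

placement = dist.get_layer_placement(0)
a = flow.ones(2, 2, placement=placement, sbp=flow.sbp.broadcast)
b = flow.zeros(2, 2, placement=placement, sbp=flow.sbp.broadcast)
assert dist.same_sbp(a.sbp, b.sbp)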

libai.utils.distributed.setup_dist_util(cfg)[source]

Initialize the distributed environment with configuration.

Example:

from omegaconf import DictConfig
from libai.utils.distributed import setup_dist_util

# set the hybrid parallel distributed environment with 2D mesh GPUs
setup_dist_util(
    DictConfig(
        dict(
            data_parallel_size=2,
            tensor_parallel_size=2,
            pipeline_parallel_size=1,
        )
    )
)

libai.utils.distributed.synchronize()[source]

Helper function to synchronize (barrier) among all processes when using distributed training.
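
A typical use is as a barrier before single-rank work (a sketch):

from libai.utils import distributed as dist

dist.synchronize()  # wait until every process reaches this point
# ... now safe to time, log, or save results on one rank ...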

libai.utils.distributed.ttol(tensor, pure_local=False, ranks=None)[source]

Convert a global tensor to a local tensor.

libai.utils.distributed.tton(tensor, local_only=False, ranks=None)[source]

Convert a global tensor to a numpy ndarray.
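
A combined sketch of the two converters, using the keyword defaults shown in the signatures above:

import oneflow as flow
from libai.utils import distributed as dist

placement = dist.get_layer_placement(0)
t = flow.ones(4, 4, placement=placement, sbp=flow.sbp.broadcast)

local_t = dist.ttol(t)  # global tensor -> local tensor
array = dist.tton(t)    # global tensor -> numpy ndarray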

libai.utils.events module

class libai.utils.events.CommonMetricPrinter(batch_size, max_iter)[source]

Print common metrics to the terminal, including iteration time, ETA, memory, all losses, and the learning rate. It also applies smoothing using a window of 20 elements. It is meant to print common metrics in common ways; to print something in a more customized way, please implement a similar printer yourself.
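
A construction sketch (the batch size and iteration budget are illustrative; the trainer is expected to invoke the writer periodically):

from libai.utils.events import CommonMetricPrinter

# batch_size and max_iter drive the throughput and ETA estimates.
printer = CommonMetricPrinter(batch_size=256, max_iter=10000)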

class libai.utils.events.EventStorage(start_iter=0)[source]

The user-facing class that provides metric storage functionalities. In the future we may add support for storing / logging other types of data if needed.
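
A minimal training-loop sketch, assuming the class follows the detectron2-style context-manager protocol (entering the with block makes the storage the current one):

from libai.utils.events import EventStorage

with EventStorage(start_iter=0) as storage:
    for iteration in range(100):
        loss = 1.0 / (iteration + 1)  # stand-in for a real loss
        storage.put_scalar("total_loss", loss)
        storage.step()  # advance storage.iter to the next iteration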

clear_histograms()[source]

Delete all the stored histograms for visualization. This should be called after histograms are written to tensorboard.

clear_images()[source]

Delete all the stored images for visualization. This should be called after images are written to tensorboard.

histories()[source]
Returns

the HistoryBuffer for all scalars

Return type

dict[name -> HistoryBuffer]

history(name)[source]
Returns

the scalar history for name

Return type

HistoryBuffer

property iter

Returns the current iteration number. When used together with a trainer, this is ensured to be the same as trainer.iter.

latest()[source]
Returns

mapping from the name of each scalar to its most recent value and the iteration number at which it was added.

Return type

dict[str -> (float, int)]

latest_with_smoothing_hint(window_size=20)[source]

Similar to latest(), but the returned values are either the un-smoothed original latest value, or a median of the given window_size, depending on whether the smoothing_hint is True. This provides a default behavior that other writers can use.

name_scope(name)[source]
Yields

A context within which all the events added to this storage will be prefixed by the name scope.
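
For example (a sketch; the "train" prefix is illustrative):

from libai.utils.events import EventStorage

with EventStorage(start_iter=0) as storage:
    with storage.name_scope("train"):
        # recorded under a prefixed name such as "train/loss"
        storage.put_scalar("loss", 0.5)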

put_image(img_name, img_tensor)[source]

Add an img_tensor associated with img_name to be shown on tensorboard (see the sketch after the parameter list).

Parameters
  • img_name (str) – The name of the image to put into tensorboard.

  • img_tensor (flow.Tensor or numpy.array) – A uint8 or float Tensor of shape [channel, height, width] where channel is 3. The image format should be RGB. The elements in img_tensor can either have values in [0, 1] (float32) or [0, 255] (uint8). The img_tensor will be visualized in tensorboard.
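
A sketch with a random float image in the documented [channel, height, width], RGB, [0, 1] layout:

import numpy as np
from libai.utils.events import EventStorage

with EventStorage(start_iter=0) as storage:
    img = np.random.rand(3, 224, 224).astype(np.float32)  # CHW, values in [0, 1]
    storage.put_image("input_sample", img)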

put_scalar(name, value, smoothing_hint=True)[source]

Add a scalar value to the HistoryBuffer associated with name.

Parameters

smoothing_hint (bool) – a ‘hint’ on whether this scalar is noisy and should be smoothed when logged. The hint will be accessible through EventStorage.smoothing_hints(). A writer may ignore the hint and apply its own smoothing rule. It defaults to True because most scalars we save need to be smoothed to provide any useful signal.

put_scalars(*, smoothing_hint=True, **kwargs)[source]

Put multiple scalars from keyword arguments.

Example:

storage.put_scalars(loss=my_loss, accuracy=my_accuracy, smoothing_hint=True)

smoothing_hints()[source]
Returns

the user-provided hint on whether the scalar is noisy and needs smoothing.

Return type

dict[name -> bool]

step()[source]

Users should either (1) call this function to increment storage.iter when needed, or (2) set storage.iter to the correct iteration number before each iteration.

The storage will then be able to associate the new data with an iteration number.

class libai.utils.events.JSONWriter(json_file, window_size=20)[source]

Write scalars to a json file. It saves scalars as one json per line (instead of a big json) for easy parsing. Example of parsing such a json file:

$ cat metrics.json | jq -s '.[0:2]'
[
  {
    "data_time": 0.008433341979980469,
    "iteration": 19,
    "total_loss": 1.9228371381759644,
    "lr": 0.007173333333333333,
    "time": 0.25401854515075684
  },
  {
    "data_time": 0.007216215133666992,
    "iteration": 39,
    "total_loss": 1.282649278640747,
    "lr": 0.007706666666666667,
    "time": 0.2490077018737793
  }
]
$ cat metrics.json | jq '.loss_mask'
0.7126231789588928
0.689423680305481
0.6776131987571716
...

libai.utils.events.get_event_storage()[source]
Returns

The EventStorage object that’s currently being used. Throws an error if no EventStorage is currently enabled.
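
Typically called from model or hook code while a storage is active (a sketch):

from libai.utils.events import EventStorage, get_event_storage

with EventStorage(start_iter=0):
    storage = get_event_storage()  # raises if no EventStorage is enabled
    storage.put_scalar("lr", 0.01)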

libai.utils.logger module

libai.utils.logger.log_every_n(lvl, msg, n=1, *, name=None)[source]

Log once per n times.

Parameters
  • lvl (int) – the logging level

  • msg (str) –

  • n (int) –

  • name (str) – name of the logger to use. Will use the caller’s module by default.

libai.utils.logger.log_every_n_seconds(lvl, msg, n=1, *, name=None)[source]

Log no more than once per n seconds.

Parameters
  • lvl (int) – the logging level

  • msg (str) –

  • n (int) –

  • name (str) – name of the logger to use. Will use the caller’s module by default.

libai.utils.logger.log_first_n(lvl, msg, n=1, *, name=None, key='caller')[source]

Log only for the first n times.

Parameters
  • lvl (int) – the logging level

  • msg (str) –

  • n (int) –

  • name (str) – name of the logger to use. Will use the caller’s module by default.

  • key (str or tuple[str]) – the string(s) can be one of “caller” or “message”, which defines how to identify duplicated logs. For example, if called with n=1, key=”caller”, this function will only log the first call from the same caller, regardless of the message content. If called with n=1, key=”message”, this function will log the same content only once, even if it is called from different places. If called with n=1, key=(”caller”, ”message”), this function will skip logging only if the same caller has already logged the same message.

libai.utils.logger.setup_logger(output=None, distributed_rank=0, *, color=True, name='libai', abbrev_name=None)[source]
Parameters
  • output (str) – a file name or a directory to save logs. If None, no log file will be saved. If it ends with “.txt” or “.log”, it is assumed to be a file name. Otherwise, logs will be saved to output/log.txt.

  • name (str) – the root module name of this logger

  • abbrev_name (str) – an abbreviation of the module, to avoid long names in logs. Set to “” to not log the root module in logs. By default, will abbreviate “detectron2” to “d2” and leave other modules unchanged.
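
A usage sketch (the output directory is illustrative):

from libai.utils.logger import setup_logger

# Writes to stdout and, given a directory, to ./output/log.txt.
logger = setup_logger(output="./output", distributed_rank=0, name="libai")
logger.info("logger ready")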

libai.utils.checkpoint module

class libai.utils.checkpoint.Checkpointer(model: oneflow.nn.modules.module.Module, save_dir: str = '', *, save_to_disk: bool = True, **checkpointables: object)[source]

A checkpointer that can save/load model as well as extra checkpointable objects.
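
A construction sketch; passing the optimizer as an extra checkpointable is an assumption (any object exposing state_dict/load_state_dict should work the same way):

import oneflow as flow
from libai.utils.checkpoint import Checkpointer

model = flow.nn.Linear(4, 4)
optimizer = flow.optim.SGD(model.parameters(), lr=0.1)

# Extra checkpointables are saved and loaded by keyword name.
checkpointer = Checkpointer(model, save_dir="./output", optimizer=optimizer)
checkpointer.save("model_0000009", iteration=9)  # kwargs become extra saved data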

get_checkpoint_file()[source]
Returns

The latest checkpoint file in target directory.

Return type

str

has_checkpoint()[source]
Returns

whether a checkpoint exists in the target directory.

Return type

bool

load(path: str, checkpointables: Optional[List[str]] = None) → object[source]

Load from the given checkpoint. When path points to a network file, this function has to be called on all ranks.

Parameters
  • path (str) – path or url to the checkpoint. If empty, will not load anything.

  • checkpointables (list) – List of checkpointable names to load. If not specified (None), will load all the possible checkpointables.

Returns

extra data loaded from the checkpoint that has not been processed. For example, data saved with save(**extra_data).

Return type

dict

resume_or_load(path: str, *, resume: bool = True)[source]

If resume is True, this method attempts to resume from the last checkpoint (if it exists). Otherwise, it loads the checkpoint from the given path. This is useful when restarting an interrupted training job.

Parameters
  • path (str) – path to the checkpoint.

  • resume (bool) – if True, resume from the last checkpoint if it exists.

Returns

same as load().
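
A restart sketch for resume_or_load, reusing the checkpointer from the Checkpointer example above; reading back an iteration key assumes it was stored via save(..., iteration=...):

# Resume from the last checkpoint in save_dir if one exists;
# otherwise load the weights at the given path.
extra = checkpointer.resume_or_load("./output/model_0000009", resume=True)
start_iter = extra.get("iteration", -1) + 1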

save(name: str, **kwargs: Dict[str, str])[source]

Dump model and checkpointables to a file.

Parameters
  • name (str) – name of the file.

  • kwargs (dict) – extra arbitrary data to save.

tag_last_checkpoint(last_filename_basename: str)[source]

Tag the last checkpoint.

Parameters

last_filename_basename (str) – the basename of the last filename.

class libai.utils.checkpoint.PeriodicCheckpointer(checkpointer: libai.utils.checkpoint.Checkpointer, period: int, max_iter: Optional[int] = None, max_to_keep: Optional[int] = None, file_prefix: str = 'model')[source]

Save checkpoints periodically. When .step(iteration) is called, it will call save() on the given checkpointer if iteration is a multiple of period or if max_iter is reached (see the training-loop sketch below).
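
A training-loop sketch (the period and max_iter values are illustrative; checkpointer is a Checkpointer as above):

from libai.utils.checkpoint import PeriodicCheckpointer

periodic = PeriodicCheckpointer(checkpointer, period=1000, max_iter=10000)
for iteration in range(10000):
    # ... forward / backward / optimizer step ...
    periodic.step(iteration)  # saves every `period` iterations and at the end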

save(name: str, **kwargs: Any)[source]

Same arguments as Checkpointer.save(). Use this method to manually save checkpoints outside the schedule.

Parameters
  • name (str) – file name.

  • kwargs (Any) – extra data to save, same as in Checkpointer.save().

step(iteration: int, **kwargs: Any)[source]

Perform the appropriate action at the given iteration.

Parameters
  • iteration (int) – the current iteration, in the range [0, max_iter-1].

  • kwargs (Any) – extra data to save, same as in Checkpointer.save().