Source code for bigdl.nano.pytorch.inference.optimizer

# Copyright 2016 The BigDL Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import namedtuple
import torch
from torch import nn
import subprocess
from importlib.util import find_spec
import time
import numpy as np
from copy import deepcopy
from typing import Dict, Callable, Tuple
from import DataLoader
from torchmetrics.metric import Metric
from bigdl.nano.utils.log4Error import invalidInputError, invalidOperationError
from bigdl.nano.pytorch.amp import BF16Model
from bigdl.nano.deps.openvino.openvino_api import PytorchOpenVINOModel, load_openvino_model
from bigdl.nano.deps.ipex.ipex_api import create_IPEXAccelerator, create_IPEXAccelerator_1_9, \
    PytorchIPEXJITModel, PytorchIPEXJITBF16Model, load_ipexjit_model
from bigdl.nano.deps.onnxruntime.onnxruntime_api import PytorchONNXRuntimeModel, \
from bigdl.nano.deps.neural_compressor.inc_api import load_inc_model, quantize as inc_quantize
from bigdl.nano.utils.inference.pytorch.model import AcceleratedLightningModule
from bigdl.nano.utils.inference.pytorch.model_utils import get_forward_args, get_input_example
from bigdl.nano.pytorch.utils import TORCH_VERSION_LESS_1_10
import warnings
# Filter out useless Userwarnings
warnings.filterwarnings('ignore', category=UserWarning, module='pytorch_lightning')
warnings.filterwarnings('ignore', category=DeprecationWarning, module='pytorch_lightning')
warnings.filterwarnings('ignore', category=UserWarning, module='torch')
warnings.filterwarnings('ignore', category=DeprecationWarning, module='torch')

import os
os.environ['LOGLEVEL'] = 'ERROR'  # remove parital output of inc

_whole_acceleration_options = ["inc", "ipex", "onnxruntime", "openvino", "pot",
                               "bf16", "jit", "channels_last"]

CompareMetric = namedtuple("CompareMetric", ["method_name", "latency", "accuracy"])

class AccelerationOption(object):
    __slot__ = _whole_acceleration_options

    def __init__(self, *args, **kwargs):
        initialize optimization option
        for option in _whole_acceleration_options:
            setattr(self, option, kwargs.get(option, False))
        self.method = kwargs.get("method", None)

    def get_precision(self):
        if or self.pot:
            return "int8"
        if self.bf16:
            return "bf16"
        return "fp32"

    def get_accelerator(self):
        if self.onnxruntime:
            return "onnxruntime"
        if self.openvino:
            return "openvino"
        if self.jit:
            return "jit"
        return None

# acceleration method combinations, developers may want to register some new
# combinations here
        "original": AccelerationOption(),
        "fp32_ipex": AccelerationOption(ipex=True),
        "bf16": AccelerationOption(bf16=True),
        "bf16_ipex": AccelerationOption(bf16=True, ipex=True),
        "int8": AccelerationOption(inc=True),
        "jit_fp32": AccelerationOption(jit=True),
        "jit_fp32_ipex": AccelerationOption(jit=True, ipex=True),
        "jit_fp32_ipex_channels_last": AccelerationOption(jit=True, ipex=True,
        "openvino_fp32": AccelerationOption(openvino=True),
        "openvino_int8": AccelerationOption(openvino=True, pot=True),
        "onnxruntime_fp32": AccelerationOption(onnxruntime=True),
        "onnxruntime_int8_qlinear": AccelerationOption(onnxruntime=True, inc=True,
        "onnxruntime_int8_integer": AccelerationOption(onnxruntime=True, inc=True,

[docs]class InferenceOptimizer: def __init__(self): ''' InferenceOptimizer for Pytorch Model. It can be used to accelerate inference pipeline with very few code changes. ''' # optimized_model_dict handles the optimized model and some metadata # in {"method_name": {"latency": ..., "accuracy": ..., "model": ...}} self.optimized_model_dict = {} self._optimize_result = None
[docs] def optimize(self, model: nn.Module, training_data: DataLoader, validation_data: DataLoader = None, metric: Callable = None, direction: str = "max", thread_num: int = None, logging: bool = False, latency_sample_num: int = 100) -> None: ''' This function will give all available inference acceleration methods a try and record the latency, accuracy and model instance inside the Optimizer for future usage. All model instance is setting to eval mode. :param model: A nn.module to be optimized :param training_data: A pytorch dataloader for training dataset. Users should be careful with this parameter since this dataloader might be exposed to the model, which causing data leak. The batch_size of this dataloader is important as well, users may want to set it to the same batch size you may want to use the model in real deploy environment. E.g. batch size should be set to 1 if you would like to use the accelerated model in an online service. :param validation_data: (optional) A pytorch dataloader for accuracy evaluation This is only needed when users care about the possible accuracy drop. :param metric: (optional) A callable object takes prediction and target and returns a accuracy value in this calling method `metric(pred, target)` :param direction: (optional) A string that indicates the higher/lower better for the metric, "min" for the lower the better and "max" for the higher the better. Default value is "max". :param thread_num: (optional) a int represents how many threads(cores) is needed for inference. :param logging: whether to log detailed information of model conversion. default: False. :param latency_sample_num: (optional) a int represents the number of repetitions to calculate the average latency. The default value is 100. ''' # TODO: may support accuracy_criterion # check if model is a nn.Module or inherited from a nn.Module invalidInputError(isinstance(model, nn.Module), "model should be a nn module.") invalidInputError(direction in ['min', 'max'], "Only support direction 'min', 'max'.") # get the available methods whose dep is met available_dict: Dict = _available_acceleration_combination() self._direction: str = direction # save direction as attr # record whether calculate accuracy in optimize by this attr if validation_data is not None and metric is not None: self._calculate_accuracy = True else: self._calculate_accuracy = False default_threads: int = torch.get_num_threads() thread_num: int = default_threads if thread_num is None else int(thread_num) result_map: Dict[str, Dict] = {} model.eval() # change model to eval mode forward_args = get_forward_args(model) input_sample = get_input_example(model, training_data, forward_args) st = time.perf_counter() try: with torch.no_grad(): model(*input_sample) except Exception: invalidInputError(False, "training_data is incompatible with your model input.") baseline_time = time.perf_counter() - st print("==========================Start Optimization==========================") start_time = time.perf_counter() for idx, (method, available) in enumerate(available_dict.items()): result_map[method] = {} if available is False: result_map[method]["status"] = "lack dependency" else: print(f"----------Start test {method} model " f"({idx+1}/{len(ALL_INFERENCE_ACCELERATION_METHOD)})----------") option: AccelerationOption = ALL_INFERENCE_ACCELERATION_METHOD[method] use_ipex: bool = option.ipex use_channels_last: bool = option.channels_last accelerator: str = option.get_accelerator() precision: str = option.get_precision() # if precision is fp32, then we will use trace method if precision == "fp32": try: if accelerator is None and use_ipex is False: acce_model = model else: if accelerator in ("jit", None): acce_model = \ InferenceOptimizer.trace(model=model, accelerator=accelerator, use_ipex=use_ipex, # channels_last is only for jit channels_last=use_channels_last, input_sample=input_sample) else: acce_model = \ InferenceOptimizer.trace(model=model, accelerator=accelerator, input_sample=input_sample, thread_num=thread_num, # remove output of openvino logging=logging) except Exception as e: print(e) result_map[method]["status"] = "fail to convert" print(f"----------Failed to convert to {method}----------") continue # if precision is int8 or bf16, then we will use quantize method elif precision in ("int8", "bf16"): ort_method: str = option.method try: acce_model = \ InferenceOptimizer.quantize(model=deepcopy(model), precision=precision, accelerator=accelerator, use_ipex=use_ipex, calib_dataloader=training_data, method=ort_method, thread_num=thread_num, # remove output of openvino logging=logging) except Exception as e: print(e) result_map[method]["status"] = "fail to convert" print(f"----------Failed to convert to {method}----------") continue result_map[method]["status"] = "successful" def func_test(model, input_sample): with torch.no_grad(): model(*input_sample) torch.set_num_threads(thread_num) try: result_map[method]["latency"], status =\ _throughput_calculate_helper(latency_sample_num, baseline_time, func_test, acce_model, input_sample) if status is False: result_map[method]["status"] = "early stopped" torch.set_num_threads(default_threads) continue except Exception as e: result_map[method]["status"] = "fail to forward" torch.set_num_threads(default_threads) continue torch.set_num_threads(default_threads) if self._calculate_accuracy: # here we suppose trace don't change accuracy, # so we jump it to reduce time cost of optimize if precision == "fp32" and method != "original": result_map[method]["accuracy"] = "not recomputed" else: result_map[method]["accuracy"] =\ _accuracy_calculate_helper(acce_model, metric, validation_data) else: result_map[method]["accuracy"] = None result_map[method]["model"] = acce_model print(f"----------Finish test {method} model " f"({idx+1}/{len(ALL_INFERENCE_ACCELERATION_METHOD)})----------") self.optimized_model_dict: Dict = result_map print("\n\n==========================Optimization Results==========================") self._optimize_result = _format_optimize_result(self.optimized_model_dict, self._calculate_accuracy) print(self._optimize_result) print("Optimization cost {:.3}s at all.".format(time.perf_counter() - start_time)) print("===========================Stop Optimization===========================")
[docs] def summary(self): ''' Print format string representation for optimization result ''' invalidOperationError(len(self.optimized_model_dict) > 0, "There is no optimization result. You should call .optimize() " "before summary()") print(self._optimize_result)
[docs] def get_best_model(self, accelerator: str = None, precision: str = None, use_ipex: bool = None, accuracy_criterion: float = None) -> Tuple[nn.Module, str]: ''' :param accelerator: (optional) Use accelerator 'None', 'onnxruntime', 'openvino', 'jit', defaults to None. If not None, then will only find the model with this specific accelerator. :param precision: (optional) Supported type: 'int8', 'bf16', defaults to None which represents 'fp32'. If not None, the will only find the model with thie specific precision. :param use_ipex: (optional) if not NOne, then will only find the model with this specific ipex setting :param :param accuracy_criterion: (optional) a float represents tolerable accuracy drop percentage, defaults to None meaning no accuracy control. :return: best model, corresponding acceleration option ''' invalidOperationError(len(self.optimized_model_dict) > 0, "There is no optimized model. You should call .optimize() " "before get_best_model()") invalidInputError(accelerator in [None, 'onnxruntime', 'openvino', 'jit'], "Only support accelerator 'onnxruntime', 'openvino' and 'jit'.") # TODO: include fp16? invalidInputError(precision in [None, 'int8', 'bf16'], "Only support precision 'int8', 'bf16'.") if accuracy_criterion is not None and not self._calculate_accuracy: invalidInputError(False, "If you want to specify accuracy_criterion, you need " "to set metric and validation_data when call 'optimize'.") best_model = self.optimized_model_dict["original"]["model"] best_metric = CompareMetric("original", self.optimized_model_dict["original"]["latency"], self.optimized_model_dict["original"]["accuracy"]) for method in self.optimized_model_dict.keys(): if method == "original" or self.optimized_model_dict[method]["status"] != "successful": continue option: AccelerationOption = ALL_INFERENCE_ACCELERATION_METHOD[method] result: Dict = self.optimized_model_dict[method] if accelerator is not None: if not getattr(option, accelerator): continue if precision is not None: if precision == 'bf16' and not option.bf16: continue if precision == 'int8' and not ( or option.pot): continue if use_ipex: if not option.ipex: continue if accuracy_criterion is not None: accuracy = result["accuracy"] compare_acc: float = best_metric.accuracy if accuracy == "not recomputed": pass elif self._direction == "min": if (accuracy - compare_acc) / compare_acc > accuracy_criterion: continue else: if (compare_acc - accuracy) / compare_acc > accuracy_criterion: continue # After the above conditions are met, the latency comparison is performed if result["latency"] < best_metric.latency: best_model = result["model"] if result["accuracy"] != "not recomputed": accuracy = result["accuracy"] else: accuracy = self.optimized_model_dict["original"]["accuracy"] best_metric = CompareMetric(method, result["latency"], accuracy) return best_model, _format_acceleration_option(best_metric.method_name)
[docs] @staticmethod def quantize(model: nn.Module, precision: str = 'int8', accelerator: str = None, use_ipex: bool = False, calib_dataloader: DataLoader = None, metric: Metric = None, accuracy_criterion: dict = None, approach: str = 'static', method: str = None, conf: str = None, tuning_strategy: str = None, timeout: int = None, max_trials: int = None, input_sample=None, thread_num: int = None, onnxruntime_session_options=None, logging: bool = True, **export_kwargs): """ Calibrate a Pytorch-Lightning model for post-training quantization. :param model: A model to be quantized. Model type should be an instance of nn.Module. :param precision: Global precision of quantized model, supported type: 'int8', 'bf16', 'fp16', defaults to 'int8'. :param accelerator: Use accelerator 'None', 'onnxruntime', 'openvino', defaults to None. None means staying in pytorch. :param calib_dataloader: A object for calibration. Required for static quantization. It's also used as validation dataloader. :param metric: A torchmetrics.metric.Metric object for evaluation. :param accuracy_criterion: Tolerable accuracy drop, defaults to None meaning no accuracy control. accuracy_criterion = {'relative': 0.1, 'higher_is_better': True} allows relative accuracy loss: 1%. accuracy_criterion = {'absolute': 0.99, 'higher_is_better':False} means accuracy must be smaller than 0.99. :param approach: 'static' or 'dynamic'. 'static': post_training_static_quant, 'dynamic': post_training_dynamic_quant. Default: 'static'. OpenVINO supports static mode only. :param method: Method to do quantization. When accelerator=None, supported methods: 'fx', 'eager', 'ipex', defaults to 'fx'. If you don't use ipex, suggest using 'fx' which executes automatic optimizations like fusion. For more information, please refer to When accelerator='onnxruntime', supported methods: 'qlinear', 'integer', defaults to 'qlinear'. Suggest 'qlinear' for lower accuracy drop if using static quantization. More details in This argument doesn't take effect for OpenVINO, don't change it for OpenVINO. :param conf: A path to conf yaml file for quantization. Default: None, using default config. :param tuning_strategy: 'bayesian', 'basic', 'mse', 'sigopt'. Default: 'bayesian'. :param timeout: Tuning timeout (seconds). Default: None, which means early stop. Combine with max_trials field to decide when to exit. :param max_trials: Max tune times. Default: None, which means no tuning. Combine with timeout field to decide when to exit. "timeout=0, max_trials=1" means it will try quantization only once and return satisfying best model. :param input_sample: An input example to convert pytorch model into ONNX/OpenVINO. :param thread_num: (optional) a int represents how many threads(cores) is needed for inference, only valid for accelerator='onnxruntime' or accelerator='openvino'. :param onnxruntime_session_options: The session option for onnxruntime, only valid when accelerator='onnxruntime', otherwise will be ignored. :param logging: whether to log detailed information of model conversion, only valid when accelerator='openvino', otherwise will be ignored. default: True. :param **export_kwargs: will be passed to torch.onnx.export function. :return: A accelerated Pytorch-Lightning Model if quantization is sucessful. """ if precision == 'bf16': if accelerator is None: if use_ipex: invalidInputError(not TORCH_VERSION_LESS_1_10, "torch version should >=1.10 to use ipex") use_jit = (accelerator == "jit") channels_last = export_kwargs["channels_last"] \ if "channels_last" in export_kwargs else None return PytorchIPEXJITBF16Model(model, input_sample=input_sample, use_ipex=use_ipex, use_jit=use_jit, channels_last=channels_last) bf16_model = BF16Model(model) return bf16_model else: invalidInputError(False, "Accelerator {} is invalid for BF16.".format(accelerator)) if precision == 'int8': if not accelerator or accelerator == 'onnxruntime': method_map = { None: { 'fx': 'pytorch_fx', 'eager': 'pytorch', 'ipex': 'pytorch_ipex', None: 'pytorch_fx' # default }, 'onnxruntime': { 'qlinear': 'onnxrt_qlinearops', 'integer': 'onnxrt_integerops', None: 'onnxrt_qlinearops' # default } } framework = method_map[accelerator].get(method, None) if accelerator == "onnxruntime": if not type(model).__name__ == 'PytorchONNXRuntimeModel': # try to establish onnx model if input_sample is None: # input_sample can be a dataloader input_sample = calib_dataloader if onnxruntime_session_options is None: import onnxruntime onnxruntime_session_options = onnxruntime.SessionOptions() if thread_num is not None: onnxruntime_session_options.intra_op_num_threads = thread_num onnxruntime_session_options.inter_op_num_threads = thread_num model = InferenceOptimizer.trace( model, input_sample=input_sample, accelerator='onnxruntime', onnxruntime_session_options=onnxruntime_session_options, **export_kwargs) """ If accelerator==None, quantized model returned should be an object of PytorchModel which is defined by neural-compressor containing a `GraphModule` for inference. Otherwise accelerator=='onnxruntime', it returns an ONNXModel object. A supported model which is able to run on Pytorch or ONNXRuntime can be fetched by `quantized_model.model`. """ return inc_quantize(model, calib_dataloader, metric, framework=framework, conf=conf, approach=approach, tuning_strategy=tuning_strategy, accuracy_criterion=accuracy_criterion, timeout=timeout, max_trials=max_trials, onnxruntime_session_options=onnxruntime_session_options) elif accelerator == 'openvino': model_type = type(model).__name__ if not model_type == 'PytorchOpenVINOModel': if input_sample is None: # input_sample can be a dataloader input_sample = calib_dataloader model = InferenceOptimizer.trace(model, input_sample=input_sample, accelerator='openvino', thread_num=thread_num, logging=logging, **export_kwargs) invalidInputError(type(model).__name__ == 'PytorchOpenVINOModel', "Invalid model to quantize. Please use a nn.Module or a model " "from trainer.trance(accelerator=='openvino')") drop_type = None higher_is_better = None maximal_drop = None if metric: if not isinstance(accuracy_criterion, dict): accuracy_criterion = {'relative': 0.99, 'higher_is_better': True} drop_type = 'relative' if 'relative' in accuracy_criterion else 'absolute' higher_is_better = accuracy_criterion.get('higher_is_better', None) maximal_drop = accuracy_criterion.get(drop_type, None) kwargs = { "metric": metric, "higher_better": higher_is_better, "drop_type": drop_type, "maximal_drop": maximal_drop, "max_iter_num": max_trials, # TODO following two keys are optional, if there is need, we can add them # "n_requests": None, # "sample_size": 300 } return model.pot(calib_dataloader, **kwargs) else: invalidInputError(False, "Accelerator {} is invalid.".format(accelerator)) invalidInputError(False, "Precision {} is invalid.".format(precision))
[docs] @staticmethod def trace(model: nn.Module, input_sample=None, accelerator: str = None, use_ipex: bool = False, thread_num: int = None, onnxruntime_session_options=None, logging: bool = True, **export_kwargs): """ Trace a pytorch model and convert it into an accelerated module for inference. For example, this function returns a PytorchOpenVINOModel when accelerator=='openvino'. :param model: An torch.nn.Module model, including pl.LightningModule. :param input_sample: A set of inputs for trace, defaults to None if you have trace before or model is a LightningModule with any dataloader attached. :param accelerator: The accelerator to use, defaults to None meaning staying in Pytorch backend. 'openvino', 'onnxruntime' and 'jit' are supported for now. :param use_ipex: whether we use ipex as accelerator for inferencing. default: False. :param thread_num: (optional) a int represents how many threads(cores) is needed for inference, only valid for accelerator='onnxruntime' or accelerator='openvino'. :param onnxruntime_session_options: The session option for onnxruntime, only valid when accelerator='onnxruntime', otherwise will be ignored. :param logging: whether to log detailed information of model conversion, only valid when accelerator='openvino', otherwise will be ignored. default: True. :param **kwargs: other extra advanced settings include 1. those be passed to torch.onnx.export function, only valid when accelerator='onnxruntime'/'openvino', otherwise will be ignored. 2. if channels_last is set and use_ipex=True, we will transform the data to be channels last according to the setting. Defaultly, channels_last will be set to True if use_ipex=True. :return: Model with different acceleration. """ invalidInputError( isinstance(model, nn.Module) and not isinstance(model, AcceleratedLightningModule), "Expect a nn.Module instance that is not traced or quantized" "but got type {}".format(type(model)) ) if accelerator == 'openvino': # openvino backend will not care about ipex usage return PytorchOpenVINOModel(model, input_sample, thread_num, logging, **export_kwargs) if accelerator == 'onnxruntime': # onnxruntime backend will not care about ipex usage if onnxruntime_session_options is None: import onnxruntime onnxruntime_session_options = onnxruntime.SessionOptions() if thread_num is not None: onnxruntime_session_options.intra_op_num_threads = thread_num onnxruntime_session_options.inter_op_num_threads = thread_num return PytorchONNXRuntimeModel(model, input_sample, onnxruntime_session_options, **export_kwargs) if accelerator == 'jit' or use_ipex: if use_ipex: invalidInputError(not TORCH_VERSION_LESS_1_10, "torch version should >=1.10 to use ipex") use_jit = (accelerator == "jit") channels_last = export_kwargs["channels_last"]\ if "channels_last" in export_kwargs else None return PytorchIPEXJITModel(model, input_sample=input_sample, use_ipex=use_ipex, use_jit=use_jit, channels_last=channels_last) invalidInputError(False, "Accelerator {} is invalid.".format(accelerator))
def _inc_checker(): ''' check if intel neural compressor is installed ''' return not find_spec("neural_compressor") is None def _ipex_checker(): ''' check if intel pytorch extension is installed ''' return not find_spec("intel_extension_for_pytorch") is None def _onnxruntime_checker(): ''' check if onnxruntime and onnx is installed ''' onnxruntime_installed = not find_spec("onnxruntime") is None onnx_installed = not find_spec("onnx") is None return onnxruntime_installed and onnx_installed def _openvino_checker(): ''' check if openvino-dev is installed ''' return not find_spec("openvino-dev") is None def _bf16_checker(): ''' bf16 availablity will be decided dynamically during the optimization ''' msg = subprocess.check_output(["lscpu"]).decode("utf-8") return "avx512_bf16" in msg or "amx_bf16" in msg def _available_acceleration_combination(): ''' :return: a dictionary states the availablity (if meet depdencies) ''' dependency_checker = {"inc": _inc_checker, "ipex": _ipex_checker, "onnxruntime": _onnxruntime_checker, "openvino": _openvino_checker, "pot": _openvino_checker, "bf16": _bf16_checker} available_dict = {} for method, option in ALL_INFERENCE_ACCELERATION_METHOD.items(): available_iter = True for name, value in option.__dict__.items(): if value is True: if name in dependency_checker and not dependency_checker[name](): available_iter = False available_dict[method] = available_iter return available_dict def _throughput_calculate_helper(iterrun, baseline_time, func, *args): ''' A simple helper to calculate average latency ''' start_time = time.perf_counter() time_list = [] for i in range(iterrun): st = time.perf_counter() with torch.no_grad(): func(*args) end = time.perf_counter() time_list.append(end - st) # if three samples cost more than 4x time than baseline model, prune it if i == 2 and end - start_time > 12 * baseline_time: return np.mean(time_list) * 1000, False # at least need 10 iters and try to control calculation # time less than 2 min if i + 1 >= min(iterrun, 10) and (end - start_time) > 2: iterrun = i + 1 break time_list.sort() # remove top and least 10% data time_list = time_list[int(0.1 * iterrun): int(0.9 * iterrun)] return np.mean(time_list) * 1000, True def _accuracy_calculate_helper(model, metric, data): ''' A quick helper to calculate accuracy ''' metric_list = [] sample_num = 0 with torch.no_grad(): for i, (data_input, target) in enumerate(data): metric_list.append(metric(model(data_input), target).numpy() * data_input.shape[0]) sample_num += data_input.shape[0] return np.sum(metric_list) / sample_num def _format_acceleration_option(method_name: str) -> str: ''' Get a string represation for current method's acceleration option ''' option = ALL_INFERENCE_ACCELERATION_METHOD[method_name] repr_str = "" for key, value in option.__dict__.items(): if value is True: if key == "pot": repr_str = repr_str + "int8" + " + " else: repr_str = repr_str + key + " + " elif isinstance(value, str): repr_str = repr_str + value + " + " if len(repr_str) > 0: repr_str = repr_str[:-2] return repr_str def _format_optimize_result(optimize_result_dict: dict, calculate_accuracy: bool) -> str: ''' Get a format string represation for optimization result ''' if calculate_accuracy is True: horizontal_line = " {0} {1} {2} {3}\n" \ .format("-" * 32, "-" * 22, "-" * 14, "-" * 22) repr_str = horizontal_line repr_str += "| {0:^30} | {1:^20} | {2:^12} | {3:^20} |\n" \ .format("method", "status", "latency(ms)", "accuracy") repr_str += horizontal_line for method, result in optimize_result_dict.items(): status = result["status"] latency = result.get("latency", "None") if latency != "None": latency = round(latency, 3) accuracy = result.get("accuracy", "None") if accuracy != "None" and isinstance(accuracy, float): accuracy = round(accuracy, 3) method_str = f"| {method:^30} | {status:^20} | " \ f"{latency:^12} | {accuracy:^20} |\n" repr_str += method_str repr_str += horizontal_line else: horizontal_line = " {0} {1} {2}\n" \ .format("-" * 32, "-" * 22, "-" * 14) repr_str = horizontal_line repr_str += "| {0:^30} | {1:^20} | {2:^12} |\n" \ .format("method", "status", "latency(ms)") repr_str += horizontal_line for method, result in optimize_result_dict.items(): status = result["status"] latency = result.get("latency", "None") if latency != "None": latency = round(latency, 3) method_str = f"| {method:^30} | {status:^20} | {latency:^12} |\n" repr_str += method_str repr_str += horizontal_line return repr_str