Source code for bigdl.orca.automl.metrics

#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_log_error
import numpy as np
import pandas as pd
from bigdl.dllib.utils.log4Error import *
from bigdl.dllib.utils.log4Error import invalidInputError
from numpy import float64, ndarray
from typing import List, Tuple, Union, Optional, Callable, Any


EPSILON = 1e-10


def _standardize_input(y_true: "ndarray", y_pred: "ndarray",
                       multioutput: str) -> Tuple:
    """
    This function check the validation of the input
    input should be one of list/tuple/ndarray with same shape and not be None
    input dtype will be cast to np.float64
    input will be changed to corresponding 2-dim ndarray
    """
    if y_true is None or y_pred is None:
        invalidInputError(False,
                          "The input is None.")
    if not isinstance(y_true, (list, tuple, np.ndarray, pd.DataFrame)):
        invalidInputError(False,
                          "Expected array-like input. "
                          "Only list/tuple/ndarray/pd.DataFrame are supported")
    if isinstance(y_true, (list, tuple)):
        y_true = np.array(y_true)
    if isinstance(y_pred, (list, tuple)):
        y_pred = np.array(y_pred)
    if isinstance(y_true, pd.DataFrame):
        y_true = y_true.to_numpy()
    if isinstance(y_pred, pd.DataFrame):
        y_pred = y_pred.to_numpy()

    y_true = y_true.astype(np.float64)
    y_pred = y_pred.astype(np.float64)

    original_shape = y_true.shape[1:]

    if y_true.ndim == 1:
        y_true = y_true.reshape((-1, 1))
    else:
        y_true = y_true.reshape((y_true.shape[0], -1))

    if y_pred.ndim == 1:
        y_pred = y_pred.reshape((-1, 1))
    else:
        y_pred = y_pred.reshape((y_pred.shape[0], -1))

    if y_true.shape[0] != y_pred.shape[0]:
        invalidInputError(False,
                          "y_true and y_pred have different number of samples "
                          "({0}!={1})".format(y_true.shape[0], y_pred.shape[0]))
    if y_true.shape[1] != y_pred.shape[1]:
        invalidInputError(False,
                          "y_true and y_pred have different number of output "
                          "({0}!={1})".format(y_true.shape[1], y_pred.shape[1]))
    allowed_multioutput_str = ('raw_values', 'uniform_average',
                               'variance_weighted')
    if isinstance(multioutput, str):
        if multioutput not in allowed_multioutput_str:
            invalidInputError(False,
                              "Allowed 'multioutput' string values are {}. You provided"
                              " multioutput={!r}".format(allowed_multioutput_str, multioutput))

    return y_true, y_pred, original_shape


[docs]def sMAPE(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate Symmetric mean absolute percentage error (sMAPE). .. math:: \\text{sMAPE} = \\frac{100\%}{n} \\sum_{t=1}^n \\frac{|y_t-\\hat{y_t}|}{|y_t|+|\\hat{y_t}|} :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) output_errors = np.mean(100 * np.abs(y_true - y_pred) / (np.abs(y_true) + np.abs(y_pred) + EPSILON), axis=0,) if multioutput == 'raw_values': return output_errors.reshape(original_shape) return np.mean(output_errors)
[docs]def MPE(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate mean percentage error (MPE). .. math:: \\text{MPE} = \\frac{100\%}{n}\\sum_{t=1}^n \\frac{y_t-\\hat{y_t}}{y_t} :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) output_errors = np.mean(100 * (y_true - y_pred) / (y_true + EPSILON), axis=0,) if multioutput == 'raw_values': return output_errors.reshape(original_shape) return np.mean(output_errors)
[docs]def MAPE(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate mean absolute percentage error (MAPE). .. math:: \\text{MAPE} = \\frac{100\%}{n}\\sum_{t=1}^n |\\frac{y_t-\\hat{y_t}}{y_t}| :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) output_errors = 100 * np.mean(np.abs((y_true - y_pred) / (y_true + EPSILON)), axis=0,) if multioutput == 'raw_values': return output_errors.reshape(original_shape) return np.mean(output_errors)
[docs]def MDAPE(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate Median Absolute Percentage Error (MDAPE). .. math:: \\text{MDAPE} = 100\%\ median(|\\frac{y_1-\\hat{y_1}}{y_1}|, \\ldots, |\\frac{y_n-\\hat{y_n}}{y_n}|) :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) output_errors = np.median(100 * np.abs((y_true - y_pred) / (y_true + EPSILON)), axis=0,) if multioutput == 'raw_values': return output_errors.reshape(original_shape) return np.mean(output_errors)
[docs]def sMDAPE(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate Symmetric Median Absolute Percentage Error (sMDAPE). .. math:: \\text{sMDAPE} = 100\%\ median(\\frac{|y_1-\\hat{y_1}|}{|y_1|+|\\hat{y_1}|}, \\ldots, \\frac{|y_n-\\hat{y_n}|}{|y_n|+|\\hat{y_n}|}) :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) output_errors = np.median(100 * np.abs(y_true - y_pred) / (np.abs(y_true) + np.abs(y_pred) + EPSILON), axis=0, ) if multioutput == 'raw_values': return output_errors.reshape(original_shape) return np.mean(output_errors)
[docs]def ME(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate Mean Error (ME). .. math:: \\text{ME} = \\frac{1}{n}\\sum_{t=1}^n y_t-\\hat{y_t} :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) output_errors = np.mean(y_true - y_pred, axis=0,) if multioutput == 'raw_values': return output_errors.reshape(original_shape) return np.mean(output_errors)
[docs]def MSPE(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate mean squared percentage error (MSPE). .. math:: \\text{MSPE} = \\frac{100\%}{n}\\sum_{t=1}^n (\\frac{y_n-\\hat{y_n}}{y_n})^2 :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) output_errors = 100 * np.mean(np.square((y_true - y_pred) / (y_true + EPSILON)), axis=0, ) if multioutput == 'raw_values': return output_errors.reshape(original_shape) return np.mean(output_errors)
[docs]def MSLE(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate the mean squared log error(MSLE). .. math:: \\text{MSLE} = \\frac{1}{n}\\sum_{t=1}^n (log_e(1+y_t)-log_e(1+\\hat{y_t}))^2 :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) result = mean_squared_log_error(y_true, y_pred, multioutput=multioutput) if multioutput == 'raw_values': return result.reshape(original_shape) return result
[docs]def R2(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate the r2 score. .. math:: R^2 = 1-\\frac{\\sum_{t=1}^n (y_t-\\hat{y_t})^2}{\\sum_{t=1}^n (y_t-\\bar{y})^2} :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 1.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) result = r2_score(y_true, y_pred, multioutput=multioutput) if multioutput == 'raw_values': return result.reshape(original_shape) return result
[docs]def MAE(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate the mean absolute error (MAE). .. math:: \\text{MAE} = \\frac{1}{n}\\sum_{t=1}^n |y_t-\\hat{y_t}| :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) result = mean_absolute_error(y_true, y_pred, multioutput=multioutput) if multioutput == 'raw_values': return result.reshape(original_shape) return result
[docs]def RMSE(y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> Union[float64, "ndarray"]: """ Calculate square root of the mean squared error (RMSE). .. math:: \\text{RMSE} = \\sqrt{(\\frac{1}{n}\\sum_{t=1}^n (y_t-\\hat{y_t})^2)} :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ return np.sqrt(MSE(y_true, y_pred, multioutput=multioutput))
[docs]def MSE(y_true: "ndarray", y_pred: "ndarray", multioutput: str='uniform_average') -> Union[float64, "ndarray"]: """ Calculate the mean squared error (MSE). .. math:: \\text{MSE} = \\frac{1}{n}\\sum_{t=1}^n (y_t-\\hat{y_t})^2 :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A non-negative floating point value (the best value is 0.0), or an array of floating point values, one for each individual target. """ y_true, y_pred, original_shape = _standardize_input(y_true, y_pred, multioutput) result = mean_squared_error(y_true, y_pred, multioutput=multioutput) if multioutput == 'raw_values': return result.reshape(original_shape) return result
[docs]def Accuracy(y_true: "ndarray", y_pred: "ndarray", multioutput=None) -> Union[float64, "ndarray"]: """ Calculate the accuracy score (Accuracy). .. math:: \\text{Accuracy} = \\frac{1}{n}\\sum_{t=1}^n 1(y_t=\\hat{y_t}) :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :return: Float or ndarray of floats. A non-negative floating point value (the best value is 1.0), or an array of floating point values, one for each individual target. """ from sklearn.metrics._classification import accuracy_score y_true = np.squeeze(y_true) y_pred = np.squeeze(y_pred) if np.any(y_pred != y_pred.astype(int)): # y_pred is probability if y_pred.ndim == 1: y_pred = np.where(y_pred > 0.5, 1, 0) else: y_pred = np.argmax(y_pred, axis=1) return accuracy_score(y_true, y_pred)
[docs]class Evaluator(object): """ Evaluate metrics for y_true and y_pred. """ metrics_func = { # Absolute 'me': ME, 'mae': MAE, 'mse': MSE, 'rmse': RMSE, 'msle': MSLE, 'r2': R2, # Relative 'mpe': MPE, 'mape': MAPE, 'mspe': MSPE, 'smape': sMAPE, 'mdape': MDAPE, 'smdape': sMDAPE, 'accuracy': Accuracy, } max_mode_metrics = ('r2', 'accuracy')
[docs] @staticmethod def evaluate(metric: str, y_true: "ndarray", y_pred: "ndarray", multioutput: str='raw_values') -> float64: """ Evaluate a specific metric for y_true and y_pred. :param metric: String in ['me', 'mae', 'mse', 'rmse', 'msle', 'r2' , 'mpe', 'mape', 'mspe', 'smape', 'mdape', 'smdape', 'accuracy'] :param y_true: Array-like of shape = (n_samples, \*). Ground truth (correct) target values. :param y_pred: Array-like of shape = (n_samples, \*). Estimated target values. :param multioutput: String in ['raw_values', 'uniform_average'] :return: Float or ndarray of floats. A floating point value, or an array of floating point values, one for each individual target. """ Evaluator.check_metric(metric) result = Evaluator.metrics_func[metric](y_true, y_pred, multioutput=multioutput) if result.shape == (1,): return result[0] else: return result
@staticmethod def check_metric(metric: str) -> None: if not metric: invalidInputError(False, f"Got invalid metric name of {metric}!") if metric not in Evaluator.metrics_func.keys(): invalidInputError(False, "metric " + metric + " is not supported") @staticmethod def get_metric_mode(metric: Union[Callable[..., Any], str, None]) -> str: Evaluator.check_metric(metric) if metric in Evaluator.max_mode_metrics: return "max" else: return "min"