Source code for tsad.utils.evaluating.univariate_funcs

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

from .src import extract_cp_confusion_matrix, filter_detecting_boundaries


[docs]def confusion_matrix(true, prediction): true_ = true == 1 prediction_ = prediction == 1 TP = (true_ & prediction_).sum() TN = (~true_ & ~prediction_).sum() FP = (~true_ & prediction_).sum() FN = (true_ & ~prediction_).sum() return TP, TN, FP, FN
[docs]def single_average_delay(detecting_boundaries, prediction, anomaly_window_destination, clear_anomalies_mode): """ anomaly_window_destination: 'lefter', 'righter', 'center'. Default='right' """ detecting_boundaries = filter_detecting_boundaries(detecting_boundaries) point = 0 if clear_anomalies_mode else -1 dict_cp_confusion = extract_cp_confusion_matrix(detecting_boundaries, prediction, point=point) missing = 0 detectHistory = [] all_true_anom = 0 FP = 0 FP += len(dict_cp_confusion['FPs']) missing += len(dict_cp_confusion['FNs']) all_true_anom += len(dict_cp_confusion['TPs']) + len(dict_cp_confusion['FNs']) if anomaly_window_destination == 'lefter': def average_time(output_cp_cm_tp): return output_cp_cm_tp[2] - output_cp_cm_tp[1] elif anomaly_window_destination == 'righter': def average_time(output_cp_cm_tp): return output_cp_cm_tp[1] - output_cp_cm_tp[0] elif anomaly_window_destination == 'center': def average_time(output_cp_cm_tp): return output_cp_cm_tp[1] - (output_cp_cm_tp[0] + (output_cp_cm_tp[2] - output_cp_cm_tp[0]) / 2) else: raise Exception("Choose anomaly_window_destination") for fp_case_window in dict_cp_confusion['TPs']: detectHistory.append(average_time(dict_cp_confusion['TPs'][fp_case_window])) return missing, detectHistory, FP, all_true_anom
[docs]def my_scale(fp_case_window=None, A_tp=1, A_fp=0, koef=1, detalization=1000, clear_anomalies_mode=True, plot_figure=False): """ ts - segment on which the window is applied """ x = np.linspace(-np.pi / 2, np.pi / 2, detalization) x = x if clear_anomalies_mode else x[::-1] y = (A_tp - A_fp) / 2 * -1 * np.tanh(koef * x) / (np.tanh(np.pi * koef / 2)) + (A_tp - A_fp) / 2 + A_fp if not plot_figure: event = int((fp_case_window[1] - fp_case_window[0]) / \ (fp_case_window[-1] - fp_case_window[0]) * detalization) if event >= len(x): event = len(x) - 1 score = y[event] return score else: return y
[docs]def single_evaluate_nab(detecting_boundaries, prediction, table_of_coef=None, clear_anomalies_mode=True, scale_func="improved", scale_koef=1, plot_figure=True # TODO ): """ detecting_boundaries: list of list of two float values The list of lists of left and right boundary indices for scoring results of labeling if empty. Can be [[]], or [[],[t1,t2],[]] table_of_coef: pandas array (3x4) of float values Table of coefficients for NAB score function indices: 'Standard','LowFP','LowFN' columns:'A_tp','A_fp','A_tn','A_fn' scale_func {default}, improved недостатки scale_func default - 1 - зависит от относительного шага, а это значит, что если слишком много точек в scoring window то перепад будет слишком жестким в середение. 2- то самая левая точка не равно Atp, а права не равна Afp (особенно если пррименять расплывающую множитель) clear_anomalies_mode тогда слева от границы Atp срправа Afp, иначе fault mode, когда слева от границы Afp срправа Atp """ # def sigm_scale(len_ts, A_tp, A_fp, koef=1): # x = np.arange(-int(len_ts/2), len_ts - int(len_ts/2)) # x = x if clear_anomalies_mode else x[::-1] # y = (A_tp-A_fp)*(1/(1+np.exp(5*x*koef))) + A_fp # return y # def my_scale(len_ts,A_tp,A_fp,koef=1): # """ts - участок на котором надо жахнуть окно """ # x = np.linspace(-np.pi/2,np.pi/2,len_ts) # x = x if clear_anomalies_mode else x[::-1] # # Приведение если неравномерный шаг. # #x_new = x_old * ( np.pi / (x_old[-1]-x_old[0])) - x_old[0]*( np.pi / (x_old[-1]-x_old[0])) - np.pi/2 # y = (A_tp-A_fp)/2*-1*np.tanh(koef*x)/(np.tanh(np.pi*koef/2)) + (A_tp-A_fp)/2 + A_fp # return y if scale_func == "improved": scale_func = my_scale # elif scale_func == "default": # scale_func = sigm_scale else: raise Exception("choose the scale_func") # filter detecting_boundaries = filter_detecting_boundaries(detecting_boundaries) if table_of_coef is None: table_of_coef = pd.DataFrame([[1.0, -0.11, 1.0, -1.0], [1.0, -0.22, 1.0, -1.0], [1.0, -0.11, 1.0, -2.0]]) table_of_coef.index = ['Standard', 'LowFP', 'LowFN'] table_of_coef.index.name = "Metric" table_of_coef.columns = ['A_tp', 'A_fp', 'A_tn', 'A_fn'] # GO point = 0 if clear_anomalies_mode else -1 dict_cp_confusion = extract_cp_confusion_matrix(detecting_boundaries, prediction, point=point) Scores, Scores_perfect, Scores_null = [], [], [] for profile in ['Standard', 'LowFP', 'LowFN']: A_tp = table_of_coef['A_tp'][profile] A_fp = table_of_coef['A_fp'][profile] A_fn = table_of_coef['A_fn'][profile] score = 0 score += A_fp * len(dict_cp_confusion['FPs']) score += A_fn * len(dict_cp_confusion['FNs']) for fp_case_window in dict_cp_confusion['TPs']: set_times = dict_cp_confusion['TPs'][fp_case_window] score += scale_func(set_times, A_tp, A_fp, koef=scale_koef) Scores.append(score) Scores_perfect.append(len(detecting_boundaries) * A_tp) Scores_null.append(len(detecting_boundaries) * A_fn) return np.array([np.array(Scores), np.array(Scores_null), np.array(Scores_perfect)])