Source code for tsad.utils.evaluating.evaluating

"""
Evaluating module
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

from .univariate_funcs import confusion_matrix, single_average_delay, single_evaluate_nab
from .src import single_detecting_boundaries, check_errors, extract_cp_confusion_matrix


def _check_sorted(datasets, input_variant):
    """Assert that every dataset in `datasets` is in ascending order.

    `input_variant` selects the layout of each dataset:
    1 - pd.Series (its index is checked),
    2 - flat list of timestamps,
    3 - list of [t1, t2] boundary pairs.
    """
    for dataset in datasets:
        if input_variant == 1:
            assert all(dataset.index.values == dataset.sort_index().index.values), \
                'Index of every pd.Series must be sorted'
        elif input_variant == 2:
            assert all(np.sort(dataset) == np.array(dataset)), \
                'True labels must be sorted'
        elif input_variant == 3:
            # A dataset without labels may be passed as [] (it is normalised
            # to [[]] later); np.concatenate cannot handle an empty sequence.
            if len(dataset) == 0:
                continue
            flat = np.concatenate(dataset)
            assert all(np.sort(flat) == flat), \
                'Detection boundaries must be sorted'


def _plot_cp(couple, anomaly_window_destination, ax, label):
    """Draw a red vertical line where the changepoint sits inside window `couple`."""
    if anomaly_window_destination == 'lefter':
        ax.axvline(couple[1], c='r', label=label)
    elif anomaly_window_destination == 'righter':
        ax.axvline(couple[0], c='r', label=label)
    elif anomaly_window_destination == 'center':
        ax.axvline(couple[0] + ((couple[1] - couple[0]) / 2), c='r', label=label)


def _plot_evaluation(true, prediction, detecting_boundaries, point_wise,
                     anomaly_window_destination):
    """Draw one subplot per dataset with predictions and true labels or windows.

    If `point_wise` is True, predictions are plotted against the true
    pd.Series; otherwise detection windows, the scoring function returned by
    `my_scale` and the changepoint position are plotted with predictions.
    """
    if not point_wise:
        from .univariate_funcs import my_scale

    num_datasets = len(true)
    figure = plt.figure(figsize=(16, 5 * num_datasets))
    grid = gridspec.GridSpec(num_datasets, 1)

    if point_wise:
        for i in range(num_datasets):
            ax = figure.add_subplot(grid[i])
            prediction[i].plot(ax=ax, label='pred', marker='o')
            true[i].plot(ax=ax, label='true', marker='o')
            ax.legend()
    else:
        detalization = 100
        for i in range(num_datasets):
            ax = figure.add_subplot(grid[i])
            # Only the first window of a subplot contributes legend entries.
            first_window = True
            for couple in detecting_boundaries[i]:
                if len(couple) == 0:
                    continue
                ax.axvspan(couple[0], couple[1], alpha=0.5, color='green',
                           label='detection \nboundary' if first_window else None)
                nab = pd.Series(
                    my_scale(plot_figure=True, detalization=detalization),
                    index=pd.date_range(couple[0], couple[1], periods=detalization))
                nab.plot(ax=ax, linewidth=0.4, color='brown',
                         label='nab scoring func' if first_window else None)
                _plot_cp(couple, anomaly_window_destination, ax,
                         label='Changepoint' if first_window else None)
                first_window = False
            prediction[i].plot(ax=ax, label='pred', marker='o')
            ax.legend()
    plt.show()


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or NaN when the denominator is zero."""
    if denominator == 0:
        return float('nan')
    return numerator / denominator


def evaluating(true, prediction, metric='nab', window_width=None, portion=0.1,
               anomaly_window_destination='lefter', clear_anomalies_mode=True,
               intersection_mode='cut right window', table_of_coef=None,
               scale_func="improved", scale_koef=1, plot_figure=False,
               verbose=True):
    """Evaluate predicted anomalies / changepoints against true labels.

    Parameters
    ----------
    true : pd.Series or list
        True labels of anomalies or changepoints. It is important to have
        appropriate labels (CP or anomaly) for the corresponding metric.
        Accepted variants:

        - one dataset: pd.Series with binary int labels
          (1 is anomaly, 0 is not anomaly);
        - one dataset: list of pd.Timestamp of true labels, or [] if there
          are no labels;
        - one dataset: list of [t1, t2] (left and right detection boundaries
          of pd.Timestamp), or [[]] if there are no labels;
        - many datasets: list (one item per dataset) of pd.Series with
          binary int labels;
        - many datasets: list of lists of pd.Timestamp, with [] for a
          dataset that has no labels, e.g. ``true = [ts, []]``;
        - many datasets: list of lists of [t1, t2] boundaries, with an empty
          list of labels for a dataset that has none, e.g.
          ``true = [[[]], [[t1, t2], [t1, t2]]]``.

        When `prediction` is a single pd.Series, `true` is treated as one
        dataset and wrapped into a one-element list.
    prediction : pd.Series or list of pd.Series
        Predicted labels: a pd.Series with binary int labels for one dataset
        or a list of such series (one per dataset).
    metric : {'nab', 'binary', 'average_time', 'confusion_matrix'}, default 'nab'
        Changepoint problem: 'nab', 'average_time'.
        Standard AD problem: 'binary', 'confusion_matrix'.

        - 'nab': Numenta Anomaly Benchmark metric;
        - 'average_time': average delay or time to failure, depending on
          the situation;
        - 'binary': FAR, MAR, F1;
        - 'confusion_matrix': standard confusion matrix for any point.
    window_width : str for pd.Timedelta, optional
        Width of the detection window.
    portion : float, default 0.1
        Used when `window_width` is None: the width of the detection window
        is then a portion of the length of prediction divided by the number
        of real CPs in the dataset.
    anomaly_window_destination : {'lefter', 'righter', 'center'}, default 'lefter'
        Location of the detection window relative to the anomaly:
        on its left side, on its right side, or centred on it.
    clear_anomalies_mode : bool, default True
        True: the left value of the scoring function is Atp and the right
        one is Afp; only the first value inside the detection window is
        taken. False: the right value is Atp and the left one is Afp; only
        the last value inside the detection window is taken.
    intersection_mode : {'cut left window', 'cut right window', 'both'}, default 'cut right window'
        How overlapping detection windows of true changepoints are cropped.
    table_of_coef : pd.DataFrame, optional
        Application profiles of the NAB metric (metric='nab'), forwarded to
        `single_evaluate_nab`. Documented default when None::

            table_of_coef = pd.DataFrame([[1.0, -0.11, 1.0, -1.0],
                                          [1.0, -0.22, 1.0, -1.0],
                                          [1.0, -0.11, 1.0, -2.0]])
            table_of_coef.index = ['Standard', 'LowFP', 'LowFN']
            table_of_coef.index.name = "Metric"
            table_of_coef.columns = ['A_tp', 'A_fp', 'A_tn', 'A_fn']
    scale_func : {'default', 'improved'}, default 'improved'
        Scoring function of the NAB metric (metric='nab').
    scale_koef : float > 0, default 1
        Smoothing factor. The smaller it is, the smoother the scoring
        function is.
    plot_figure : bool, default False
        If True, draw the score functions, detection windows and
        predictions (useful e.g. for calibrating `scale_koef`).
    verbose : bool, default True
        If True, print the computed metric values.

    Returns
    -------
    metrics
        Depends on `metric`:

        - 'nab': dict with keys 'Standard', 'LowFP', 'LowFN' mapping to the
          score of each profile rounded to 2 digits;
        - 'average_time': tuple (average time, number of missed
          changepoints, number of FPs, number of true changepoints);
        - 'binary': tuple (F1, False Alarm Rate in %, Missing Alarm Rate
          in %);
        - 'confusion_matrix': tuple (TP, TN, FP, FN).

    Raises
    ------
    AssertionError
        If `true` is neither pd.Series nor list, if the number of datasets
        in `true` and `prediction` differs, or if the data are not sorted.
    Exception
        If `prediction` has an incorrect format, if the format of `true` is
        unknown, or if `metric` is not one of the supported names.
    """
    assert isinstance(true, (pd.Series, list)), 'Incorrect format for true'

    # checking prediction
    if isinstance(prediction, pd.Series):
        true = [true]
        prediction = [prediction]
    elif not (isinstance(prediction, list)
              and all(isinstance(my_el, pd.Series) for my_el in prediction)):
        raise Exception('Incorrect format for prediction')

    # checking dataset length: the number of datasets must match
    assert len(true) == len(prediction), \
        'Number of datasets in true and prediction is unequal'

    # final check
    input_variant = check_errors(true)
    _check_sorted(true, input_variant)
    _check_sorted(prediction, 1)

    # Fail fast on an unknown metric instead of computing boundaries and
    # drawing figures first.
    if metric not in ('nab', 'average_time', 'binary', 'confusion_matrix'):
        raise Exception("Choose the performance metric")

    # part 2. To detected boundaries
    if metric in ('nab', 'average_time') and window_width is None and input_variant != 3:
        print(f"Since you didn't choose window_width and portion, portion will be default ({portion})")

    if input_variant in (1, 2):
        detecting_boundaries = [
            single_detecting_boundaries(
                true_series=true_i if input_variant == 1 else None,
                true_list_ts=true_i if input_variant == 2 else None,
                prediction=prediction_i,
                window_width=window_width,
                portion=portion,
                anomaly_window_destination=anomaly_window_destination,
                intersection_mode=intersection_mode)
            for true_i, prediction_i in zip(true, prediction)
        ]
    elif input_variant == 3:
        # Anti fool system: [[[t1,t2]],[]] -> [[[t1,t2]],[[]]]
        # (builds a new outer list, `true` itself is not modified).
        detecting_boundaries = [
            boundaries if len(boundaries) > 0 else [[]] for boundaries in true
        ]
    else:
        raise Exception('Unknown format for true data')

    # part 3. To compute metric
    if plot_figure:
        point_wise = metric in ('binary', 'confusion_matrix') and input_variant == 1
        _plot_evaluation(true, prediction, detecting_boundaries, point_wise,
                         anomaly_window_destination)

    if metric == 'nab':
        matrix = np.zeros((3, 3))
        for boundaries, prediction_i in zip(detecting_boundaries, prediction):
            matrix = matrix + single_evaluate_nab(
                boundaries,
                prediction_i,
                table_of_coef=table_of_coef,
                clear_anomalies_mode=clear_anomalies_mode,
                scale_func=scale_func,
                scale_koef=scale_koef,
                plot_figure=plot_figure)

        results = {}
        for t, profile_name in enumerate(['Standard', 'LowFP', 'LowFN']):
            results[profile_name] = round(
                100 * (matrix[0, t] - matrix[1, t]) / (matrix[2, t] - matrix[1, t]), 2)
            if verbose:
                print(profile_name, ' - ', results[profile_name])
        return results

    if metric == 'average_time':
        missing, detectHistory, FP, all_true_anom = 0, [], 0, 0
        for boundaries, prediction_i in zip(detecting_boundaries, prediction):
            missing_, detectHistory_, FP_, all_true_anom_ = single_average_delay(
                boundaries,
                prediction_i,
                anomaly_window_destination=anomaly_window_destination,
                clear_anomalies_mode=clear_anomalies_mode)
            missing = missing + missing_
            detectHistory = detectHistory + detectHistory_
            FP = FP + FP_
            all_true_anom = all_true_anom + all_true_anom_
        add = np.mean(detectHistory)
        if verbose:
            print('Amount of true anomalies', all_true_anom)
            print(f'A number of missed CPs = {missing}')
            print(f'A number of FPs = {int(FP)}')
            print('Average time', add)
        return add, missing, int(FP), all_true_anom

    # metric is 'binary' or 'confusion_matrix'
    TP, TN, FP, FN = 0, 0, 0, 0
    if all(isinstance(my_el, pd.Series) for my_el in true):
        for true_i, prediction_i in zip(true, prediction):
            TP_, TN_, FP_, FN_ = confusion_matrix(true_i, prediction_i)
            TP, TN, FP, FN = TP + TP_, TN + TN_, FP + FP_, FN + FN_
    else:
        print('For this metric it is better if you use pd.Series format for true \nwith common index of true and prediction')
        for boundaries, prediction_i in zip(detecting_boundaries, prediction):
            dict_cp_confusion = extract_cp_confusion_matrix(
                boundaries, prediction_i, binary=True)
            tp_i = sum(len(dict_cp_confusion['TPs'][window][1])
                       for window in dict_cp_confusion['TPs'])
            fp_i = len(dict_cp_confusion['FPs'])
            fn_i = len(dict_cp_confusion['FNs'])
            TP += tp_i
            FP += fp_i
            FN += fn_i
            # TN of this dataset must be derived from this dataset's own
            # counts, not from the totals accumulated over previous datasets.
            TN += len(prediction_i) - tp_i - fp_i - fn_i

    if metric == 'binary':
        f1 = round(_safe_ratio(TP, TP + (FN + FP) / 2), 2)
        far = round(_safe_ratio(FP, FP + TN) * 100, 2)
        mar = round(_safe_ratio(FN, FN + TP) * 100, 2)
        if verbose:
            print(f'False Alarm Rate {far} %')
            print(f'Missing Alarm Rate {mar} %')
            print(f'F1 metric {f1}')
        return f1, far, mar

    if verbose:
        print('TP', TP)
        print('TN', TN)
        print('FP', FP)
        print('FN', FN)
    return TP, TN, FP, FN