import pandas as pd
import numpy as np
def filter_detecting_boundaries(detecting_boundaries):
    """
    Drop empty windows from a collection of detecting boundaries.

    [[t1,t2],[],[t1,t2]] -> [[t1,t2],[t1,t2]]
    [[],[]] -> []

    Parameters
    ----------
    detecting_boundaries : iterable
        Windows to filter. Every element must support ``len()``; elements
        of length zero are discarded.

    Returns
    ----------
    list
        New list holding the non-empty windows in their original order.
        The input collection itself is not modified, and the kept windows
        are the same objects (not copies).
    """
    # Explicit length test (not truthiness) so that any sized window object
    # is judged purely by its length.
    return [couple for couple in detecting_boundaries if len(couple) != 0]
def single_detecting_boundaries(true_series, true_list_ts, prediction,
                                portion, window_width,
                                anomaly_window_destination, intersection_mode):
    """
    Extract detecting_boundaries from series or list of timestamps

    Exactly one of ``true_series`` / ``true_list_ts`` must be given. Every
    true timestamp is turned into a window ``[start, end]`` of width ``td``
    and overlaps between consecutive windows are then resolved.

    Parameters
    ----------
    true_series : pd.Series or None
        Labels; the index values where the series equals 1 are used as the
        true timestamps.
    true_list_ts : list or None
        True timestamps given directly.
    prediction : object with an ``index``
        Only read when ``window_width`` is None: the span between the first
        and last index value is used to derive the default window width.
    portion : number
        Only read when ``window_width`` is None: multiplier applied to
        ``span / (number_of_true_timestamps + 1)``.
    window_width : anything accepted by ``pd.Timedelta`` or None
        Explicit window width. If None, the width is derived from
        ``prediction`` and ``portion``.
    anomaly_window_destination : {'lefter', 'righter', 'center'}
        Where the window lies relative to the true timestamp: ending at it,
        starting at it, or centred on it.
    intersection_mode : {'cut left window', 'cut right window', 'cut both'}
        How two overlapping consecutive windows are trimmed.

    Returns
    ----------
    list
        List of ``[start, end]`` windows. ``[[]]`` if ``true_list_ts`` is
        an empty list; ``[]`` if no true timestamps are found in
        ``true_series``.

    Raises
    ----------
    Exception
        If both or neither of ``true_series`` / ``true_list_ts`` are given;
        if ``anomaly_window_destination`` is unknown (checked only when
        there is at least one true timestamp); if ``intersection_mode`` is
        unknown (checked only when an overlap is actually found).
    """
    if (true_series is not None) and (true_list_ts is not None):
        raise Exception('Choose the ONE type')
    elif true_series is not None:
        true_timestamps = true_series[true_series == 1].index
    elif true_list_ts is not None:
        if len(true_list_ts) == 0:
            return [[]]
        true_timestamps = true_list_ts
    else:
        raise Exception('Choose the type')

    if window_width is not None:
        td = pd.Timedelta(window_width)
    else:
        # Default width: a `portion` of the average gap the true timestamps
        # would have if spread evenly over the prediction span.
        span = prediction.index[-1] - prediction.index[0]
        td = pd.Timedelta(span / (len(true_timestamps) + 1) * portion)

    detecting_boundaries = []
    for val in true_timestamps:
        if anomaly_window_destination == 'lefter':
            detecting_boundaries.append([val - td, val])
        elif anomaly_window_destination == 'righter':
            detecting_boundaries.append([val, val + td])
        elif anomaly_window_destination == 'center':
            detecting_boundaries.append([val - td / 2, val + td / 2])
        else:
            # Must be an exception instance: raising a bare string would
            # surface as an unrelated TypeError and hide this message.
            raise Exception('choose anomaly_window_destination')

    # Resolve intersections between consecutive windows. Touching windows
    # (end == next start) count as intersecting, so that a right boundary is
    # never treated as included in the next window.
    # NOTE(review): only neighbours are compared, so this presumably expects
    # the true timestamps in ascending order - confirm against callers.
    # The windows are mutated in place, so a trim made for one pair is seen
    # when the next pair is examined.
    for left, right in zip(detecting_boundaries, detecting_boundaries[1:]):
        if left[1] >= right[0]:
            if intersection_mode == 'cut left window':
                left[1] = right[0]
            elif intersection_mode == 'cut right window':
                right[0] = left[1]
            elif intersection_mode == 'cut both':
                # Swap the two inner edges: the overlap is removed from
                # both windows.
                left[1], right[0] = right[0], left[1]
            else:
                raise Exception("choose the intersection_mode")
    return detecting_boundaries
def check_errors(my_list):
    """
    Check format of input true data

    Walks the nested list and verifies that, on every nesting level, all
    elements are of one kind: all ``list``, all ``pd.Series`` or all
    ``pd.Timestamp``. The check is done per individual list and again
    across all lists of the same level. If a third level exists, every
    second-level element must have length 2 or 0.

    Parameters
    ----------
    my_list - uniform format of true (See evaluating.evaluating)

    Returns
    ----------
    mx : depth of list, or variant of processing
        1 for a flat list (including an empty one); increased by one for
        every further level of nested lists.

    Raises
    ----------
    AssertionError
        If ``my_list`` is not a ``list``.
    Exception
        If the elements of some level are not of a uniform kind, or a
        second-level element has a length other than 2 or 0 while a third
        level exists.
    """
    # Explicit check instead of a bare `assert`, so that the validation is
    # not stripped under `python -O`. AssertionError is kept as the
    # exception type so existing handlers continue to work.
    if not isinstance(my_list, list):
        raise AssertionError("my_list must be a list")

    allowed_kinds = (list, pd.Series, pd.Timestamp)
    max_depth = 1
    # level -> every element found on that level, across all sibling lists
    items_by_level = {}

    def is_non_uniform(items):
        # Uniform means: all elements share one of the allowed kinds.
        # An empty collection is uniform.
        return not any(
            all(isinstance(item, kind) for item in items)
            for kind in allowed_kinds
        )

    def recurse(items, level=1):
        nonlocal max_depth
        if is_non_uniform(items):
            raise Exception(f"Non uniform data format in level {level}: {items}")
        bucket = items_by_level.setdefault(level, [])
        for item in items:
            bucket.append(item)
            if isinstance(item, list):
                max_depth = max(max_depth, level + 1)
                recurse(item, level + 1)

    recurse(my_list)

    # Second pass: siblings were validated one list at a time above; here
    # the elements of all lists on the same level are compared together.
    for level, items in items_by_level.items():
        if is_non_uniform(items):
            raise Exception(f"Non uniform data format in level {level}: {my_list}")

    # With three levels, the second-level elements must be either empty or
    # of length two.
    if 3 in items_by_level:
        for el in items_by_level[2]:
            if len(el) not in (0, 2):
                raise Exception(f"Non uniform data format in level {2}: {my_list}")
    return max_depth