import pandas as pd
from ..base.task import Task, TaskResult
from .eda import TimeDiscretizationResult
[docs]class ValueRangeProcessingResult(TaskResult):
min_values : pd.Series
max_values : pd.Series
[docs] def show(self) -> None:
pass
[docs]class ValueRangeProcessingTask(Task):
"""
Class of the data preprocessing task in the field of value range processing.
"""
def __init__(self,
name: str | None = None,
method: str | None = 'auto',
show: bool | None = True):
"""
This is a task that represents a class for value range processing with some meta-information
stored in ValueRangeProcessingResult.
Parameters
----------
method : str, default 'auto'
The method of value range processing. If 'auto', then the method is set to 'MinMax'.
show : bool, default True
Whether to show the warning message if some values are out of the allowed range.
"""
super().__init__(name)
self.show = show
self.method = method
def _check_intervals(self, df: pd.DataFrame, result: ValueRangeProcessingResult):
"""
Check if the values in the dataframe are within the allowed range and replace the values outside the range with NaN.
Parameters
----------
df : pd.DataFrame
Input dataset
result : ValueRangeProcessingResult
Result of the ValueRangeProcessingTask
Returns
-------
pd.DataFrame
Dataframe with values outside the allowed range replaced with NaN
"""
import numpy as np
mask_fault_mode = (df<result.min_values) | (df>result.max_values)
if mask_fault_mode.sum().sum()!=0:
import warnings
if result.show:
warnings.warn("Some values are out of the allowed range:")
which_columns = mask_fault_mode.sum()!=0
which_columns = which_columns[which_columns].index
for col in which_columns:
display(df[col][mask_fault_mode[col]])
else:
warnings.warn("Some values are out of the allowed range. Set show=True to see more details""")
if result.show:
print('Values outside the allowed range will be replaced with NaN')
df[mask_fault_mode] = np.nan
return df
[docs] def fit_predict(self, df: pd.DataFrame) -> tuple[pd.DataFrame, TaskResult]:
"""
Fit the ValueRangeProcessingTask.
The method finds the minimum and maximum values of the input dataframe and replaces the values outside the allowed range with NaN.
Parameters
----------
df : pd.DataFrame
Input dataset
Returns
-------
tuple[pd.DataFrame, TaskResult]
Dataframe with values outside the allowed range replaced with NaN and ValueRangeProcessingResult
"""
result = ValueRangeProcessingResult()
method = self.method
if method=='auto':
method = 'MinMax'
if method == 'MinMax':
result.min_values = df.min()
result.max_values = df.max()
# if method =='Interquartile range'
# mask = ((new_df < new_df.quantile(0.4)) | (new_df > new_df.quantile(0.8)))
# df = data
# min_values = df.median() + (df.max() - df.median())*2
# max_values = df.median() - (df.median() - df.min())*2
df = self._check_intervals(df, result)
return df, result
[docs] def predict(self, df: pd.DataFrame, vrp_result: ValueRangeProcessingResult) -> tuple[pd.DataFrame, TaskResult]:
"""
Predict using the ValueRangeProcessingTask.
The method replaces the values outside the allowed range with NaN.
Parameters
----------
df : pd.DataFrame
Input dataset
vrp_result : ValueRangeProcessingResult
Result of the ValueRangeProcessingTask
Returns
-------
tuple[pd.DataFrame, TaskResult]
Dataframe with values outside the allowed range replaced with NaN and ValueRangeProcessingResult
"""
df = self._check_intervals(df, vrp_result)
return df, vrp_result
[docs]class ResampleProcessingResult(TaskResult):
freq_tobe : str # can be pd.Timedelta or str
[docs] def show(self) -> None:
pass
[docs]class ResampleProcessingTask(Task):
"""
Class of the data preprocessing task in the field of resampling.
This is a task that represents a class for resampling the frequency of the time index
with some meta-information stored in ResampleProcessingResult.
Parameters
----------
freq_tobe : pd.Timedelta | str, default None
The desired frequency of the time index. If None,
the frequency is taken from the results of the TimeDiscretizationTask.
"""
def __init__(self,
name: str | None = None,
freq_tobe: str | None = None,
):
super().__init__(name)
self.freq_tobe = freq_tobe
[docs] def fit_predict(self, df: pd.DataFrame, time_result:TimeDiscretizationResult) -> tuple[pd.DataFrame, ResampleProcessingResult]:
"""
Fit the ResampleProcessingTask.
The method resamples the input dataframe according to freq_tobe or the frequency found in the TimeDiscretizationTask.
Parameters
----------
df : pd.DataFrame
Input dataset
time_result : TimeDiscretizationResult
Result of the TimeDiscretizationTask
Returns
-------
tuple[pd.DataFrame, ResampleProcessingResult]
Resampled dataframe and ResampleProcessingResult
"""
result = ResampleProcessingResult()
if self.freq_tobe is None:
result.freq_tobe = time_result.freq_tobe
else:
result.freq_tobe = self.freq_tobe
df = df.resample(result.freq_tobe).mean()
return df, result
[docs] def predict(self, df: pd.DataFrame, rp_result: ResampleProcessingResult) -> tuple[pd.DataFrame, TaskResult]:
"""
Predict using the ResampleProcessingTask.
The method resamples the input dataframe according to the frequency found in the ResampleProcessingResult.
Parameters
----------
df : pd.DataFrame
Input dataset
rp_result : ResampleProcessingResult
Result of the ResampleProcessingTask
Returns
-------
tuple[pd.DataFrame, TaskResult]
Resampled dataframe and ResampleProcessingResult
"""
df = df.resample(rp_result.freq_tobe).mean()
return df, rp_result
[docs]class SplitByNaNResult(TaskResult):
"""The results of the SplitByNaNTask task:
Nothing is saved or displayed as a result of this task.
"""
[docs] def show(self) -> None:
pass
[docs]class SplitByNaNTask(Task):
"""
Class for preprocessing data by splitting the original dataset
into separate datasets based on the continuity of the data.
A class of data preprocessing task in terms of dividing the original sample
into separate minidatasets according to the principle of data continuity.
That is, the original df is divided into samples of the maximum length,
by the gaps. These gaps, due to which the dataset is broken, are completely
will be removed, that is, the continuous part to the left and to the right of
the gap there give us 2 minidatasets that were obtained splitting by this gap.
Needed for working with sequences due to the requirement:
the absence of gaps and the same sampling frequency.
Parameters
----------
name : str | None
The name of the task.
"""
def __init__(self,
name: str | None = None,
):
super().__init__(name)
[docs] def fit_predict(self, df: pd.DataFrame,time_result:ResampleProcessingResult) -> tuple[pd.DataFrame, SplitByNaNResult]:
"""
Fits the SplitByNaNTask.
Parameters
----------
df : pd.DataFrame
The input dataset.
time_result: ResampleProcessingResult
The result of the ResampleProcessingTask,
mainly due to the freq_tobe argument,
which is needed for splitting.
Returns
-------
dfs : list[pd.DataFrame]
The output datasets that satisfy the continuity condition.
result : SplitByNaNResult
The object that stores the results of the SplitByNaNTask.
"""
result = SplitByNaNResult()
from ..utils.preproc import df2dfs
dfs = df2dfs(df,resample_freq = time_result.freq_tobe)
return dfs, result
[docs] def predict(self, df: pd.DataFrame, result: SplitByNaNResult,time_result:ResampleProcessingResult) -> tuple[pd.DataFrame, SplitByNaNResult]:
"""
Predicts by SplitByNaNTask.
Parameters
----------
df : pd.DataFrame
The input dataset.
time_result: ResampleProcessingResult
Need freq_tobe parametr for splitting, which is stored
in the ResampleProcessingTask.
Returns
-------
dfs : list[pd.DataFrame]
The output datasets that satisfy the continuity condition.
result : SplitByNaNResult
The object that stores the results of the SplitByNaNTask.
"""
from ..utils.preproc import df2dfs
dfs = df2dfs(df,resample_freq = time_result.freq_tobe)
return dfs, result
[docs]class PrepareSeqSamplesResult(TaskResult):
"""The results of the PrepareSeqSamplesTask task:
Nothing is saved or displayed as a result of this task.
"""
[docs] def show(self) -> None:
pass
[docs]class PrepareSeqSamplesTask(Task):
"""
Class of the data preprocessing task in the part of preparing samples
in the form of a sequence for sequence processing algorithms
and train test spitting execution.
Parameters
----------
name : (str | None)
Optional name for the task.
Next parameters is used in ts_train_test_split function of utils subpackage.
https://tsad.readthedocs.io/en/latest/tsad.utils.html#tsad.utils.trainTestSplitting.ts_train_test_split
len_seq : int, default=10
Length of the sequence, which is used to predict the next point/points.
points_ahead : int, default=0
How many points ahead we predict, reflected in y
gap : int, default=0
The gap between last point of sequence, which we used as input
for prediction and first point of potential model output sequence
(prediction).If the last point of input sequence is t, then the
first point of the output sequence is t + gap +1. The parameter
is designed to be able to predict sequence after a additional time
interval.
step : int, default=1.
Sample generation step. If the first point was t for
the 1st sample (sequence) of the train, then for the 2nd sample
(sequence) of the train it will be t + step if intersection=True,
otherwise the same but without intersections of the series values.
intersection : bool, default=True
The presence of one point in time in different samples (sequences)
for the train set and and separately for the test test.
If True, the train and the test never have common time points.
test_size : float or int or timestamp for df, or list of timestamps, default=0.25.
The size of the test set.
- If float, should be between 0.0 and 1.0 and represent the proportion
of the dataset to include in the test split.
- If int, represents the absolute number of test samples. If None, the value is set to the
complement of the train size.
- If 0, then it will return the X,y values in X_train, y_train.
- If timestamp, for X_test we will use set from df[t:]
- If list of timestamps [t1,t2], for X_test we will use set from df[t1:t2]
- If ``train_size`` is None, it will be set to 0.25. *
train_size : float or int, default=None.
The size of the train set.
- If float, should be between 0.0 and 1.0 and represent the
proportion of the dataset to include in the train split.
- If int, represents the absolute number of train samples.
- If 0, then it will return the X,y values in X_test, y_test.
- If timestamp for df, for X_train we will use set for train from df[:t]
- If list of timestamps [t1,t2], for X_train we will use set for train from df[t1:t2]
- If None,the value is automatically set to the complement of the test size.
what_to_shuffle: {'nothing', 'all','train'}, str. Default = 'train'.
- If 'train' we random shuffle only X_train, and y_train.
Test samples are unused for the shuffle. Any sample from X_test is later
than any sample from X_train. This is also true for respectively
- If 'all' in analogy with sklearn.model_selection.train_test_split
- If 'nothing' shuffle is not performed.
random_state : int, RandomState instance or None, default=None
Controls the shuffling applied to the data before applying the split.
Pass an int for reproducible output across multiple function calls.
See :term:`Glossary <random_state>`.*
"""
def __init__(self,
name: str | None = None,
len_seq=10, points_ahead=1, gap=0, step=1, intersection=True,
test_size=None,train_size=None, random_state=None, what_to_shuffle='train'
):
super().__init__(name)
self.kwargs= {}
self.kwargs['len_seq'] = len_seq
self.kwargs['points_ahead'] = points_ahead
self.kwargs['gap'] = gap
self.kwargs['step'] = step
self.kwargs['intersection'] = intersection
self.kwargs['test_size'] = test_size
self.kwargs['train_size'] = train_size
self.kwargs['random_state'] = random_state
self.kwargs['what_to_shuffle'] = what_to_shuffle
[docs] def fit_predict(self, df: pd.DataFrame | list[pd.DataFrame]) -> tuple[pd.DataFrame | list[pd.DataFrame], PrepareSeqSamplesResult]:
"""
Fit the PrepareSeqSamplesTask.
Parameters
----------
dfs : pd.DataFrame | list[pd.DataFrame]
The input dataset / input datasets
Returns
-------
dfs : pd.DataFrame | list[pd.DataFrame] | list[list[pd.DataFrame]]
The output datasets of sequences
result : PrepareSeqSamplesResult
The object that stores the results of the PrepareSeqSamplesTask task
"""
result = PrepareSeqSamplesResult()
from ..utils.trainTestSplitting import ts_train_test_split_dfs
df = ts_train_test_split_dfs(df,**self.kwargs)
return df, result
[docs] def predict(self, df: pd.DataFrame, result: PrepareSeqSamplesResult) -> tuple[pd.DataFrame, PrepareSeqSamplesResult]:
"""
Predict by PrepareSeqSamplesTask.
Parameters
----------
df : pd.DataFrame | list[pd.DataFrame]
The input dataset / input datasets
result : PrepareSeqSamplesResult
The object that stores the results of the PrepareSeqSamplesTask task
Returns
-------
df : pd.DataFrame | list[pd.DataFrame] | list[list[pd.DataFrame]]
The output datasets of sequences
result : PrepareSeqSamplesResult
The object that stores the results of the PrepareSeqSamplesTask task
"""
from ..utils.trainTestSplitting import ts_train_test_split_dfs
self.kwargs['test_size'] = 0
self.kwargs['what_to_shuffle'] = 'nothing'
df = ts_train_test_split_dfs(df,**self.kwargs)
return df, result