Source code for tsad.base.pipeline

import inspect
import logging
import pandas as pd

from enum import Enum

from .task import Task, TaskResult
from .exceptions import ArgumentNotFoundException, UnsupportedTaskResultException


class PipelineMode(Enum):
    """Execution mode of a `Pipeline` run.

    The pipeline stamps the active mode onto every task (`task.mode`) and uses
    it to choose which task method to call.
    """

    # Pipeline calls each task's `fit_predict` method.
    FIT_PREDICT = "FIT_PREDICT"
    # Pipeline calls each task's `predict` method.
    PREDICT = "PREDICT"
class Pipeline:
    """
    ## Pipeline

    The `Pipeline` class represents a data processing pipeline that consists of
    multiple tasks. `fit_predict` fits the tasks and predicts on a training
    dataset; `predict` makes predictions on a test dataset.

    Each task receives the DataFrame produced by the previous task as the first
    argument of its `fit_predict`/`predict` method. The remaining arguments are
    injected by name: parameters annotated with a `TaskResult` subclass receive
    the matching stored result, other parameters are taken from the keyword
    arguments passed to `fit_predict`/`predict`.

    ### Parameters

    - `tasks` (list[Task]): List of tasks to be executed in the pipeline, in order.
    - `results` (list[TaskResult], optional): Initial task results that should be
      stored and accessible for annotation in later tasks. The list is copied, so
      results collected during runs are not appended to the caller's list.
      Default is `None`.
    - `show` (bool, optional): Specifies whether to show the annotated task
      results during pipeline execution. Default is `False`.

    ### Attributes

    - `mode` (PipelineMode): The current mode of the pipeline. Can be
      `FIT_PREDICT` or `PREDICT`.
    - `run_arguments` (dict[str, object]): The keyword arguments passed to the
      `fit_predict` or `predict` method.
    - `results` (list[TaskResult]): Task results stored so far.
    """

    mode: PipelineMode
    run_arguments: dict[str, object]

    def __init__(
        self,
        tasks: list[Task],
        results: list[TaskResult] | None = None,
        show: bool = False,
    ) -> None:
        """Initialize a new pipeline.

        Parameters:
        - `tasks` (list[Task]): Tasks to execute, in order.
        - `results` (list[TaskResult] | None): Initial results available for
          injection. Copied; `None` or empty means no initial results.
        - `show` (bool): Whether to call `show()` on results as they are produced.
        """
        self.tasks = tasks
        # Copy so results appended by `_run` never leak into the caller's list,
        # and so empty and non-empty inputs are treated the same way.
        self.results = list(results) if results else []
        self.show = show

    @staticmethod
    def _is_task_result_type(annotation) -> bool:
        """Return True if `annotation` is a class derived from `TaskResult`.

        Annotations that are not plain classes (`None`, strings, `int | None`,
        generic aliases such as `list[int]`) yield False instead of making
        `issubclass` raise `TypeError`.
        """
        try:
            return isinstance(annotation, type) and issubclass(annotation, TaskResult)
        except TypeError:
            # Some Python versions may let generic aliases pass the isinstance
            # check while issubclass still rejects them.
            return False

    def _get_result_by_type(self, result_type) -> TaskResult:
        """Return the single stored result that is an instance of `result_type`.

        Parameters:
        - `result_type` (type[TaskResult]): The type of the task result to retrieve.

        Returns:
        - `TaskResult`: The stored result of the requested type.

        Raises:
        - `Exception`: If no stored result has the requested type.
        - `Exception`: If more than one stored result has the requested type.
        """
        matches = [stored for stored in self.results if isinstance(stored, result_type)]
        if not matches:
            raise Exception(f"Can't find required task result of {result_type.__name__} in Pipeline.")
        if len(matches) > 1:
            raise Exception(f'Multiple task results of {result_type.__name__} found in Pipeline.')
        return matches[0]

    def _annotate_task_results(self, object_to_annotate) -> None:
        """Set stored results on attributes annotated with a `TaskResult` subclass.

        Only annotations declared directly on the object's own class are
        inspected; annotations inherited from base classes are not.

        Parameters:
        - `object_to_annotate`: The object (a task or a task result) to fill in.

        Raises:
        - `Exception`: Propagated from `_get_result_by_type` when a required
          result is missing or ambiguous.
        """
        annotations = vars(type(object_to_annotate)).get('__annotations__', {})
        for annotation_name, annotation_type in annotations.items():
            if self._is_task_result_type(annotation_type):
                setattr(object_to_annotate, annotation_name, self._get_result_by_type(annotation_type))

    def _create_method_parameters(self, method, df: pd.DataFrame) -> dict:
        """Build the keyword arguments used to call a task method.

        The first parameter of `method` receives `df`. Every other parameter is
        filled from, in order of priority: a stored result if it is annotated
        with a `TaskResult` subclass, then `self.run_arguments` by name. `*args`,
        `**kwargs` and parameters with a default value may stay unfilled.

        Parameters:
        - `method`: The bound task method (`fit_predict` or `predict`).
        - `df` (pd.DataFrame): The input DataFrame for the task.

        Returns:
        - `dict`: Keyword arguments for `method`.

        Raises:
        - `ArgumentNotFoundException`: If a required parameter cannot be filled.
        """
        signature = inspect.signature(method)
        (first_name, _), *parameters = signature.parameters.items()
        # TODO: Check first argument is DataFrame
        arguments = {first_name: df}
        variadic_kinds = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
        for name, parameter in parameters:
            if self._is_task_result_type(parameter.annotation):
                arguments[name] = self._get_result_by_type(parameter.annotation)
            elif name in self.run_arguments:
                arguments[name] = self.run_arguments[name]
            # Identity check: `==` would invoke the default's own __eq__ (e.g. an
            # element-wise DataFrame/ndarray comparison with no truth value).
            elif parameter.kind not in variadic_kinds and parameter.default is inspect.Parameter.empty:
                raise ArgumentNotFoundException(
                    f'Unable to inject named argument {name}. '
                    'Add it to fit_predict/predict Pipeline method or set default value.'
                )
        return arguments

    def _run(self, df: pd.DataFrame, **params) -> pd.DataFrame:
        """Run every task in the current `self.mode` and return the final DataFrame.

        A task may return `None` (the DataFrame is passed on unchanged), a
        DataFrame or list, or a `(dataframe, result)` tuple. A copy of the
        returned data becomes the next task's input. A truthy result is stored
        unless an equal (`==`) result is already stored.

        Parameters:
        - `df` (pd.DataFrame): The input DataFrame for the pipeline.
        - `params` (keyword arguments): Values injectable into task methods by name.

        Returns:
        - `pd.DataFrame`: The data produced by the last task that returned data.

        Raises:
        - `Exception`: If the pipeline mode is not supported.
        - `UnsupportedTaskResultException`: If a task returns an unsupported type.
        """
        self.run_arguments = params
        task_df = df
        for task in self.tasks:
            task.mode = self.mode
            self._annotate_task_results(task)

            if self.mode == PipelineMode.FIT_PREDICT:
                task_method = task.fit_predict
            elif self.mode == PipelineMode.PREDICT:
                task_method = task.predict
            else:
                raise Exception("Not supported pipeline mode.")
            task_result = task_method(**self._create_method_parameters(task_method, task_df))

            if task_result is None:
                continue
            if isinstance(task_result, pd.DataFrame | list):
                result_df, result = task_result, None
            elif isinstance(task_result, tuple):
                result_df, result = task_result
            else:
                raise UnsupportedTaskResultException(f'{type(task_result)} in {type(task)}')

            task_df = result_df.copy()

            if not result:
                continue
            if not any(result == stored for stored in self.results):
                self.results.append(result)
            if self.show:
                self._annotate_task_results(result)
                result.show()
        return task_df

    def fit_predict(self, df: pd.DataFrame, **params) -> pd.DataFrame:
        """Fit all tasks and predict on the training DataFrame.

        Parameters:
        - `df` (pd.DataFrame): The training DataFrame.
        - `params` (keyword arguments): Values injectable into task methods by name.

        Returns:
        - `pd.DataFrame`: The resulting DataFrame after applying all tasks.
        """
        self.mode = PipelineMode.FIT_PREDICT
        return self._run(df, **params)

    def predict(self, df: pd.DataFrame, **params) -> pd.DataFrame:
        """Make predictions on the test DataFrame using each task's `predict`.

        Parameters:
        - `df` (pd.DataFrame): The test DataFrame.
        - `params` (keyword arguments): Values injectable into task methods by name.

        Returns:
        - `pd.DataFrame`: The resulting DataFrame of predictions.
        """
        self.mode = PipelineMode.PREDICT
        return self._run(df, **params)