diff --git a/.gitignore b/.gitignore index 7d2ba47..ec2de90 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,4 @@ __pycache__/ *.swp .vscode/settings.json -Test-bears.ipynb \ No newline at end of file +*.ipynb \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 916616e..6d47e95 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "tqdm", "boto3", "cloudpickle>=3.0.0", + "scikit-learn", ] [project.optional-dependencies] diff --git a/src/bears/__init__.py b/src/bears/__init__.py index 355649f..0f62b3e 100644 --- a/src/bears/__init__.py +++ b/src/bears/__init__.py @@ -10,6 +10,7 @@ from bears.core.frame.ScalableSeries import ScalableSeries, ScalableSeriesRawType,ScalableSeriesOrRaw from bears.reader import Reader from bears.writer import Writer +from bears.processor import DataProcessor, DataPipeline to_sdf = ScalableDataFrame.of to_ss = ScalableSeries.of diff --git a/src/bears/processor/_DataPipeline.py b/src/bears/processor/_DataPipeline.py new file mode 100644 index 0000000..8a8032e --- /dev/null +++ b/src/bears/processor/_DataPipeline.py @@ -0,0 +1,1134 @@ +import copy +import io +import json +import time +from abc import ABC, abstractmethod +from collections import OrderedDict +from math import inf +from typing import Any, ClassVar, Dict, List, Optional, Set, Tuple, Type, Union + +import cloudpickle +import numpy as np +from autoenum import AutoEnum, auto +from pydantic import confloat, conint, constr, model_validator + +from bears import FileMetadata, ScalableDataFrame, ScalableDataFrameRawType +from bears.constants import ( + DataLayout, + FileContents, + MissingColumnBehavior, + MLType, + MLTypeSchema, + ProcessingMode, +) +from bears.reader import ConfigReader, Reader +from bears.util import ( + FractionalBool, + Log, + Parameters, + Registry, + String, + UserEnteredParameters, + as_list, + filter_string_list, + get_subset, + is_subset, + keep_keys, + keep_values, + measure_time_ms, + safe_validate_arguments, + type_str, +) +from bears.writer import DataFrameWriter, Writer + +from ._DataProcessor import DataProcessor +from ._Nto1ColumnProcessor import Nto1ColumnProcessor +from ._SingleColumnProcessor import SingleColumnProcessor + +DataPipeline = "DataPipeline" +DataPipelineStepProcessor = "DataPipelineStepProcessor" + + +class PersistLevel(AutoEnum): + DONT_PERSIST = auto() + BEFORE_PIPELINE = auto() + AFTER_PIPELINE = auto() + BEFORE_AFTER_PIPELINE = auto() + EVERY_PIPELINE_STEP = auto() + EVERY_PROCESSOR = auto() + + +class ProcessorPerf(Parameters): + start_time: confloat(ge=0.0) + processing_mode: ProcessingMode + input_columns: List[str] + output_columns: List[str] + data_processor_class_name: str + data_processor_params: Dict + persist_time_ms: Optional[confloat(ge=0.0)] + end_time: confloat(ge=0.0) + time_ms: Optional[confloat(ge=0.0)] + + @model_validator(mode="before") + @classmethod + def set_time_ms(cls, params): + params["time_ms"] = 1000 * (params["end_time"] - params["start_time"]) + return params + + +class PipelineStepPerf(Parameters): + start_time: confloat(ge=0.0) + processing_mode: ProcessingMode + num_rows_processed: Optional[conint(ge=1)] + length_calculation_ms: Optional[confloat(ge=0.0)] + processor_perfs: List[ProcessorPerf] + persist_time_ms: Optional[confloat(ge=0.0)] + end_time: confloat(ge=0.0) + time_ms: Optional[confloat(ge=0.0)] + + @model_validator(mode="before") + @classmethod + def set_time_ms(cls, params): + params["time_ms"] = 1000 * (params["end_time"] - params["start_time"]) + return 
params + + +class PipelineWriterPerf(Parameters): + start_time: confloat(ge=0.0) + input_columns: List[str] + writer_class_name: str + writer_params: Dict + end_time: confloat(ge=0.0) + time_ms: Optional[confloat(ge=0.0)] + + @model_validator(mode="before") + @classmethod + def set_time_ms(cls, params): + params["time_ms"] = 1000 * (params["end_time"] - params["start_time"]) + return params + + +class ProcessingPipelinePerf(Parameters): + processing_mode: ProcessingMode + persist: PersistLevel + is_input_ScalableDataFrame: bool + should_log_perf: bool + + start_time: confloat(ge=0.0) + layout_detection_time_ms: confloat(ge=0.0) + input_data_layout: DataLayout + + persist_read_time_ms: Optional[confloat(ge=0.0)] + + length_calculation_ms: Optional[confloat(ge=0.0)] + process_as: DataLayout + layout_conversion_ms: confloat(ge=0.0) + + pipeline_steps_compute_time_ms: Optional[confloat(ge=0.0)] + pipeline_step_perfs: Optional[List[PipelineStepPerf]] + persist_compute_time_ms: Optional[confloat(ge=0.0)] + + pipeline_write_time_ms: Optional[confloat(ge=0.0)] + pipeline_writer_perfs: Optional[List[PipelineWriterPerf]] + + num_rows_processed: conint(ge=1) + + end_time: confloat(ge=0.0) + time_ms: Optional[confloat(ge=0.0)] + + @model_validator(mode="before") + @classmethod + def set_time_ms(cls, params): + params["time_ms"] = 1000 * (params["end_time"] - params["start_time"]) + return params + + +class DataPipelineConfig(UserEnteredParameters): + """Structure in YAML file.""" + + class StepConfig(UserEnteredParameters): + input: Union[List[Union[MLType, str]], MLType, str] + output: constr(min_length=1, strip_whitespace=True) = "{col_name}" + params: Optional[Dict[str, Any]] = None + transformer: constr(min_length=1, strip_whitespace=True) ## data processor name + + class WriterConfig(UserEnteredParameters): + input: Union[List[Union[MLType, str]], MLType, str] + writer: constr(min_length=1, strip_whitespace=True) + params: Dict[str, Any] = {} + schema_override: MLTypeSchema = {} + + pipeline: List[StepConfig] = [] + writers_config: List[WriterConfig] = [] + + +class DataPipelineStepProcessor(Parameters, Registry, ABC): + data_processor_class: ClassVar[Type[DataProcessor]] + data_processor: DataProcessor + output_col_name: str + output_mltype: MLType + + @classmethod + def _registry_keys(cls) -> Optional[Union[List[Any], Any]]: + return [cls.data_processor_class, cls.data_processor_class.__name__] + + @classmethod + @abstractmethod + def create_pipeline_step_processors( + cls, + DataProcessorClass: Type[DataProcessor], + filtered_input_schema: MLTypeSchema, + name: str, + params: Dict, + output_pattern: str, + ) -> Dict[Union[str, Tuple[str]], DataPipelineStepProcessor]: + """ + Static factory to create a mapping from input column(s) to data processor instances and their output. + :param filtered_input_schema: input schema with only the relevant columns which we must transform. + Each key is a column name from the input data, and each value is its corresponding MLType. + :param name: name of the data processor(s). + :param params: dict of params to initialize the data processor(s). + :param output_pattern: used to name the output columns. + :return: Depending on the type of processor (1:1, N:1, etc), the returned map will have each key as a single + column or a tuple of columns. The value is the data processor instance which will transform a single column or + set of columns, respectively. + E.g. 
for 1:1 we might return: + { + "ASIN_STATIC_ITEM_NAME": + (<__TFIDFVectorization_at_87da792f>, 'ASIN_STATIC_ITEM_NAME_TFIDF_15000', MLType.VECTOR), + "ASIN_STATIC_BULLET_POINT": + (<__TFIDFVectorization_at_adf90eb8>, 'ASIN_STATIC_BULLET_POINT_TFIDF_15000', MLType.VECTOR) + } + E.g. for N:1 we might return: + { + ("ASIN_STATIC_ITEM_NAME", "ASIN_STATIC_BULLET_POINT"): + (<__TextConcatenation_at_92ba33e>, 'CONCATENATED_TEXT_COLUMNS', MLType.TEXT) + } + """ + pass + + @classmethod + @abstractmethod + def get_pipeline_step_output_schema( + cls, + input_schema: MLTypeSchema, + pipeline_step_processors: Dict[Union[str, Tuple[str]], DataPipelineStepProcessor], + ) -> MLTypeSchema: + """ + Obtains the output schema from the input data processors dict. + :param input_schema: schema with all columns in the current DataFrame. + :param pipeline_step_processors: map from column(s) which must be transformed, to data processor + and its outputs. This should be the output of a call to `create_pipeline_step_processors`. + :return: the updated output schema. Columns which are not in `pipeline_step_processors` are copied as-is. + For other columns this function will use the output column names and MLTypes in `pipeline_step_processors` + to add the corresponding columns to the input schema. This becomes the output schema which is returned. + """ + pass + + +class DataPipelineStepSingleColumnProcessor(DataPipelineStepProcessor): + data_processor_class: ClassVar[Type[DataProcessor]] = SingleColumnProcessor + + @classmethod + def create_pipeline_step_processors( + cls, + DataProcessorClass: Type[SingleColumnProcessor], + filtered_input_schema: MLTypeSchema, + name: str, + params: Dict, + output_pattern: str, + ) -> Dict[Union[str, Tuple[str]], DataPipelineStepProcessor]: + pipeline_step_processors: Dict[Union[str, Tuple[str]], DataPipelineStepProcessor] = {} + for input_col, input_mltype in filtered_input_schema.items(): + processor_input_schema: MLTypeSchema = {input_col: input_mltype} + data_processor: SingleColumnProcessor = DataProcessorClass( + name=name, + data_schema=processor_input_schema, + params=params, + ) + supported_input_mltypes: Tuple[MLType] = data_processor.input_mltypes + assert input_mltype in supported_input_mltypes, ( + f'"{str(input_mltype)}" not included in supported MLTypes: "{str(supported_input_mltypes)}"' + ) + ## For 1:1 data processors, the supported input MLType should be a list of MLTypes + if not all([isinstance(mltype, MLType) for mltype in supported_input_mltypes]): + raise AttributeError( + f"Supported input types for class {str(cls)} (1:1 data processor) " + + f"should be a list of MLTypes, not: {supported_input_mltypes}" + ) + ## Converts '{col_name}_XYZ' to 'MyCol_XYZ' but leaves 'XYZ' unchanged. 
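+            ## e.g. output_pattern='{col_name}_TFIDF_15000' with input_col='ASIN_STATIC_ITEM_NAME' yields
+            ## 'ASIN_STATIC_ITEM_NAME_TFIDF_15000' (illustrative values taken from the docstring example above).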
+ output_col_name = output_pattern.format(col_name=input_col) + output_mltype = data_processor.output_mltype + pipeline_step_processors[input_col] = cls( + data_processor=data_processor, + output_col_name=output_col_name, + output_mltype=output_mltype, + ) + return pipeline_step_processors + + @classmethod + def get_pipeline_step_output_schema( + cls, + input_schema: MLTypeSchema, + pipeline_step_processors: Dict[Union[str, Tuple[str]], DataPipelineStepProcessor], + ) -> MLTypeSchema: + output_schema = copy.deepcopy(input_schema) + for input_cols, step_processor in pipeline_step_processors.items(): + if step_processor.output_col_name is not None and step_processor.output_mltype is not None: + output_schema[step_processor.output_col_name] = step_processor.output_mltype + return output_schema + + +class DataPipelineStepNto1ColumnProcessor(DataPipelineStepProcessor): + data_processor_class: ClassVar[Type[DataProcessor]] = Nto1ColumnProcessor + + @classmethod + def create_pipeline_step_processors( + cls, + DataProcessorClass: Type[Nto1ColumnProcessor], + filtered_input_schema: MLTypeSchema, + name: str, + params: Dict, + output_pattern: str, + ) -> Dict[Union[str, Tuple[str]], DataPipelineStepProcessor]: + pipeline_step_processors: Dict[Union[str, Tuple[str]], DataPipelineStepProcessor] = {} + ## Sorted tuple of columns we want to pass to this data processor. + input_cols: Tuple[str] = tuple(filtered_input_schema.keys()) + if len(input_cols) > 0: + processor: Nto1ColumnProcessor = DataProcessorClass( + name=name, + data_schema=copy.deepcopy(filtered_input_schema), + params=params, + ) + supported_input_mltypes: Tuple[MLType, ...] = processor.input_mltypes + ## For N:1 data processors, the supported input MLType should be a list of MLTypes + if not all([isinstance(mltype, MLType) for mltype in supported_input_mltypes]): + raise AttributeError( + f"Supported input types for {str(cls)} (N:1 data processor) " + + f"should be a list of MLTypes, not: {supported_input_mltypes}" + ) + if not all([mltype in supported_input_mltypes for mltype in filtered_input_schema.values()]): + raise AttributeError( + f"MLTypes of selected columns passed to {str(cls)} (N:1 data processor) " + + "should be supported by this data processor. Supported types are " + + f"{supported_input_mltypes}, selected columns have MLTypes: " + + f"{list(filtered_input_schema.values())}" + ) + output_col_name = output_pattern ## Assume it does not have {col_name} in it. + output_mltype: MLType = processor.output_mltype + pipeline_step_processors[input_cols] = cls( + data_processor=processor, + output_col_name=output_col_name, + output_mltype=output_mltype, + ) + return pipeline_step_processors + + @classmethod + def get_pipeline_step_output_schema( + cls, + input_schema: MLTypeSchema, + pipeline_step_processors: Dict[Union[str, Tuple[str]], DataPipelineStepProcessor], + ) -> MLTypeSchema: + output_schema = copy.deepcopy(input_schema) + ## dict returned by create_pipeline_step_processors should have exactly one item. 
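+        ## e.g. a single entry mapping ("ASIN_STATIC_ITEM_NAME", "ASIN_STATIC_BULLET_POINT") to the
+        ## TextConcatenation processor that outputs 'CONCATENATED_TEXT_COLUMNS', as in the illustrative N:1
+        ## example in the create_pipeline_step_processors docstring above.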
+ assert len(pipeline_step_processors) <= 1 + for input_cols, step_processor in pipeline_step_processors.items(): + if step_processor.output_col_name is not None and step_processor.output_mltype is not None: + output_schema[step_processor.output_col_name] = step_processor.output_mltype + return output_schema + + +class DataPipelineStep(Parameters): + input_schema: MLTypeSchema + pipeline_step_processors: Dict[Union[str, Tuple], DataPipelineStepProcessor] + output_schema: MLTypeSchema + + def __str__(self): + out_str = f"{self.__class__.__name__}:" + out_str += "\n >> Input schema: " + str(MLType.convert_values_to_str(self.input_schema)) + out_str += "\n >> Data Processors map:" + for cols_to_transform, step_processor in self.pipeline_step_processors.items(): + out_str += f"\n - Columns to transform: {str(cols_to_transform)}" + out_str += f"\n Data processor: {step_processor.data_processor.class_name}" + if len(step_processor.data_processor.params.dict()) > 0: + out_str += f" ({step_processor.data_processor.params})" + out_str += f"\n Output column: {str(step_processor.output_col_name)} ({str(step_processor.output_mltype)})" + out_str += "\n >> Output schema: " + str(MLType.convert_values_to_str(self.output_schema)) + return out_str + + @classmethod + @safe_validate_arguments + def from_config( + cls, + step_cfg: DataPipelineConfig.StepConfig, + step_input_schema: MLTypeSchema, + ) -> Any: + """ + Static factory to resolve and instantiate a pipeline step object. + Resolution includes: + - Add filtered input schema to the pipeline step + - Add a collection of data processors to the pipeline step + - Add an output schema to the pipeline step + :param step_cfg: pipeline step configuration. + :param step_input_schema: the schema of the DataFrame at this step. + :return: Serializable DataPipelineStep instance. + """ + ## Extract variables: + DataProcessorClass: Type[DataProcessor] = DataProcessor.get_subclass(step_cfg.transformer) + if issubclass(DataProcessorClass, SingleColumnProcessor): + DataProcessorSuperClass: Type[DataProcessor] = SingleColumnProcessor + elif issubclass(DataProcessorClass, Nto1ColumnProcessor): + DataProcessorSuperClass: Type[DataProcessor] = Nto1ColumnProcessor + else: + raise NotImplementedError( + f"Unsupported subtype of {DataProcessor}: {DataProcessorClass}, " + f"with following inheritance: {DataProcessorClass.__mro__}" + ) + + DataPipelineStepProcessorClass: Type[DataPipelineStepProcessor] = ( + DataPipelineStepProcessor.get_subclass(DataProcessorSuperClass) + ) + ## Create data processors and output schema: + ## Note: selection of columns from the pipeline config is case insensitive. User might enter 'AbCD' but the + ## appropriate columns 'abcd' will be picked up from the DataFrame schema. 
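+        ## e.g. step_cfg.input may be an MLType name (like 'TEXT') or a regex pattern (like '.*_TFIDF'); both are
+        ## matched against the columns of step_input_schema via PipelineUtil.filter_schema_by_input_patterns.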
+ filtered_step_input_schema: MLTypeSchema = PipelineUtil.filter_schema_by_input_patterns( + step_input_schema, + step_cfg.input, + ) + try: + pipeline_step_processors: Dict[Union[str, Tuple[str]], DataPipelineStepProcessor] = ( + DataPipelineStepProcessorClass.create_pipeline_step_processors( + DataProcessorClass=DataProcessorClass, + filtered_input_schema=filtered_step_input_schema, + name=step_cfg.transformer, + params=step_cfg.params, + output_pattern=step_cfg.output, + ) + ) + except Exception as e: + print(String.format_exception_msg(e)) + raise AttributeError( + f'Error while creating data processor of type "{str(DataProcessorClass)}" ' + f"with params: {str(step_cfg.params)} " + f"and filtered input schema {str(filtered_step_input_schema)}" + ) + output_schema: MLTypeSchema = DataPipelineStepProcessorClass.get_pipeline_step_output_schema( + input_schema=step_input_schema, + pipeline_step_processors=pipeline_step_processors, + ) + return DataPipelineStep( + input_schema=filtered_step_input_schema, + pipeline_step_processors=pipeline_step_processors, + output_schema=output_schema, + ) + + def execute_pipeline_step( + self, + sdf: ScalableDataFrame, + processing_mode: ProcessingMode, + persist: PersistLevel, + should_measure_perf: bool, + should_log_perf: bool, + ) -> Tuple[ScalableDataFrame, Optional[PipelineStepPerf]]: + """ + Runs the particular pipeline step on the input ScalableDataFrame. + :param sdf: input ScalableDataFrame to process. + :param processing_mode: what this step should do, e.g. fit-transform, transform, etc. + :param persist: how often to persist the ScalableDataFrame every so often. + :param should_measure_perf: whether to measure performance information. + :param should_log_perf: whether to log performance information. + :return: transformed ScalableDataFrame (or raw data) after executing this step + """ + step_start_time = time.perf_counter() + if should_log_perf: + Log.debug(f"\n>> Running {processing_mode.lower().replace('_', '-')} on pipeline step...") + _processor_perfs: List[ProcessorPerf] = [] + for input_cols, step_processors in self.pipeline_step_processors.items(): + data_processor: DataProcessor = step_processors.data_processor + output_col_name: str = step_processors.output_col_name + input_cols: List[str] = as_list(input_cols) + sdf_cols: List[str] = list(sdf.columns) + if ( + is_subset(input_cols, sdf_cols) + or data_processor.missing_column_behavior is MissingColumnBehavior.EXECUTE + ): + ## Apply data processor on whatever subset exists, retaining column order: + cols_to_process_set: Set[str] = get_subset(input_cols, sdf_cols) + cols_to_process_in_order: List[str] = [ + col for col in input_cols if col in cols_to_process_set + ] + if isinstance(data_processor, SingleColumnProcessor): + if len(cols_to_process_in_order) != 1: + raise ValueError(f"Expected only one column, found: {cols_to_process_in_order}") + cols_to_process_in_order: str = cols_to_process_in_order[0] + processor_start_time = time.perf_counter() + if should_log_perf: + Log.debug( + f"\n>> Running {processing_mode.lower().replace('_', '-')} " + f"on {type_str(sdf)}, using:\n{str(data_processor)}" + ) + sdf: ScalableDataFrame = self._execute_data_processor( + sdf=sdf, + cols_to_process_in_order=cols_to_process_in_order, + data_processor=data_processor, + processing_mode=processing_mode, + output_col_name=output_col_name, + ) + persist_time_ms: Optional[float] = None + if persist is PersistLevel.EVERY_PROCESSOR: + sdf, persist_time_ms = measure_time_ms(lambda: sdf.persist(wait=True)) 
+ + processor_end_time: float = time.perf_counter() + if should_log_perf: + Log.debug( + f"\r...processor ran in " + f"{String.readable_seconds(processor_end_time - processor_start_time)}." + ) + if should_measure_perf: + _processor_perfs.append( + ProcessorPerf( + start_time=processor_start_time, + processing_mode=processing_mode, + input_columns=as_list(cols_to_process_in_order), + output_columns=as_list(output_col_name), + data_processor_class_name=data_processor.class_name, + data_processor_params=data_processor.params.dict(), + persist_time_ms=persist_time_ms, + end_time=processor_end_time, + ) + ) + elif data_processor.missing_column_behavior is MissingColumnBehavior.SKIP: + continue + elif data_processor.missing_column_behavior is MissingColumnBehavior.ERROR: + raise ValueError( + f"Cannot transform {type_str(sdf)} using {data_processor.class_name} due to insufficient columns: " + f"columns required for transformation: {input_cols}; " + f"columns actually present: {sdf_cols}" + ) + else: + raise NotImplementedError( + f"Unsupported value for {MissingColumnBehavior}: {data_processor.missing_column_behavior}" + ) + persist_time_ms: Optional[float] = None + if persist is PersistLevel.EVERY_PIPELINE_STEP: + sdf, persist_time_ms = measure_time_ms(lambda: sdf.persist(wait=True)) + + step_end_time: float = time.perf_counter() + if should_log_perf: + Log.debug( + f"\r...pipeline-step ran in {String.readable_seconds(step_end_time - step_start_time)}." + ) + step_end_time: float = time.perf_counter() + if should_measure_perf: + if sdf.layout is not DataLayout.DASK: + sdf_num_rows, length_calculation_ms = measure_time_ms(lambda: len(sdf)) + else: + sdf_num_rows, length_calculation_ms = None, None + return sdf, PipelineStepPerf( + start_time=step_start_time, + processing_mode=processing_mode, + num_rows_processed=sdf_num_rows, + length_calculation_ms=length_calculation_ms, + processor_perfs=_processor_perfs, + persist_time_ms=persist_time_ms, + end_time=step_end_time, + ) + return sdf, None + + def _execute_data_processor( + self, + sdf: ScalableDataFrame, + cols_to_process_in_order: List[str], + data_processor: DataProcessor, + processing_mode: ProcessingMode, + output_col_name: str, + ) -> ScalableDataFrame: + if processing_mode is ProcessingMode.FIT_TRANSFORM: + sdf[output_col_name] = data_processor.fit_transform(sdf[cols_to_process_in_order]) + elif processing_mode is ProcessingMode.TRANSFORM: + sdf[output_col_name] = data_processor.transform(sdf[cols_to_process_in_order]) + return sdf + + +class DataPipeline(Parameters): + input_schema: MLTypeSchema + pipeline: List[DataPipelineStep] + output_schema: MLTypeSchema + writers: Dict[FileContents, DataFrameWriter] = {} + layout_scaling: Optional[Dict[ProcessingMode, Tuple[Tuple[confloat(ge=1), DataLayout], ...]]] = { + ProcessingMode.FIT_TRANSFORM: ( + ## Determines which layout to use with different number of rows. + (1_000, DataLayout.DICT), ## <= 1k rows, use DataLayout.DICT + (500_000, DataLayout.PANDAS), ## <= 500k rows, use DataLayout.PANDAS + (inf, DataLayout.DASK), ## >500k rows, use DataLayout.DASK + ), + ProcessingMode.TRANSFORM: ( + ## Determines which layout to use with different number of rows. 
+ (5, DataLayout.LIST_OF_DICT), ## <= 5 rows, use DataLayout.LIST_OF_DICT + (1_000, DataLayout.DICT), ## <= 1k rows, use DataLayout.DICT + (125_000, DataLayout.PANDAS), ## <= 125k rows, use DataLayout.PANDAS + (inf, DataLayout.DASK), ## >125k rows, use DataLayout.DASK + ), + } + _performance: List[ProcessingPipelinePerf] = [] ## For performance tracking + + # @classmethod + # @safe_validate_arguments + # def from_steps( + # cls, + # input_schema: MLTypeSchema, + # process: List[Union[ + # DataProcessor, + # Tuple[str, DataProcessor], + # Tuple[DataProcessor, str], + # Tuple[str, DataProcessor, str] + # ]], + # select: List[Union[MLType, str]], + # write: Optional[List[Writer]] = None + # ) -> DataPipeline: + # process: List = as_list(process) + # select: List = as_list(process) + # if write is not None: + # write: List = as_list(write) + # current_schema: MLTypeSchema = copy.deepcopy(input_schema) + # processing_steps: List[DataPipelineStep] = [] + # for processor_tuple in process: + # if isinstance(processor_tuple, DataProcessor): + # DataPipelineStep( + # input_schema=filtered_step_input_schema, + # data_processors=data_processors, + # output_schema=output_schema, + # ) + + @classmethod + @safe_validate_arguments + def from_config( + cls, + config: Union[DataPipelineConfig, FileMetadata], + input_schema: MLTypeSchema, + only_writers: bool = False, + *args, + **kwargs, + ) -> DataPipeline: + """ + Static factory to resolve each pipeline step and instantiate the pipeline object. + :param config: either DataPipelineConfig or config file (YAML/JSON) with pipeline steps and writers. + :param input_schema: schema of the input dataframe this pipeline can process. + :param only_writers: if True, then only the writers will be initialized. + :return: Serializable DataPipeline instance. + """ + if isinstance(config, FileMetadata): + reader: Reader = Reader.of(config.format) + assert isinstance(reader, ConfigReader) + Log.debug("\nReading pipeline config...") + config: DataPipelineConfig = DataPipelineConfig(**reader.read_metadata(config)) + Log.debug("...done reading pipeline config.") + if not only_writers: + return cls._resolve_pipeline( + input_schema=input_schema, + pipeline_steps=config.pipeline, + writers=config.writers_config, + *args, + **kwargs, + ) + else: + return cls._resolve_pipeline( + input_schema=input_schema, + pipeline_steps=[], + writers=config.writers_config, + *args, + **kwargs, + ) + + @classmethod + def _resolve_pipeline( + cls, + input_schema: MLTypeSchema, + pipeline_steps: List[DataPipelineConfig.StepConfig], + writers: Optional[List[DataPipelineConfig.WriterConfig]] = None, + *args, + **kwargs, + ) -> DataPipeline: + """ + Static factory to resolve each pipeline step and instantiate the pipeline object. + :param input_schema: schema of the input dataframe this pipeline can process. + :param pipeline_steps: list of pipeline steps input by the user. + :param writers: list of Dataframe or Algorithm writers input by the user. + Some of these may be invoked when the file with corresponding properties is passed. + :return: Serializable DataPipeline instance. 
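+        Example (illustrative) of a YAML config that DataPipeline.from_config parses into these arguments; the
+        transformer and writer names below are hypothetical registered subclasses:
+            pipeline:
+              - input: TEXT
+                output: "{col_name}_TFIDF_15000"
+                transformer: TFIDFVectorization
+                params: {max_features: 15000}
+            writers_config:
+              - input: [VECTOR]
+                writer: SomeDataFrameWriter
+                params: {}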
+ """ + + Log.debug("\nInitializing DataPipeline...") + Log.debug(f"\n> Input schema to pipeline: {input_schema}") + + ## Resolve pipeline steps: + resolved_pipeline: List[DataPipelineStep] = [] + cur_schema = input_schema + Log.debug(f"\n> Resolving pipeline transformation steps: {str(pipeline_steps)}") + for pipeline_step in pipeline_steps: + resolved_pipeline_step: DataPipelineStep = DataPipelineStep.from_config( + step_cfg=pipeline_step, + step_input_schema=cur_schema, + ) + resolved_pipeline.append(resolved_pipeline_step) + Log.debug(f"Added {str(resolved_pipeline_step)}") + cur_schema: MLTypeSchema = resolved_pipeline_step.output_schema + output_schema: MLTypeSchema = cur_schema + Log.debug("...resolved pipeline transformation steps.") + Log.debug(f"\n> Output schema from pipeline: \n{json.dumps(output_schema, indent=4)}") + + ## Resolve writers: + if writers is None: + writers: Dict[FileContents, DataFrameWriter] = {} + else: + Log.debug("\n> Resolving pipeline writers...") + writers: Dict[FileContents, DataFrameWriter] = cls._resolve_pipeline_writers( + writers=writers, + output_schema=output_schema, + *args, + **kwargs, + ) + Log.debug("...resolved pipeline writers.") + + ## Instantiate: + pipeline = DataPipeline( + input_schema=input_schema, + pipeline=resolved_pipeline, + output_schema=output_schema, + writers=writers, + ) + Log.debug("...done initializing pipeline.") + return pipeline + + @classmethod + @safe_validate_arguments + def _resolve_pipeline_writers( + cls, + writers: List[DataPipelineConfig.WriterConfig], + output_schema: MLTypeSchema, + *args, + **kwargs, + ) -> Dict[FileContents, DataFrameWriter]: + pipeline_writers: Dict[FileContents, DataFrameWriter] = {} + for writer_cfg in writers: + writer: DataFrameWriter = cls._create_pipeline_writer( + writer_cfg, + output_schema, + ) + for supported_file_content in writer.file_contents: + if supported_file_content in pipeline_writers: + raise KeyError( + f"Only one writer of {supported_file_content} contents can be present" + f"in the pipeline. Found two writers of type {supported_file_content}." + ) + pipeline_writers[supported_file_content] = writer + Log.debug(f'Set writer of key "{str(supported_file_content)}" as {str(writer)}') + return pipeline_writers + + @classmethod + def _create_pipeline_writer( + cls, + writer_cfg: DataPipelineConfig.WriterConfig, + output_schema: MLTypeSchema, + ) -> DataFrameWriter: + writer_cfg: DataPipelineConfig.WriterConfig = writer_cfg.copy(deep=True) + WriterClass: Type[Writer] = Writer.get_subclass(writer_cfg.writer) + if not isinstance(WriterClass, DataFrameWriter.__class__): + raise TypeError( + f"Pipeline writers must be of type {DataFrameWriter.class_name}; " + f"found: {WriterClass.class_name}." 
+ ) + + ## Overwrite keys in the output schema with those present in the writer config (if any): + writer_data_schema: MLTypeSchema = { + **output_schema, + **writer_cfg.schema_override, + } + writer_data_schema: MLTypeSchema = PipelineUtil.filter_schema_by_input_patterns( + schema=writer_data_schema, input_patterns=writer_cfg.input + ) + writer_cfg.params["data_schema"] = writer_data_schema + return WriterClass(**writer_cfg.params) + + @safe_validate_arguments + def get_writer_by_file_contents(self, file_contents: FileContents) -> Optional[Writer]: + return self.writers.get(file_contents) + + # def fit(self, df, ) + # def transform(self, df, ) + # def fit_transform(self, df, ) + + @safe_validate_arguments + def execute( + self, + data: Union[ScalableDataFrame, ScalableDataFrameRawType], + processing_mode: ProcessingMode, + process_as: Optional[DataLayout] = None, + measure_perf: FractionalBool = True, + log_perf: FractionalBool = True, + persist: PersistLevel = PersistLevel.DONT_PERSIST, + write_to: Optional[Union[List[FileMetadata], FileMetadata]] = None, + overwrite: bool = False, + rnd: Optional[confloat(ge=0.0, le=1.0)] = None, + **kwargs, + ) -> Union[ScalableDataFrame, ScalableDataFrameRawType]: + """ + Executes each pipeline step on the input DataFrame in a sequential fashion. + :param data: input ScalableDataFrame or raw type (Pandas, Dask, List of Dicts, etc). + :param processing_mode: fit, fit_transform, transform + :param process_as: data layout to run the pipeline. + :param measure_perf: how often to measure performance. + If False, it will not measure performance. If True, it will measure performance. + If 0.0 < measure_perf < 1.0, then we will measure performance a fraction of the time. + :param log_perf: how often to log performance. + If False, it will not log performance. If True, it will always log performance. + If 0.0 < log_perf < 1.0, then we will log performance a fraction of the time. + :param persist: how often to persist processed results (for lazily-evaluated dataframes). + :param write_to: output files to write to using the configured writers. + :param overwrite: whether to overwrite the path while writing. + :param rnd: Optional random value (passed to ensure end-to-end logging and performance measurement). + :return: the transformed DataFrame. + """ + pipeline_start_time = time.time() + if rnd is None: + rnd: float = np.random.random() + should_measure_perf: bool = rnd <= measure_perf + should_log_perf: bool = rnd <= log_perf + + if should_measure_perf: + Log.info(f"\nRunning pipeline in {processing_mode.lower().replace('_', '-')} mode on dataset...") + + ## Detect layout if the input is raw data: + is_input_ScalableDataFrame: bool = isinstance(data, ScalableDataFrame) + sdf, layout_detection_time_ms = measure_time_ms(lambda: ScalableDataFrame.of(data, layout=None)) + input_data_layout: DataLayout = sdf.layout + + ## For lazy-loaded DataFrames (e.g. 
Dask, Spark), read data from file: + persist_read_time_ms: Optional[float] = None + if persist in { + PersistLevel.BEFORE_PIPELINE, + PersistLevel.BEFORE_AFTER_PIPELINE, + PersistLevel.EVERY_PIPELINE_STEP, + PersistLevel.EVERY_PROCESSOR, + }: + sdf, persist_read_time_ms = measure_time_ms(lambda: sdf.persist(wait=True)) + + ## Convert to different layout used to process: + sdf_num_rows: Optional[int] = None + length_calculation_ms: Optional[float] = None + if process_as is None: + if sdf_num_rows is None: + sdf_num_rows, length_calculation_ms = measure_time_ms(lambda: len(sdf)) + for sdf_num_rows_limit, process_as in self.layout_scaling[processing_mode]: + if sdf_num_rows <= sdf_num_rows_limit: + break ## Sets data_layout + layout_conversion_ms: float = 0.0 + if process_as is not DataLayout.RECORD: + sdf, layout_conversion_ms = measure_time_ms(lambda: sdf.as_layout(layout=process_as)) + + ## Run the pipeline: + pipeline_step_perfs: Optional[List[PipelineStepPerf]] = None + pipeline_steps_compute_time_ms: Optional[float] = None + if len(self.pipeline) > 0: + pipeline_steps_compute_start_time: float = time.time() + if processing_mode is ProcessingMode.TRANSFORM and process_as in { + DataLayout.LIST_OF_DICT, + DataLayout.RECORD, + }: + sdf, pipeline_step_perfs = self._transform_as_records( + sdf=sdf, + processing_mode=processing_mode, + persist=persist, + should_measure_perf=should_measure_perf, + should_log_perf=should_log_perf, + ) + else: + sdf, pipeline_step_perfs = self._execute_as_sdf( + sdf=sdf, + processing_mode=processing_mode, + persist=persist, + should_measure_perf=should_measure_perf, + should_log_perf=should_log_perf, + process_as=process_as, + ) + + pipeline_steps_compute_end_time: float = time.time() + pipeline_steps_compute_time_ms: float = 1000 * ( + pipeline_steps_compute_end_time - pipeline_steps_compute_start_time + ) + + ## For lazy-loaded DataFrames (e.g. Dask, Spark), this actually starts the data-processing: + persist_compute_time_ms: Optional[float] = None + if persist in { + PersistLevel.AFTER_PIPELINE, + PersistLevel.BEFORE_AFTER_PIPELINE, + }: + sdf, persist_compute_time_ms = measure_time_ms(lambda: sdf.persist(wait=True)) + + ## Write data to files + pipeline_writer_perfs: Optional[List[PipelineWriterPerf]] = None + pipeline_write_time_ms: Optional[float] = None + if write_to is not None: + write_to: List[FileMetadata] = as_list(write_to) + pipeline_write_start_time: float = time.time() + pipeline_writer_perfs: List[PipelineWriterPerf] = self._write_processed( + sdf=sdf, + processing_mode=processing_mode, + should_measure_perf=should_measure_perf, + should_log_perf=should_log_perf, + write_to=write_to, + overwrite=overwrite, + **kwargs, + ) + pipeline_write_end_time: float = time.time() + pipeline_write_time_ms: float = 1000 * (pipeline_write_end_time - pipeline_write_start_time) + + ## Log and measure performance + pipeline_end_time: float = time.time() + if should_log_perf: + writers_log_str: str = ( + f" and running {len(self.writers)} writers " if write_to is not None else " " + ) + Log.info( + f"...done running pipeline in {processing_mode.lower().replace('_', '-')} mode{writers_log_str}in " + f"{String.readable_seconds(pipeline_end_time - pipeline_start_time)}." 
+ ) + if should_measure_perf: + if sdf_num_rows is None: + sdf_num_rows, length_calculation_ms = measure_time_ms(lambda: len(sdf)) + pipeline_end_time: float = time.time() + self._performance.append( + ProcessingPipelinePerf( + processing_mode=processing_mode, + persist=persist, + is_input_ScalableDataFrame=is_input_ScalableDataFrame, + should_log_perf=should_log_perf, + start_time=pipeline_start_time, + layout_detection_time_ms=layout_detection_time_ms, + input_data_layout=input_data_layout, + persist_read_time_ms=persist_read_time_ms, + length_calculation_ms=length_calculation_ms, + process_as=process_as, + layout_conversion_ms=layout_conversion_ms, + pipeline_steps_compute_time_ms=pipeline_steps_compute_time_ms, + pipeline_step_perfs=pipeline_step_perfs, + persist_compute_time_ms=persist_compute_time_ms, + pipeline_write_time_ms=pipeline_write_time_ms, + pipeline_writer_perfs=pipeline_writer_perfs, + num_rows_processed=sdf_num_rows, + end_time=pipeline_end_time, + ) + ) + if is_input_ScalableDataFrame: + return sdf + return sdf._data + + def _transform_as_records( + self, + sdf: ScalableDataFrame, + processing_mode: ProcessingMode, + persist: PersistLevel, + should_measure_perf: bool, + should_log_perf: bool, + ) -> Tuple[ScalableDataFrame, List[PipelineStepPerf]]: + record_sdfs: List[ScalableDataFrame] = list( + sdf.stream(stream_as=DataLayout.RECORD, num_rows=1, shuffle=False, raw=False) + ) + for i in range(len(record_sdfs)): + for pipeline_step_i, pipeline_step in enumerate(self.pipeline): + record_sdfs[i], _step_perf = pipeline_step.execute_pipeline_step( + sdf=record_sdfs[i], + processing_mode=processing_mode, + persist=PersistLevel.DONT_PERSIST, + should_measure_perf=should_measure_perf, + should_log_perf=should_log_perf, + ) + ## TODO: log perfs + record_sdf_concat: ScalableDataFrame = ScalableDataFrame.concat( + record_sdfs, + reset_index=True, + layout=sdf.layout, + ) + if record_sdf_concat.layout != sdf.layout: + raise ValueError( + f"Expected the output {ScalableDataFrame.__name__} to have layout " + f"{sdf.layout}; found layout {record_sdf_concat.layout}" + ) + return record_sdf_concat, [] + + def _execute_as_sdf( + self, + sdf: ScalableDataFrame, + processing_mode: ProcessingMode, + persist: PersistLevel, + should_measure_perf: bool, + should_log_perf: bool, + process_as: DataLayout, + ) -> Tuple[ScalableDataFrame, List[PipelineStepPerf]]: + pipeline_step_perfs: List[PipelineStepPerf] = [] + for pipeline_step in self.pipeline: + sdf, _step_perf = pipeline_step.execute_pipeline_step( + sdf=sdf, + processing_mode=processing_mode, + persist=persist, + should_measure_perf=should_measure_perf, + should_log_perf=should_log_perf, + ) + if sdf.layout != process_as: + raise ValueError( + f"Expected the output {ScalableDataFrame.__name__} of the following step to have layout " + f"{process_as}; found layout {sdf.layout}: {str(pipeline_step)}" + ) + if should_measure_perf: + pipeline_step_perfs.append(_step_perf) + return sdf, pipeline_step_perfs + + def _write_processed( + self, + sdf: ScalableDataFrame, + processing_mode: ProcessingMode, + should_measure_perf: bool, + should_log_perf: bool, + write_to: List[FileMetadata], + overwrite: bool = False, + **kwargs, + ) -> Optional[List[PipelineWriterPerf]]: + writers_start_time: float = time.time() + if should_log_perf: + Log.debug( + f"\nWriting dataset after {processing_mode.lower().replace('_', '-')}, " + f"using {len(self.writers)} writers..." 
+ ) + + _writer_perfs: List[PipelineWriterPerf] = [] + for file in write_to: + writer_start_time: float = time.time() + writer: Writer = self.writers.get(file.contents) + if writer is None: + raise KeyError( + f"While writing from pipeline, could not find writer for the following output metadata " + f"(with contents {file.contents}):\n{str(file)}" + ) + if should_log_perf: + Log.debug(f"\n>> Writing processed data using {str(writer)}") + if not writer.write_metadata(file, sdf, overwrite=overwrite, **kwargs): + raise IOError("Could not write pipeline output to file.") + writer_end_time: float = time.time() + if should_log_perf: + Log.debug( + f"\r...writer ran in {String.readable_seconds(writer_end_time - writer_start_time)}." + ) + if should_measure_perf: + _writer_perfs.append( + PipelineWriterPerf( + start_time=writer_start_time, + input_columns=sorted(list(writer.data_schema.keys())), + writer_class_name=writer.class_name, + writer_params=writer.params.dict(), + end_time=writer_end_time, + ) + ) + writers_end_time: float = time.time() + if should_log_perf: + if processing_mode is ProcessingMode.FIT_TRANSFORM: + Log.info( + f"...done running writers in " + f"{String.readable_seconds(writers_end_time - writers_start_time)}." + ) + return _writer_perfs + + def serialize(self, file: str): + """ + Serialize the pipeline object (and all its data processors) using the cloudpickle library, which Ray uses. + Ref: https://github.com/cloudpipe/cloudpickle + """ + ## TODO: create a writer for pickled objects. + Log.debug("\nSerializing pipeline...") + file = String.assert_not_empty_and_strip(file) + with io.open(file, "wb") as out: + cloudpickle.dump(self, out) + Log.debug("...done serializing pipeline.") + + @classmethod + def deserialize(cls, file) -> DataPipeline: + ## TODO: create a reader for pickled objects. + Log.debug("Reading pipeline file from pickle...") + with io.open(file, "rb") as inp: + pipeline: DataPipeline = cloudpickle.load(inp) + if not isinstance(pipeline, DataPipeline): + raise TypeError( + f"Deserialized pipeline is must be an instance of {DataPipeline.__class__}.", + f"Found object of type {type_str(pipeline)}", + ) + Log.debug("...done reading pipeline file from pickle.") + return pipeline + + +class PipelineUtil: + def __init__(self): + raise TypeError(f"Cannot create {str(self.__class__)} instances.") + + @classmethod + def filter_schema_by_input_patterns(cls, schema: MLTypeSchema, input_patterns: Union[str, List[str]]): + """ + :param schema: Dict where keys are column names and values are strings corresponding to MLTypes. + :param input_patterns: String or list of strings, like '.*_TFIDF', 'NUMERIC', ['TEXT', '.*_TFIDF'] etc. + :return: filtered schema, where we filter based on either the key (if string) or value (if MLType). 
+ """ + filtered_schema: Optional[Dict] = None + if not isinstance(input_patterns, list): + input_patterns = [input_patterns] + filtered_cols = set() + filtered_cols_ordered = [] + for input_pattern in input_patterns: + input_mltype = MLType.from_str(input_pattern, raise_error=False) + if isinstance(input_mltype, MLType): + filtered_mltype_cols = set(keep_values(schema, input_mltype).keys()) + filtered_mltype_cols_list = list(filtered_mltype_cols) + + # This is used for handling cases when Numeric value is present inside String + # Example: If there are two columns named TOP_1_PREDICTED_LABEL and TOP_10_PREDICTED_LABEL + # Then output of sorted would be ['TOP_10_PREDICTED_LABEL', 'TOP_1_PREDICTED_LABEL'] + # This creates problem when using Uncertainty Calculator + # To solve this, first check if all the column names have any digit present. + # If yes, then sort it using key (REF: https://stackoverflow.com/a/49232907) + # If no, then sort lexicographically + + if cls.__do_column_names_have_numeric_values(filtered_mltype_cols_list): + filtered_cols_ordered += [ + col + for col in sorted( + filtered_mltype_cols_list, + key=lambda x: int("".join([i for i in x if i.isdigit()])), + ) + if col not in filtered_cols_ordered + ] + else: + filtered_cols_ordered += [ + col for col in sorted(filtered_mltype_cols_list) if col not in filtered_cols_ordered + ] + filtered_cols = filtered_cols.union(filtered_mltype_cols) + elif isinstance(input_pattern, str): + filtered_str_pattern_cols = set( + filter_string_list(list(schema.keys()), input_pattern, ignorecase=True) + ) + filtered_cols = filtered_cols.union(filtered_str_pattern_cols) + filtered_cols_ordered += [ + col for col in sorted(list(filtered_str_pattern_cols)) if col not in filtered_cols_ordered + ] + else: + raise AttributeError( + f"input_pattern must be a str denoting regex or an MLType, found {input_pattern}" + ) + filtered_schema: Dict = keep_keys(schema, list(filtered_cols)) + filtered_schema_ordered = OrderedDict() + for col in filtered_cols_ordered: + filtered_schema_ordered[col] = filtered_schema[col] + + return filtered_schema_ordered + + @classmethod + def __do_column_names_have_numeric_values(cls, filtered_cols_list: List[str]) -> bool: + return all( + [True if any(char.isdigit() for char in col_name) else False for col_name in filtered_cols_list] + ) diff --git a/src/bears/processor/_DataProcessor.py b/src/bears/processor/_DataProcessor.py new file mode 100644 index 0000000..cbbb46d --- /dev/null +++ b/src/bears/processor/_DataProcessor.py @@ -0,0 +1,128 @@ +from abc import ABC, abstractmethod +from typing import ClassVar, Dict, NoReturn, Optional, Tuple, Union + +from bears.core.frame import ScalableDataFrame, ScalableDataFrameRawType, ScalableSeries +from bears.util import MutableParameters, Registry, UserEnteredParameters +from pydantic import model_validator + +from bears.constants import DataLayout, MissingColumnBehavior, MLType, MLTypeSchema + + +class DataProcessor(MutableParameters, Registry, ABC): + """ + Abstract base class for all data processors. + + Subclasses of this class should be serializable via pickling. + Subclasses must define the following class variables: + - missing_column_behavior: Used in the context of DataPipeline. This field determined whether to allow + skipping of transformations when the columns required for those transformations are not present in the DataFrame. + E.g. 
If the pipeline processes the ground truth labels (such as label-encoding), then during inference time ground
+    truth labels will not be present and transformations declared on the ground truth column cannot run.
+    - input_mltypes: Lists the input MLTypes of the columns the data processor can take as input.
+    - output_mltype: The output MLType the data processor will return. This may vary depending on the
+    parameters used to initialize the data processor.
+    """
+
+    missing_column_behavior: ClassVar[MissingColumnBehavior] = MissingColumnBehavior.ERROR
+    input_mltypes: ClassVar[Tuple[MLType, ...]]
+    output_mltype: ClassVar[MLType]
+
+    class Params(UserEnteredParameters):
+        """
+        BaseModel for parameters. Expected to be overridden by subclasses of DataProcessor.
+        Example:
+            class CaseTransformer(DataProcessor):
+                class Params(DataProcessor.Params):
+                    case: Literal['lowercase', 'uppercase']
+        """
+
+        pass
+
+    name: str = None
+    data_schema: Optional[MLTypeSchema] = None
+    params: Params = {}
+
+    def __str__(self):
+        params_str: str = self.json(include={"name": True, "data_schema": True, "params": True}, indent=4)
+        out: str = f"{self.class_name} with params:\n{params_str}"
+        return out
+
+    @property
+    def AlreadyFitError(self) -> ValueError:
+        return ValueError(".fit() has already been called.")
+
+    @property
+    def FitBeforeTransformError(self) -> ValueError:
+        return ValueError(".fit() must be called before .transform()")
+
+    @model_validator(mode="before")
+    @classmethod
+    def convert_params(cls, params: Dict):
+        if params.get("name") is None:
+            params["name"] = cls.class_name
+        params["params"] = super(DataProcessor, cls)._convert_params(cls.Params, params.get("params"))
+        return params
+
+    @abstractmethod
+    def fit(
+        self,
+        data: Union[
+            ScalableDataFrame,
+            ScalableDataFrameRawType,
+            ScalableSeries,
+            ScalableDataFrameRawType,
+        ],
+        process_as: Optional[DataLayout] = None,
+    ) -> NoReturn:
+        """
+        Fits the data processor instance on the input data.
+        By default, this is a no-op, i.e. the data processor is assumed to be stateless.
+        - Any subclass implementation must not modify the input data.
+        - Any subclass implementation must fit data structure(s) which are serializable via pickling.
+        :param data: input data which the data processor will use to fit.
+        :param process_as: data-layout to use while processing.
+        :return: None
+        """
+        pass
+
+    @abstractmethod
+    def transform(
+        self,
+        data: Union[
+            ScalableDataFrame,
+            ScalableDataFrameRawType,
+            ScalableSeries,
+            ScalableDataFrameRawType,
+        ],
+        process_as: Optional[DataLayout] = None,
+    ) -> Union[
+        ScalableDataFrame,
+        ScalableDataFrameRawType,
+        ScalableSeries,
+        ScalableDataFrameRawType,
+    ]:
+        """
+        Transforms the input data and returns the result. Any subclass implementation must not modify the input data.
+        :param data: input data which the data processor will act on.
+        :param process_as: data-layout to use while processing.
+        :return: transformed result.
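+        Example (illustrative, reusing the hypothetical CaseTransformer from the Params docstring above):
+            CaseTransformer(params=dict(case="lowercase")).transform(["Foo", "BAR"])  ## e.g. returns ["foo", "bar"]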
+ """ + pass + + def fit_transform( + self, + data: Union[ + ScalableDataFrame, + ScalableDataFrameRawType, + ScalableSeries, + ScalableDataFrameRawType, + ], + process_as: Optional[DataLayout] = None, + ) -> Union[ + ScalableDataFrame, + ScalableDataFrameRawType, + ScalableSeries, + ScalableDataFrameRawType, + ]: + self.fit(data, process_as=process_as) + return self.transform(data, process_as=process_as) diff --git a/src/bears/processor/_DataProcessor_mixins.py b/src/bears/processor/_DataProcessor_mixins.py new file mode 100644 index 0000000..b8c970c --- /dev/null +++ b/src/bears/processor/_DataProcessor_mixins.py @@ -0,0 +1,136 @@ +from abc import ABC + +from bears.constants import MissingColumnBehavior, MLType +from bears.processor import DataProcessor + + +class NumericInputProcessor(DataProcessor, ABC): + """Mixin for numeric input data processors.""" + + input_mltypes = [MLType.INT, MLType.FLOAT] + + +class CategoricalInputProcessor(DataProcessor, ABC): + """Mixin for categorical input data processors.""" + + input_mltypes = [MLType.INT, MLType.CATEGORICAL] + + +class CategoricalOutputProcessor(DataProcessor, ABC): + """Mixin for categorical output data processors.""" + + output_mltype = MLType.CATEGORICAL + + +class IntegerOutputProcessor(DataProcessor, ABC): + """Mixin for integer output data processors.""" + + output_mltype = MLType.INT + + +class DecimalOutputProcessor(DataProcessor, ABC): + """Mixin for decimal output data processors.""" + + output_mltype = MLType.FLOAT + + +class EncodedLabelOutputProcessor(DataProcessor, ABC): + """Mixin for label output data processors.""" + + output_mltype = MLType.ENCODED_LABEL + + +class TextInputProcessor(DataProcessor, ABC): + """Mixin for text input data processors.""" + + input_mltypes = [ + MLType.TEXT, + MLType.CATEGORICAL, + MLType.INT, + MLType.FLOAT, + MLType.BOOL, + ] + + +class VectorAssemblerInputProcessor(DataProcessor, ABC): + """Mixin for vectorAssembler input data processors.""" + + input_mltypes = [MLType.INT, MLType.FLOAT, MLType.VECTOR, MLType.SPARSE_VECTOR] + + +class LabelInputProcessor(DataProcessor, ABC): + """Mixin for label input data processors.""" + + missing_column_behavior = MissingColumnBehavior.SKIP + + input_mltypes = [ + MLType.GROUND_TRUTH_LABEL, + MLType.ENCODED_LABEL, + MLType.PREDICTED_LABEL, + MLType.ENCODED_PREDICTED_LABEL, + ] + + +class TextOrLabelInputProcessor(DataProcessor, ABC): + """Mixin for text or label input data processors.""" + + missing_column_behavior = MissingColumnBehavior.SKIP + input_mltypes = LabelInputProcessor.input_mltypes + TextInputProcessor.input_mltypes + + +class TextOutputProcessor(DataProcessor, ABC): + """Mixin for text output data processors.""" + + output_mltype = MLType.TEXT + + +class BoolOutputProcessor(DataProcessor, ABC): + """Mixin for bool output data processors.""" + + output_mltype = MLType.BOOL + + +class VectorInputProcessor(DataProcessor, ABC): + """Mixin for vector input data processors.""" + + input_mltypes = [MLType.VECTOR] + + +class VectorOutputProcessor(DataProcessor, ABC): + """Mixin for vector output data processors.""" + + output_mltype = MLType.VECTOR + + +class SparseVectorInputProcessor(DataProcessor, ABC): + """Mixin for sparse vector input data processors.""" + + input_mltypes = [MLType.SPARSE_VECTOR] + + +class SparseVectorOutputProcessor(DataProcessor, ABC): + """Mixin for sparse vector output data processors.""" + + output_mltype = MLType.SPARSE_VECTOR + + +class NonVectorInputProcessor(DataProcessor, ABC): + """Mixin for non-vector input 
data processors.""" + + input_mltypes = list(set(MLType).difference({MLType.VECTOR, MLType.SPARSE_VECTOR})) + + +class PredictionsInputProcessor(DataProcessor, ABC): + """Mixin for algorithm predictions data input data processors.""" + + input_mltypes = [ + MLType.INDEX, + MLType.GROUND_TRUTH_LABEL, + MLType.ENCODED_LABEL, + MLType.PROBABILITY_SCORE, + MLType.PROBABILITY_SCORE_COMMA_SEPARATED_OR_LIST, + MLType.PREDICTED_LABEL, + MLType.PREDICTED_LABEL_COMMA_SEPARATED_OR_LIST, + MLType.ENCODED_PREDICTED_LABEL, + MLType.PREDICTED_CORRECT, + ] diff --git a/src/bears/processor/_Nto1ColumnProcessor.py b/src/bears/processor/_Nto1ColumnProcessor.py new file mode 100644 index 0000000..f1c3b47 --- /dev/null +++ b/src/bears/processor/_Nto1ColumnProcessor.py @@ -0,0 +1,61 @@ +from abc import ABC, abstractmethod +from typing import ( + Optional, +) + +from bears import ( + ScalableDataFrame, + ScalableOrRaw, + ScalableSeries, + ScalableSeriesOrRaw, + is_scalable, +) +from bears.util import safe_validate_arguments + +from bears.constants import DataLayout +from bears.processor import DataProcessor + + +class Nto1ColumnProcessor(DataProcessor, ABC): + """Abstract base class for N:1 data processors.""" + + @safe_validate_arguments + def fit( + self, + data: ScalableOrRaw, + process_as: Optional[DataLayout] = None, + ): + data: ScalableDataFrame = ScalableDataFrame.of(data, layout=process_as) + self._fit_df(data) + + def _fit_df(self, data: ScalableDataFrame): + """Fit step is a noop by default.""" + pass + + def __call__(self, *args, **kwargs): + return self.transform(*args, **kwargs) + + @safe_validate_arguments + def transform( + self, + data: ScalableOrRaw, + process_as: Optional[DataLayout] = None, + ) -> ScalableSeriesOrRaw: + output_data: ScalableSeries = self._transform_df(ScalableDataFrame.of(data, layout=process_as)) + if is_scalable(data): + return output_data + return output_data.raw() + + @abstractmethod + def _transform_df(self, data: ScalableDataFrame) -> ScalableSeries: + """N:1 data processors can make optimizations internally as column-wise operations are usually much faster.""" + pass + + @safe_validate_arguments + def fit_transform( + self, + data: ScalableOrRaw, + process_as: Optional[DataLayout] = None, + ) -> ScalableSeries: + self.fit(data, process_as=process_as) + return self.transform(data, process_as=process_as) diff --git a/src/bears/processor/_SingleColumnProcessor.py b/src/bears/processor/_SingleColumnProcessor.py new file mode 100644 index 0000000..460e129 --- /dev/null +++ b/src/bears/processor/_SingleColumnProcessor.py @@ -0,0 +1,65 @@ +from abc import ABC +from typing import ( + Any, + Optional, + Union, +) + +from bears import ScalableSeries, ScalableSeriesRawType +from bears.util import get_current_fn_name + +from bears.constants import DASK_APPLY_OUTPUT_MLTYPE_TO_META_MAP, DataLayout +from bears.processor import DataProcessor + + +class SingleColumnProcessor(DataProcessor, ABC): + """Abstract base class for 1:1 data processors.""" + + def fit( + self, + data: Union[ScalableSeries, ScalableSeriesRawType], + process_as: Optional[DataLayout] = None, + ): + data: ScalableSeries = ScalableSeries.of(data, layout=process_as) + self._fit_series(data) + + def _fit_series(self, data: ScalableSeries): + """Fit step is a noop by default.""" + pass + + def __call__(self, *args, **kwargs): + return self.transform(*args, **kwargs) + + def transform( + self, + data: Union[ScalableSeries, ScalableSeriesRawType], + process_as: Optional[DataLayout] = None, + ) -> Union[ScalableSeries, 
ScalableSeriesRawType]: + output_data: ScalableSeries = self._transform_series(ScalableSeries.of(data, layout=process_as)) + if isinstance(data, ScalableSeries): + return output_data + return output_data.raw() + + def _transform_series(self, data: ScalableSeries) -> ScalableSeries: + """1:1 data processors can make optimizations internally.""" + kwargs = {} + if data.layout is DataLayout.DASK: + if self.output_mltype in DASK_APPLY_OUTPUT_MLTYPE_TO_META_MAP: + kwargs["meta"] = DASK_APPLY_OUTPUT_MLTYPE_TO_META_MAP[self.output_mltype] + return data.apply(self.transform_single, **kwargs) + + def transform_single(self, data: Any) -> Any: + """ + Transforms a single data point using the current data processor. + :param data: input data point + :return: transformed value + """ + raise NotImplementedError(f"{get_current_fn_name()} has not been implemented.") + + def fit_transform( + self, + data: Union[ScalableSeries, ScalableSeriesRawType], + process_as: Optional[DataLayout] = None, + ) -> ScalableSeries: + self.fit(data, process_as=process_as) + return self.transform(data, process_as=process_as) diff --git a/src/bears/processor/__init__.py b/src/bears/processor/__init__.py new file mode 100644 index 0000000..029b426 --- /dev/null +++ b/src/bears/processor/__init__.py @@ -0,0 +1,9 @@ +from bears.processor._DataProcessor import * +from bears.processor._SingleColumnProcessor import * +from bears.processor._Nto1ColumnProcessor import * +from bears.processor._DataPipeline import * +from bears.processor._DataProcessor_mixins import * +from bears.processor._categorical import * +from bears.processor._numeric import * +from bears.processor._text import * +from bears.processor._vector import * \ No newline at end of file diff --git a/src/bears/processor/_categorical/_CategoricalMissingValueImputation.py b/src/bears/processor/_categorical/_CategoricalMissingValueImputation.py new file mode 100644 index 0000000..37be6f3 --- /dev/null +++ b/src/bears/processor/_categorical/_CategoricalMissingValueImputation.py @@ -0,0 +1,75 @@ +from typing import ( + Any, + Dict, + Optional, +) + +from autoenum import AutoEnum, auto +from bears import ScalableSeries +from bears.util import is_null +from pydantic import model_validator + +from bears.processor import CategoricalInputProcessor, CategoricalOutputProcessor, SingleColumnProcessor + + +class CategoricalImputationStrategy(AutoEnum): + MODE = auto() + CONSTANT = auto() + + +class CategoricalMissingValueImputation( + SingleColumnProcessor, CategoricalInputProcessor, CategoricalOutputProcessor +): + """ + This calculates or fills in the value to be filled in place of nan based on strategy passed as input. 
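+    E.g. with strategy=MODE, fit() computes the most frequent value in the column and transform() fills it in
+    for null entries; with strategy=CONSTANT, the user-supplied fill_value is used directly.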
+ Params: + - FILL_VALUE: the value to fill in place of NaN (must only be passed when STRATEGY is CONSTANT) + - STRATEGY: the strategy to use when a NaN is encountered + - MODE: impute the value that appears most often in the column + - CONSTANT: impute a user-supplied fill value + """ + + class Params(SingleColumnProcessor.Params): + strategy: CategoricalImputationStrategy + fill_value: Optional[Any] = None + + imputed_value: Optional[Any] = None + + @model_validator(mode="before") + @classmethod + def set_imputed_value(cls, params: Dict) -> Dict: + cls.set_default_param_values(params) + if params["params"].strategy is CategoricalImputationStrategy.CONSTANT: + if params["params"].fill_value is None: + raise ValueError( + f"Cannot have empty `fill_value` when `strategy` is {CategoricalImputationStrategy.CONSTANT}" + ) + params["imputed_value"] = params["params"].fill_value + elif params["params"].fill_value is not None: + raise ValueError( + f"`fill_value` can only be passed when strategy={CategoricalImputationStrategy.CONSTANT}" + ) + return params + + def _fit_series(self, data: ScalableSeries): + if self.params.strategy is not CategoricalImputationStrategy.CONSTANT: + if self.imputed_value is not None: + raise self.AlreadyFitError + if self.params.strategy is CategoricalImputationStrategy.MODE: + self.imputed_value = self._get_mode(data) + else: + raise NotImplementedError(f"Unsupported strategy: {self.params.strategy}") + + def _get_mode(self, data: ScalableSeries) -> Any: + imputed_value: Any = data.mode().compute().iloc[0] + if not isinstance(imputed_value, str): + if float(imputed_value).is_integer(): + return int(imputed_value) + return imputed_value + + def transform_single(self, data: Optional[Any]) -> Any: + if self.imputed_value is None and self.params.strategy is not CategoricalImputationStrategy.CONSTANT: + raise self.FitBeforeTransformError + if is_null(data): + data = self.imputed_value + return data diff --git a/src/bears/processor/_categorical/_LabelAffix.py b/src/bears/processor/_categorical/_LabelAffix.py new file mode 100644 index 0000000..c5de437 --- /dev/null +++ b/src/bears/processor/_categorical/_LabelAffix.py @@ -0,0 +1,38 @@ +from typing import ( + Any, + Optional, +) + +from bears.util import is_null +from pydantic import constr + +from bears.processor import ( + EncodedLabelOutputProcessor, + SingleColumnProcessor, + TextOrLabelInputProcessor, +) + + +class LabelAffix(SingleColumnProcessor, TextOrLabelInputProcessor, EncodedLabelOutputProcessor): + """ + Adds a prefix and/or suffix to a label.
+ + Params: + - PREFIX: optional prefix to add to the label + - SUFFIX: optional suffix to add to the label + """ + + class Params(SingleColumnProcessor.Params): + prefix: constr(min_length=0) = "" + suffix: constr(min_length=0) = "" + + # def _transform_series(self, data: ScalableSeries) -> ScalableSeries: + # nulls: ScalableSeries = data.isna() + # data = self.params.prefix + data.fillna('').astype(str) + self.params.suffix + # data[nulls] = None + # return data + + def transform_single(self, data: Optional[Any]) -> Optional[str]: + if is_null(data): + return None + return self.params.prefix + str(data) + self.params.suffix diff --git a/src/bears/processor/_categorical/_LabelEncoding.py b/src/bears/processor/_categorical/_LabelEncoding.py new file mode 100644 index 0000000..e2c224b --- /dev/null +++ b/src/bears/processor/_categorical/_LabelEncoding.py @@ -0,0 +1,218 @@ +from typing import ( + Any, + Callable, + Dict, + List, + Optional, + Set, + Tuple, + Union, +) + +import numpy as np +from autoenum import AutoEnum, auto +from bears import ScalableSeries, ScalableSeriesRawType +from bears.util import is_null, type_str +from pydantic import model_validator + +from bears.processor import ( + EncodedLabelOutputProcessor, + SingleColumnProcessor, + TextOrLabelInputProcessor, +) + + +class EncodingRange(AutoEnum): + ONE_TO_N = auto() + ZERO_TO_N_MINUS_ONE = auto() + BINARY_ZERO_ONE = auto() + BINARY_PLUS_MINUS_ONE = auto() + + +ENCODING_RANGE_TO_UNKNOWN_LABELS_MAP = { + EncodingRange.ONE_TO_N: 0, + EncodingRange.BINARY_ZERO_ONE: -1, + EncodingRange.BINARY_PLUS_MINUS_ONE: 0, + EncodingRange.ZERO_TO_N_MINUS_ONE: -1, +} + +BINARY_POSITIVE_LABELS: Set[str] = {"1", "Y", "YES", "TRUE", "T"} +BINARY_NEGATIVE_LABELS: Set[str] = {"0", "-1", "N", "NO", "FALSE", "F"} + +LabelEncoding = "LabelEncoding" + + +class LabelEncoding(SingleColumnProcessor, TextOrLabelInputProcessor, EncodedLabelOutputProcessor): + """ + Fits a list of categorical or integer values and transforms each into an integer value. + Params: + - ENCODING_RANGE: the output range of integer values; must belong to the EncodingRange enum. Values: + - ONE_TO_N: encodes to 1, 2, 3, ... N (number of unique labels) + - ZERO_TO_N_MINUS_ONE: encodes to 0, 1, 2, ... N-1 (number of unique labels) + - BINARY_ZERO_ONE: encodes to 0 or 1. Throws an exception if the labels are not binary. + - BINARY_PLUS_MINUS_ONE: encodes to -1 or +1. Throws an exception if the labels are not binary. + - MISSING_INPUT_FILL_VALUE: the value to fill for None/NaN labels. + - UNKNOWN_INPUT_ENCODING_VALUE: the encoding value to fill for labels which are present in the data passed to the + transform step but not present in the data used to fit the transformer.
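+ 
+ Example (illustrative sketch; assumes a plain Python list is accepted as a raw series input):
+     >>> enc = LabelEncoding(params=dict(encoding_range=EncodingRange.ONE_TO_N))
+     >>> enc.fit(["cat", "dog", "cat", "bird"])
+     >>> enc.transform_single("dog")      ## 3, since the sorted unique labels are encoded as 1..N
+     >>> enc.transform_single("emu")      ## 0, the default UNKNOWN_INPUT_ENCODING_VALUE for ONE_TO_N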
+ """ + + aliases = ["LabelEncoder"] + + class Params(SingleColumnProcessor.Params): + encoding_range: EncodingRange = EncodingRange.ONE_TO_N + missing_input_fill_value: Optional[Any] = None + unknown_input_encoding_value: Optional[Any] = None + label_normalizer: Optional[Callable[[Any], str]] = None + + @model_validator(mode="before") + @classmethod + def set_unknown_input_encoding_value(cls, params: Dict) -> Dict: + cls.set_default_param_values(params) + if params.get("unknown_input_encoding_value") is None: + params["unknown_input_encoding_value"]: Any = ENCODING_RANGE_TO_UNKNOWN_LABELS_MAP[ + params["encoding_range"] + ] + return params + + label_encoding_dict: Dict[Any, int] = None ## Stores normalized labels if label_normalizer is not None + label_decoding_dict: Dict[int, Any] = None ## Stores normalized labels if label_normalizer is not None + + @classmethod + def from_labelspace( + cls, + labelspace: Union[Set, List, Tuple], + label_encoding_range: EncodingRange, + label_normalizer: Callable[[Any], str], + ) -> LabelEncoding: + """ + Static factory to create a LabelEncoding object from a list/set/tuple of labels. + :param labelspace: complete set of labels for an ML training dataset. + :param label_encoding_range: the range of values to which we should encode the labels. + :param label_normalizer: function to normalize labels. + :return: LabelEncoding object. + """ + if len(labelspace) == 2: + lb1, lb2 = tuple(labelspace) ## Assume normalized beforehand. + lb1: str = label_normalizer(lb1) + lb2: str = label_normalizer(lb2) + if lb1.upper() in BINARY_NEGATIVE_LABELS and lb2.upper() in BINARY_POSITIVE_LABELS: + return LabelEncoding( + label_encoding_dict={lb1: 0, lb2: 1}, + label_decoding_dict={0: lb1, 1: lb2}, + params=dict( + encoding_range=EncodingRange.BINARY_ZERO_ONE, + label_normalizer=label_normalizer, + ), + ) + elif lb1.upper() in BINARY_POSITIVE_LABELS and lb2.upper() in BINARY_NEGATIVE_LABELS: + return LabelEncoding( + label_encoding_dict={lb2: 0, lb1: 1}, + label_decoding_dict={0: lb2, 1: lb1}, + params=dict( + encoding_range=EncodingRange.BINARY_ZERO_ONE, + label_normalizer=label_normalizer, + ), + ) + label_encoding_range: EncodingRange = EncodingRange.BINARY_ZERO_ONE + label_encoder: LabelEncoding = LabelEncoding( + params=dict( + encoding_range=label_encoding_range, + label_normalizer=label_normalizer, + ) + ) + label_encoder.fit(np.array(list(labelspace))) + return label_encoder + + def _fit_series(self, data: ScalableSeries): + ## Cannot use np.unique with NaNs in the data, as it replicates the nans: + labels: np.ndarray = self._fill_missing_values(data).dropna().numpy() + if self.params.missing_input_fill_value is not None: + labels: np.ndarray = np.append(labels, self.params.missing_input_fill_value) + labels: np.ndarray = np.unique(labels) ## Makes unique. + if self.params.label_normalizer is not None: + ## Normalize labels before encoding: + labels: np.ndarray = np.array([self.params.label_normalizer(lb) for lb in labels]) + labels: np.ndarray = np.unique(labels) ## Makes unique post-normalization. + ## The 2nd return param is an index of the unique labels, i.e. an encoding from 0 to N-1: + labels, encoded_labels = np.unique(labels, return_inverse=True) + num_labels, num_encodings = len(labels), len(encoded_labels) + if num_labels == 0: + raise ValueError("Input data must contain at least one non-null entry.") + if num_labels != num_encodings: + raise ValueError( + "Each label should have exactly one encoding. " + + f"Found: no. unique labels={num_labels}, no. 
encodings={num_encodings}" + ) + ## Adjust label encoding based on encoding range: + if self.params.encoding_range is EncodingRange.ZERO_TO_N_MINUS_ONE: + self.label_encoding_dict: Dict[Any, int] = dict(zip(labels, encoded_labels)) + elif self.params.encoding_range is EncodingRange.ONE_TO_N: + ## encoded_labels goes from 0 to N-1 + self.label_encoding_dict: Dict[Any, int] = dict(zip(labels, encoded_labels + 1)) + elif self.params.encoding_range is EncodingRange.BINARY_ZERO_ONE: + if num_labels > 2: + raise ValueError( + f"{EncodingRange.BINARY_ZERO_ONE} encoding supports <=2 labels, found {num_labels}" + ) + self.label_encoding_dict: Dict[Any, int] = {labels[0]: 0} + if num_labels == 2: + self.label_encoding_dict[labels[1]] = 1 + elif self.params.encoding_range is EncodingRange.BINARY_PLUS_MINUS_ONE: + if num_labels > 2: + raise ValueError( + f"{EncodingRange.BINARY_PLUS_MINUS_ONE} needs <=2 labels, found {num_labels}" + ) + self.label_encoding_dict: Dict[Any, int] = {labels[0]: -1} + if num_labels == 2: + self.label_encoding_dict[labels[1]] = 1 + else: + raise NotImplementedError(f"Unsupported encoding range: {self.params.encoding_range}") + self.label_decoding_dict: Dict[int, Any] = {v: k for k, v in self.label_encoding_dict.items()} + + def _transform_series(self, data: ScalableSeries) -> ScalableSeries: + if self.label_encoding_dict is None: + raise self.FitBeforeTransformError + data: ScalableSeries = self._fill_missing_values(data) + if self.params.label_normalizer is not None: + data: ScalableSeries = data.map(self.params.label_normalizer, na_action="ignore") + return data.map(self.label_encoding_dict, na_action="ignore").fillna( + self.params.unknown_input_encoding_value + ) + + def transform_single(self, data: Optional[Any]) -> int: + if self.label_encoding_dict is None: + raise self.FitBeforeTransformError + data = self._fill_missing_value(data) + return int(self.label_encoding_dict.get(data, self.params.unknown_input_encoding_value)) + + def inverse_transform_series( + self, + data: Union[ScalableSeries, ScalableSeriesRawType], + ) -> Union[ScalableSeries, ScalableSeriesRawType]: + if self.label_decoding_dict is None: + raise self.FitBeforeTransformError + output: ScalableSeries = ScalableSeries.of(data).map(self.label_decoding_dict, na_action="ignore") + if not isinstance(data, ScalableSeries): + output: ScalableSeriesRawType = output.raw() + return output + + def inverse_transform_single(self, data: int) -> Optional[str]: + if self.label_decoding_dict is None: + raise self.FitBeforeTransformError + if not isinstance(data, int): + raise ValueError( + f"Expected input data to be an integer; found {type_str(data)} having value: {data}" + ) + return self.label_decoding_dict.get(data) + + def _fill_missing_value(self, data: Any): + """TODO: replace this with a transformer or util which imputes missing values.""" + if is_null(data) and self.params.missing_input_fill_value is not None: + return self.params.missing_input_fill_value + return data + + def _fill_missing_values(self, data: ScalableSeries) -> ScalableSeries: + """TODO: replace this with a transformer or util which imputes missing values.""" + if self.params.missing_input_fill_value is not None: + return data.fillna(self.params.missing_input_fill_value) + return data diff --git a/src/bears/processor/_categorical/__init__.py b/src/bears/processor/_categorical/__init__.py new file mode 100644 index 0000000..16d8944 --- /dev/null +++ b/src/bears/processor/_categorical/__init__.py @@ -0,0 +1,3 @@ +from 
bears.processor._categorical._CategoricalMissingValueImputation import * +from bears.processor._categorical._LabelAffix import * +from bears.processor._categorical._LabelEncoding import * \ No newline at end of file diff --git a/src/bears/processor/_numeric/_NumericMissingValueImputation.py b/src/bears/processor/_numeric/_NumericMissingValueImputation.py new file mode 100644 index 0000000..5c26cd4 --- /dev/null +++ b/src/bears/processor/_numeric/_NumericMissingValueImputation.py @@ -0,0 +1,84 @@ +from typing import ( + Any, + Callable, + ClassVar, + Dict, + Optional, +) + +import pandas as pd +from autoenum import AutoEnum, auto +from bears.util import is_null +from pydantic import model_validator + +from bears.constants import MLType +from bears.processor import SingleColumnProcessor + + +class NumericImputationStrategy(AutoEnum): + MEAN = auto() + MEDIAN = auto() + MODE = auto() + MIN = auto() + MAX = auto() + CONSTANT = auto() + + +class NumericMissingValueImputation(SingleColumnProcessor): + """ + Computes (or takes as input) the value to impute in place of NaN, based on the configured strategy. + Params: + - FILL_VALUE: the value to fill in place of NaN (must only be passed when STRATEGY is CONSTANT) + - STRATEGY: the strategy to use when a NaN is encountered + - MEAN: the arithmetic mean of the series + - MEDIAN: the middle value of the sorted series + - MODE: the value that appears most often in the series + - MIN: the minimum value of the series + - MAX: the maximum value of the series + - CONSTANT: impute a user-supplied fill value + """ + + input_mltypes = [MLType.INT, MLType.FLOAT] + output_mltype = MLType.FLOAT + IMPUTE_FN_MAP: ClassVar[Dict[NumericImputationStrategy, Callable]] = { + NumericImputationStrategy.MODE: lambda _data: _data.mode(dropna=True).compute().iloc[0], + NumericImputationStrategy.MEAN: lambda _data: _data.mean(skipna=True), + NumericImputationStrategy.MEDIAN: lambda _data: _data.median(skipna=True), + NumericImputationStrategy.MIN: lambda _data: _data.min(skipna=True), + NumericImputationStrategy.MAX: lambda _data: _data.max(skipna=True), + } + + class Params(SingleColumnProcessor.Params): + strategy: NumericImputationStrategy + fill_value: Optional[Any] = None + + imputed_value: Optional[Any] = None + + @model_validator(mode="before") + @classmethod + def set_imputed_value(cls, params: Dict) -> Dict: + cls.set_default_param_values(params) + if params["params"].strategy is NumericImputationStrategy.CONSTANT: + if params["params"].fill_value is None: + raise ValueError( + f"Cannot have empty `fill_value` when `strategy` is {NumericImputationStrategy.CONSTANT}" + ) + params["imputed_value"] = params["params"].fill_value + elif params["params"].fill_value is not None: + raise ValueError( + f"`fill_value` can only be passed when strategy={NumericImputationStrategy.CONSTANT}" + ) + return params + + def _fit_series(self, data: pd.Series): + if self.params.strategy is not NumericImputationStrategy.CONSTANT: + if self.imputed_value is not None: + raise self.AlreadyFitError + self.imputed_value: Any = self.IMPUTE_FN_MAP[self.params.strategy](data) + + def transform_single(self, data: Optional[Any]) -> Any: + if self.imputed_value is None and self.params.strategy is not NumericImputationStrategy.CONSTANT: + raise self.FitBeforeTransformError + if is_null(data): + data =
self.imputed_value + return data diff --git a/src/bears/processor/_numeric/__init__.py b/src/bears/processor/_numeric/__init__.py new file mode 100644 index 0000000..9d2a1c8 --- /dev/null +++ b/src/bears/processor/_numeric/__init__.py @@ -0,0 +1 @@ +from bears.processor._numeric._NumericMissingValueImputation import * diff --git a/src/bears/processor/_text/_CaseTransformation.py b/src/bears/processor/_text/_CaseTransformation.py new file mode 100644 index 0000000..b2a1a3b --- /dev/null +++ b/src/bears/processor/_text/_CaseTransformation.py @@ -0,0 +1,34 @@ +from typing import ( + Optional, +) + +from autoenum import AutoEnum, auto +from bears.util import is_null + +from bears.processor import SingleColumnProcessor, TextInputProcessor, TextOutputProcessor + + +class Case(AutoEnum): + UPPER = auto() + LOWER = auto() + + +class CaseTransformation(SingleColumnProcessor, TextInputProcessor, TextOutputProcessor): + """ + Transforms the text case to uppercase or lowercase. + + Params: + - CASE: must be the string 'upper' or 'lower'. + """ + + class Params(SingleColumnProcessor.Params): + case: Case = Case.LOWER + + def transform_single(self, data: Optional[str]) -> Optional[str]: + if is_null(data): + return None + if self.params.case is Case.LOWER: + return data.lower() + elif self.params.case is Case.UPPER: + return data.upper() + raise NotImplementedError(f"Unsupported case: {self.params.case}") diff --git a/src/bears/processor/_text/_HtmlTagRemoval.py b/src/bears/processor/_text/_HtmlTagRemoval.py new file mode 100644 index 0000000..b24d250 --- /dev/null +++ b/src/bears/processor/_text/_HtmlTagRemoval.py @@ -0,0 +1,23 @@ +import re +from typing import ( + ClassVar, + Optional, +) + +from bears.util import is_null + +from bears.processor import SingleColumnProcessor, TextInputProcessor, TextOutputProcessor + + +class HtmlTagRemoval(SingleColumnProcessor, TextInputProcessor, TextOutputProcessor): + """ + Removes HTML tags from the text. Leaves the content between tags untouched. + An HTML tag is recognized as anything between a pair of crocodile brackets, e.g.
, < p>, < p >, < /p html >, etc. + """ + + HTML_REGEX: ClassVar = re.compile("<.*?>") + + def transform_single(self, data: Optional[str]) -> Optional[str]: + if is_null(data): + return None + return self.HTML_REGEX.sub("", data) diff --git a/src/bears/processor/_text/_PunctuationCleaner.py b/src/bears/processor/_text/_PunctuationCleaner.py new file mode 100644 index 0000000..fc60246 --- /dev/null +++ b/src/bears/processor/_text/_PunctuationCleaner.py @@ -0,0 +1,25 @@ +import string +from typing import ( + Optional, +) + +from bears.util import String, is_null +from pydantic import constr + +from bears.processor import SingleColumnProcessor, TextInputProcessor, TextOutputProcessor + + +class PunctuationCleaner(SingleColumnProcessor, TextInputProcessor, TextOutputProcessor): + """ + Replaces punctuation characters with the configured replacement character (a space by default). + """ + + class Params(SingleColumnProcessor.Params): + replacement_char: constr(min_length=1) = String.SPACE + + def transform_single(self, data: Optional[str]) -> Optional[str]: + if is_null(data): + return None + return data.translate( + str.maketrans(string.punctuation, self.params.replacement_char * len(string.punctuation)) + ) diff --git a/src/bears/processor/_text/_RegexSubstitution.py b/src/bears/processor/_text/_RegexSubstitution.py new file mode 100644 index 0000000..43d276a --- /dev/null +++ b/src/bears/processor/_text/_RegexSubstitution.py @@ -0,0 +1,61 @@ +import re +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, +) + +from bears.util import is_null +from pydantic import constr, model_validator + +from bears.processor import SingleColumnProcessor, TextInputProcessor, TextOutputProcessor + + +class RegexSubstitution(SingleColumnProcessor, TextInputProcessor, TextOutputProcessor): + """ + Replaces each matched regex pattern in a list with the corresponding substitution pattern. + + Params: + - SUBSTITUTION_LIST: a list of 2-tuples, where the first element is the regex to match and the second is the + substitution (which may be a plain string or a regex replacement pattern, controlled via SUBSTITUTE_IS_REGEX). + The substitutions are applied to the input text sequentially. + - IGNORECASE: whether to ignore case during regex matching. + - MULTILINE: whether to use multiline matching during regex matching. + - SUBSTITUTE_IS_REGEX: whether the substitution is a regex expression. If set to True, the transformer will compile + the substitution as regex during replacement, allowing the use of capturing groups, etc.
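+ 
+ Example (illustrative sketch; the patterns and input below are made up for demonstration):
+     >>> proc = RegexSubstitution(params=dict(substitution_list=[(r"[0-9]+", "<NUM>"), (r"\s+", " ")]))
+     >>> proc.transform_single("Order   66  shipped")   ## "Order <NUM> shipped"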
+ """ + + class Params(SingleColumnProcessor.Params): + substitution_list: List[Tuple[constr(min_length=1), constr(min_length=0)]] + ignorecase: bool = False + multiline: bool = True + substitute_is_regex: bool = True + flags: Optional[int] = None + match_patterns: Dict[constr(min_length=1), Any] = None + + @model_validator(mode="before") + @classmethod + def set_flags(cls, params: Dict) -> Dict: + cls.set_default_param_values(params) + flags = 0 + if params["ignorecase"]: + flags |= re.IGNORECASE + if params["multiline"]: + flags |= re.MULTILINE + params["flags"] = flags + params["match_patterns"] = { + regex_pattern: re.compile(regex_pattern, flags=flags) + for regex_pattern, _ in params["substitution_list"] + } + return params + + def transform_single(self, data: Optional[str]) -> Optional[str]: + if is_null(data): + return None + for regex_pattern, sub_str in self.params.substitution_list: + match_pattern = self.params.match_patterns[regex_pattern] + sub_pattern = sub_str if not self.params.substitute_is_regex else r"%s" % (sub_str) + data: str = match_pattern.sub(sub_pattern, data) + return data diff --git a/src/bears/processor/_text/_StringRemoval.py b/src/bears/processor/_text/_StringRemoval.py new file mode 100644 index 0000000..e423532 --- /dev/null +++ b/src/bears/processor/_text/_StringRemoval.py @@ -0,0 +1,33 @@ +from typing import Dict, List, Optional + +from bears.util import is_list_like, is_null +from pydantic import model_validator + +from bears.processor import SingleColumnProcessor, TextInputProcessor, TextOutputProcessor + + +class StringRemoval(SingleColumnProcessor, TextInputProcessor, TextOutputProcessor): + """ + Removes certain strings from each text string using str.replace() (no regex matching). + + Params: + - REMOVAL_LIST: the list of strings to remove. + """ + + class Params(SingleColumnProcessor.Params): + removal_list: List[str] + + @model_validator(mode="before") + @classmethod + def set_params(cls, params: Dict) -> Dict: + removal_list = params.get("removal_list") + if not is_list_like(removal_list) or len(removal_list) == 0: + raise ValueError("`removal_list` should be a non-empty list of strings") + return params + + def transform_single(self, data: Optional[str]) -> Optional[str]: + if is_null(data): + return None + for s in self.params.removal_list: + data: str = data.replace(s, "") + return data diff --git a/src/bears/processor/_text/_TFIDFVectorization.py b/src/bears/processor/_text/_TFIDFVectorization.py new file mode 100644 index 0000000..b119073 --- /dev/null +++ b/src/bears/processor/_text/_TFIDFVectorization.py @@ -0,0 +1,74 @@ +from ast import literal_eval +from typing import Dict, Optional, Union + +import numpy as np +from bears import ScalableSeries +from bears.util import if_else, is_dict_like +from pydantic import model_validator +from scipy.sparse import csr_matrix as SparseCSRMatrix +from sklearn.feature_extraction.text import TfidfVectorizer + +from bears.constants import MLType +from bears.processor import SingleColumnProcessor, TextInputProcessor +from bears.processor._vector._VectorDensifier import VectorDensifier + + +class TFIDFVectorization(SingleColumnProcessor, TextInputProcessor): + """ + Performs TF-IDF Vectorization of a text column using sklearn's TFIDFVectorizer. + Ref: https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfVectorizer.html + Params: + - OUTPUT_SPARSE: whether to output each row as a sparse row matrix (1 x N). If False, will output a 1d numpy array. 
+ - SKLEARN_PARAMS: dictionary of sklearn params to be unpacked as keyword arguments to the constructor + sklearn.feature_extraction.text.TfidfVectorizer. Thus, keys are case-sensitive. + """ + + class Params(SingleColumnProcessor.Params): + sklearn_params: Dict = {} + output_sparse: bool = False + + @model_validator(mode="before") + @classmethod + def set_params(cls, params: Dict) -> Dict: + cls.set_default_param_values(params) + sklearn_tfidf_params: Dict = params["sklearn_params"] + if not is_dict_like(sklearn_tfidf_params): + raise ValueError("`sklearn_params` should be a dictionary") + token_pattern: Optional = sklearn_tfidf_params.get("token_pattern") + if token_pattern is not None: + sklearn_tfidf_params["token_pattern"] = str(sklearn_tfidf_params.get("token_pattern")) + ngram_range: Optional = sklearn_tfidf_params.get("ngram_range") + if ngram_range is not None: + if isinstance(ngram_range, str): + ngram_range = literal_eval(ngram_range) + if isinstance(ngram_range, list): + ngram_range = tuple(ngram_range) + assert isinstance(ngram_range, tuple) + sklearn_tfidf_params["ngram_range"] = ngram_range + params["sklearn_params"] = sklearn_tfidf_params + return params + + output_mltype = MLType.VECTOR + vectorizer: TfidfVectorizer = None + vector_densifier: VectorDensifier = None + + @model_validator(mode="before") + @classmethod + def set_vectorizer(cls, params: Dict) -> Dict: + cls.set_default_param_values(params) + params["vectorizer"]: TfidfVectorizer = TfidfVectorizer(**params["params"].sklearn_params) + params["vector_densifier"]: VectorDensifier = VectorDensifier() + params["output_mltype"]: MLType = if_else( + params["params"].output_sparse, MLType.SPARSE_VECTOR, MLType.VECTOR + ) + return params + + def _fit_series(self, data: ScalableSeries): + self.vectorizer.fit(data.pandas()) ## TODO: Super slow, replace with Dask TFIDF + + def transform_single(self, data: str) -> Union[SparseCSRMatrix, np.ndarray]: + ## Will output a sparse matrix with only a single row. + tfidf_vec: SparseCSRMatrix = self.vectorizer.transform([data]) + if not self.params.output_sparse: + tfidf_vec: np.ndarray = self.vector_densifier.transform_single(tfidf_vec) + return tfidf_vec diff --git a/src/bears/processor/_text/_TextConcatenation.py b/src/bears/processor/_text/_TextConcatenation.py new file mode 100755 index 0000000..34f434a --- /dev/null +++ b/src/bears/processor/_text/_TextConcatenation.py @@ -0,0 +1,102 @@ +from typing import ( + Any, + Dict, + List, + Optional, +) + +from autoenum import AutoEnum, auto +from bears import ScalableDataFrame, ScalableSeries +from bears.util import String, is_list_like, is_null +from pydantic import constr, model_validator + +from bears.processor import ( + Nto1ColumnProcessor, + TextInputProcessor, + TextOutputProcessor, +) + + +class ColumnOrder(AutoEnum): + SORT_BY_NAME_ASCENDING = auto() + SORT_BY_NAME_DESCENDING = auto() + SORT_BY_SHORTEST_FIRST = auto() + INPUT_ORDER = auto() + + +class TextConcatenation(Nto1ColumnProcessor, TextInputProcessor, TextOutputProcessor): + """ + Concatenates text from multiple columns into a single column. + For non-text columns, converts to string and then concatenates. + + Params: + - SEP: the separator between columns in the combined text string. + - COLUMN_ORDER: which way to order columns. + """ + + class Params(Nto1ColumnProcessor.Params): + sep: constr(min_length=1) = String.SPACE + column_order: ColumnOrder = ( + ColumnOrder.SORT_BY_NAME_ASCENDING + ) ## Do not change this for legacy reasons. 
+ input_ordering: Optional[List[str]] = None + prefix_col_name: Optional[bool] = False + prefix_col_sep: Optional[str] = ": " + allow_missing: Optional[bool] = False + + ordered_cols: Optional[List[str]] = None + + @model_validator(mode="before") + @classmethod + def set_ordered_cols(cls, params: Dict) -> Dict: + cls.set_default_param_values(params) + if params["params"].column_order is ColumnOrder.INPUT_ORDER: + if not is_list_like(params["params"].input_ordering): + raise ValueError( + f"`input_ordering` must be a non-empty list when column_order={ColumnOrder.INPUT_ORDER}" + ) + params["ordered_cols"]: List[str] = params["params"].input_ordering + return params + + def _fit_df(self, data: ScalableDataFrame): + cols: List[str] = list(data.columns) + if self.params.column_order is ColumnOrder.SORT_BY_SHORTEST_FIRST: + avg_column_length: Dict[str, float] = { + col: data[col].dropna().astype(str).apply(len).mean() for col in cols + } + ## Sort first by avg. length, then by column name: + self.ordered_cols: List[str] = [ + col for col, avg_len in sorted(list(avg_column_length.items()), key=lambda x: (x[1], x[0])) + ] + elif self.params.column_order is ColumnOrder.SORT_BY_NAME_DESCENDING: + self.ordered_cols: List[str] = sorted(cols, reverse=True) + elif self.params.column_order is ColumnOrder.SORT_BY_NAME_ASCENDING: + self.ordered_cols: List[str] = sorted(cols) + elif self.params.column_order is ColumnOrder.INPUT_ORDER: + self.ordered_cols: List[str] = self.params.input_ordering + else: + self.ordered_cols = None + + def _transform_single(self, data: List[Any]) -> str: + """Concatenate a list of values of any type.""" + return self.params.sep.join([str(x) for x in data if not is_null(x)]) + + def _transform_df(self, data: ScalableDataFrame) -> ScalableSeries: + if self.ordered_cols is None: + raise self.FitBeforeTransformError + output_series: Optional[ScalableSeries] = None + for col in self.ordered_cols: + if col not in data.columns_set: + if self.params.allow_missing: + continue + raise ValueError( + f"Column {col} is required but not found in input data. 
Input data has columns: {data.columns}" + ) + to_add_col = col + self.params.prefix_col_sep + if self.params.prefix_col_name is False: + to_add_col = "" + if output_series is None: + output_series: ScalableSeries = to_add_col + data[col].fillna(String.EMPTY).astype(str) + else: + output_series += self.params.sep + to_add_col + data[col].fillna(String.EMPTY).astype(str) + return output_series diff --git a/src/bears/processor/_text/__init__.py b/src/bears/processor/_text/__init__.py new file mode 100644 index 0000000..f3cc645 --- /dev/null +++ b/src/bears/processor/_text/__init__.py @@ -0,0 +1,7 @@ +from bears.processor._text._CaseTransformation import * +from bears.processor._text._HtmlTagRemoval import * +from bears.processor._text._PunctuationCleaner import * +from bears.processor._text._RegexSubstitution import * +from bears.processor._text._StringRemoval import * +from bears.processor._text._TextConcatenation import * +from bears.processor._text._TFIDFVectorization import * diff --git a/src/bears/processor/_vector/_VectorAssembler.py b/src/bears/processor/_vector/_VectorAssembler.py new file mode 100644 index 0000000..bfc0035 --- /dev/null +++ b/src/bears/processor/_vector/_VectorAssembler.py @@ -0,0 +1,71 @@ +from typing import ( + Any, + List, + Optional, + Set, + Tuple, + Union, +) + +import numpy as np +from autoenum import AutoEnum, auto +from bears import ScalableDataFrame, ScalableSeries +from bears.util import as_list, is_null +from scipy.sparse import csr_matrix as SparseCSRMatrix + +from bears.constants import MLType +from bears.processor import Nto1ColumnProcessor, VectorAssemblerInputProcessor, VectorOutputProcessor + + +class InvalidBehavior(AutoEnum): + ERROR = auto() + KEEP = auto() + + +class VectorAssembler(Nto1ColumnProcessor, VectorAssemblerInputProcessor, VectorOutputProcessor): + """ + Concatenates multiple columns into a single vector column + + Params: + - HANDLE_INVALID: how to handle NaN values in columns + - ERROR: Throws an error if invalid data/NaN value is present + - KEEP: Keeps all the rows, ignores NaNs and Nones. 
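+ 
+ Example (illustrative sketch; assumes the processor is constructed with the `data_schema` mapping that
+ `_transform_series` reads, and that a plain dict of column lists is accepted as a raw dataframe input):
+     >>> assembler = VectorAssembler(data_schema={"age": MLType.FLOAT, "emb": MLType.VECTOR})
+     >>> assembler.fit_transform({"age": [30.0], "emb": [np.array([0.1, 0.2])]})
+     ## yields a single VECTOR column whose row is [30.0, 0.1, 0.2]; columns are concatenated in sorted name order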
+ """ + + class Params(Nto1ColumnProcessor.Params): + handle_invalid: InvalidBehavior = InvalidBehavior.KEEP + + def _transform_df(self, data: ScalableDataFrame) -> ScalableSeries: + output_series: Optional[ScalableSeries] = None + for col in sorted(list(data.columns)): + if output_series is None: + output_series: ScalableSeries = self._transform_series(data[col], col) + else: + output_series += self._transform_series(data[col], col) + return output_series + + def _transform_series(self, data: ScalableSeries, col: str) -> ScalableSeries: + feature_type: MLType = self.data_schema[col] + if feature_type in {MLType.INT, MLType.FLOAT, MLType.VECTOR}: + return data.apply(self._convert_to_list, col=col) + elif feature_type is MLType.SPARSE_VECTOR: + return data.apply(self._convert_sparse_vector_to_dense_vector, col=col) + else: + raise TypeError(f"{col} Column must be of type {self.input_mltypes}; found {feature_type}") + + def _convert_sparse_vector_to_dense_vector(self, vector: SparseCSRMatrix, col: str): + if isinstance(vector, SparseCSRMatrix): + dense_vector: np.ndarray = vector.toarray()[0] + else: + if self.params.handle_invalid is InvalidBehavior.ERROR: + raise ValueError( + f'Expected only SparseCSRMatrix in column "{col}", got a value of type {type(vector)}' + ) + dense_vector: Optional[np.ndarray] = None + return self._convert_to_list(dense_vector, col) + + def _convert_to_list(self, val: Optional[Union[np.ndarray, List, Set, Tuple, Any]], col: str): + ## Assumes the length of vectors are same throughout the column. + if is_null(val) and self.params.handle_invalid is InvalidBehavior.ERROR: + raise ValueError(f'Got empty value ({val}) in column: "{col}"') + return as_list(val) diff --git a/src/bears/processor/_vector/_VectorDensifier.py b/src/bears/processor/_vector/_VectorDensifier.py new file mode 100644 index 0000000..3896b8b --- /dev/null +++ b/src/bears/processor/_vector/_VectorDensifier.py @@ -0,0 +1,36 @@ +from typing import ( + List, + Optional, +) + +import numpy as np +from bears.util import is_null +from scipy.sparse import csr_matrix as SparseCSRMatrix + +from bears.processor import SingleColumnProcessor, SparseVectorInputProcessor, VectorOutputProcessor + + +class VectorDensifier(SingleColumnProcessor, SparseVectorInputProcessor, VectorOutputProcessor): + """Converts a sparse vector column into a dense vector column. Each dense vector is a 1d numpy array.""" + + class Params(SingleColumnProcessor.Params): + output_list: bool = False + + def transform_single(self, data: SparseCSRMatrix) -> Optional[np.ndarray]: + if is_null(data): + return None + if not isinstance(data, SparseCSRMatrix): + raise ValueError(f"{str(self.__class__)} can only densify SparseCSRMatrix objects") + data: np.ndarray = data.toarray() + if len(data.shape) != 2: + raise ValueError( + f"Each SparseCSRMatrix to densify must have two dimensions. Found: {len(data.shape)} dims" + ) + if data.shape[0] != 1: + raise ValueError( + f"Each SparseCSRMatrix to densify must have exactly 1 row. Found: {data.shape[0]} rows" + ) + data: np.ndarray = data[0] + if self.params.output_list: + data: List = list(data) + return data diff --git a/src/bears/processor/_vector/__init__.py b/src/bears/processor/_vector/__init__.py new file mode 100644 index 0000000..1afd01d --- /dev/null +++ b/src/bears/processor/_vector/__init__.py @@ -0,0 +1,2 @@ +from bears.processor._vector._VectorAssembler import * +from bears.processor._vector._VectorDensifier import *