Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix: do not launch composition for atomized model #1324

Merged
merged 9 commits into from
Aug 26, 2024
6 changes: 5 additions & 1 deletion fedot/api/api_utils/predefined_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ def _get_pipeline(self, use_input_preprocessing: bool = True) -> Pipeline:
else:
raise ValueError(f'{type(self.predefined_model)} is not supported as Fedot model')

verify_pipeline(pipelines, task_type=self.data.task.task_type, raise_on_failure=True)
# TODO: Workaround for AtomizedModel
if "atomized" in pipelines.descriptive_id:
self.log.message("Pipeline verification for AtomizedModel currently unavailable")
else:
verify_pipeline(pipelines, task_type=self.data.task.task_type, raise_on_failure=True)

return pipelines

Expand Down
7 changes: 7 additions & 0 deletions fedot/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ def fit(self,
with fedot_composer_timer.launch_preprocessing():
self.train_data = self.data_processor.fit_transform(self.train_data)

# TODO: Workaround for AtomizedModel
init_asm = self.params.data.get('initial_assumption')
if predefined_model is None:
if isinstance(init_asm, Pipeline) and ("atomized" in init_asm.descriptive_id):
self.log.message('Composition for AtomizedModel currently unavailable')
predefined_model = init_asm

with fedot_composer_timer.launch_fitting():
if predefined_model is not None:
# Fit predefined model and return it without composing
Expand Down
38 changes: 38 additions & 0 deletions test/integration/api/test_main_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

from examples.simple.time_series_forecasting.ts_pipelines import ts_complex_ridge_smoothing_pipeline
from fedot import Fedot
from fedot.core.operations.atomized_model import AtomizedModel
from fedot.core.pipelines.node import PipelineNode
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.pipeline_builder import PipelineBuilder
from fedot.core.repository.tasks import TsForecastingParams
from test.data.datasets import get_dataset, get_multimodal_ts_data, load_categorical_unimodal, \
load_categorical_multidata
Expand Down Expand Up @@ -84,6 +86,42 @@ def test_api_tune_correct(task_type, metric_name, pred_model):
assert len(test_data.target) == len(pred_before) == len(pred_after)


@pytest.mark.parametrize(
"task_type, metric_name, pred_model",
[
("classification", "f1", "dt"),
("regression", "rmse", "dtreg"),
],
)
def test_api_fit_atomized_model(task_type, metric_name, pred_model):
train_data, test_data, _ = get_dataset(task_type, n_samples=100, n_features=5, iris_dataset=False)

auto_model = Fedot(
problem=task_type,
metric=metric_name,
**TESTS_MAIN_API_DEFAULT_PARAMS,
initial_assumption=PipelineBuilder().add_node("scaling").add_node(pred_model).build()
)

auto_model.fit(features=train_data)
pred_auto_model = auto_model.predict(features=test_data)

prev_model = auto_model.current_pipeline
prev_model.unfit()

atomized_model = Pipeline(
PipelineNode(operation_type=AtomizedModel(prev_model), nodes_from=[PipelineNode("normalization")])
)

auto_model_from_atomized = Fedot(
problem=task_type, metric=metric_name, **TESTS_MAIN_API_DEFAULT_PARAMS, initial_assumption=atomized_model
)
auto_model_from_atomized.fit(features=train_data)
pred_auto_model_from_atomized = auto_model_from_atomized.predict(features=test_data)

assert len(test_data.target) == len(pred_auto_model) == len(pred_auto_model_from_atomized)


def test_api_simple_ts_predict_correct(task_type: str = 'ts_forecasting'):
# The forecast length must be equal to 5
forecast_length = 5
Expand Down
Loading