Skip to content

Release/2.13.9rc1 #1174

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions ads/opctl/operator/lowcode/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import tempfile
from typing import List, Union

import cloudpickle
import fsspec
import oracledb
import pandas as pd
Expand Down Expand Up @@ -126,7 +127,26 @@ def load_data(data_spec, storage_options=None, **kwargs):
return data


def _safe_write(fn, **kwargs):
try:
fn(**kwargs)
except Exception:
logger.warning(f'Failed to write file {kwargs.get("filename", "UNKNOWN")}')


def write_data(data, filename, format, storage_options=None, index=False, **kwargs):
return _safe_write(
fn=_write_data,
data=data,
filename=filename,
format=format,
storage_options=storage_options,
index=index,
**kwargs,
)


def _write_data(data, filename, format, storage_options=None, index=False, **kwargs):
disable_print()
if not format:
_, format = os.path.splitext(filename)
Expand All @@ -143,11 +163,24 @@ def write_data(data, filename, format, storage_options=None, index=False, **kwar


def write_json(json_dict, filename, storage_options=None):
return _safe_write(
fn=_write_json,
json_dict=json_dict,
filename=filename,
storage_options=storage_options,
)


def _write_json(json_dict, filename, storage_options=None):
with fsspec.open(filename, mode="w", **storage_options) as f:
f.write(json.dumps(json_dict))


def write_simple_json(data, path):
return _safe_write(fn=_write_simple_json, data=data, path=path)


def _write_simple_json(data, path):
if ObjectStorageDetails.is_oci_path(path):
storage_options = default_signer()
else:
Expand All @@ -156,6 +189,60 @@ def write_simple_json(data, path):
json.dump(data, f, indent=4)


def write_file(local_filename, remote_filename, storage_options, **kwargs):
return _safe_write(
fn=_write_file,
local_filename=local_filename,
remote_filename=remote_filename,
storage_options=storage_options,
**kwargs,
)


def _write_file(local_filename, remote_filename, storage_options, **kwargs):
with open(local_filename) as f1:
with fsspec.open(
remote_filename,
"w",
**storage_options,
) as f2:
f2.write(f1.read())


def load_pkl(filepath):
return _safe_write(fn=_load_pkl, filepath=filepath)


def _load_pkl(filepath):
storage_options = {}
if ObjectStorageDetails.is_oci_path(filepath):
storage_options = default_signer()

with fsspec.open(filepath, "rb", **storage_options) as f:
return cloudpickle.load(f)
return None


def write_pkl(obj, filename, output_dir, storage_options):
return _safe_write(
fn=_write_pkl,
obj=obj,
filename=filename,
output_dir=output_dir,
storage_options=storage_options,
)


def _write_pkl(obj, filename, output_dir, storage_options):
pkl_path = os.path.join(output_dir, filename)
with fsspec.open(
pkl_path,
"wb",
**storage_options,
) as f:
cloudpickle.dump(obj, f)


def merge_category_columns(data, target_category_columns):
result = data.apply(
lambda x: "__".join([str(x[col]) for col in target_category_columns]), axis=1
Expand Down Expand Up @@ -290,4 +377,8 @@ def disable_print():

# Restore
def enable_print():
try:
sys.stdout.close()
except Exception:
pass
sys.stdout = sys.__stdout__
17 changes: 14 additions & 3 deletions ads/opctl/operator/lowcode/forecast/model/automlx.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets):
super().__init__(config, datasets)
self.global_explanation = {}
self.local_explanation = {}
self.explainability_kwargs = {}

def set_kwargs(self):
model_kwargs_cleaned = self.spec.model_kwargs
Expand All @@ -54,6 +55,9 @@ def set_kwargs(self):
self.spec.preprocessing.enabled
or model_kwargs_cleaned.get("preprocessing", True)
)
sample_ratio = model_kwargs_cleaned.pop("sample_to_feature_ratio", None)
if sample_ratio is not None:
self.explainability_kwargs = {"sample_to_feature_ratio": sample_ratio}
return model_kwargs_cleaned, time_budget

def preprocess(self, data, series_id): # TODO: re-use self.le for explanations
Expand Down Expand Up @@ -445,6 +449,7 @@ def explain_model(self):
else None,
pd.DataFrame(data_i[self.spec.target_column]),
task="forecasting",
**self.explainability_kwargs,
)

# Generate explanations for the forecast
Expand Down Expand Up @@ -518,7 +523,9 @@ def get_validation_score_and_metric(self, model):
model_params = model.selected_model_params_
if len(trials) > 0:
score_col = [col for col in trials.columns if "Score" in col][0]
validation_score = trials[trials.Hyperparameters == model_params][score_col].iloc[0]
validation_score = trials[trials.Hyperparameters == model_params][
score_col
].iloc[0]
else:
validation_score = 0
return -1 * validation_score
Expand All @@ -531,8 +538,12 @@ def generate_train_metrics(self) -> pd.DataFrame:
for s_id in self.forecast_output.list_series_ids():
try:
metrics = {self.spec.metric.upper(): self.models[s_id]["score"]}
metrics_df = pd.DataFrame.from_dict(metrics, orient="index", columns=[s_id])
logger.warning("AutoMLX failed to generate training metrics. Recovering validation loss instead")
metrics_df = pd.DataFrame.from_dict(
metrics, orient="index", columns=[s_id]
)
logger.warning(
"AutoMLX failed to generate training metrics. Recovering validation loss instead"
)
total_metrics = pd.concat([total_metrics, metrics_df], axis=1)
except Exception as e:
logger.debug(
Expand Down
26 changes: 14 additions & 12 deletions ads/opctl/operator/lowcode/forecast/model/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from abc import ABC, abstractmethod
from typing import Tuple

import fsspec
import numpy as np
import pandas as pd
import report_creator as rc
Expand All @@ -25,10 +24,13 @@
disable_print,
enable_print,
human_time_friendly,
load_pkl,
merged_category_column_name,
seconds_to_datetime,
write_data,
write_file,
write_json,
write_pkl,
)
from ads.opctl.operator.lowcode.forecast.utils import (
_build_metrics_df,
Expand All @@ -38,8 +40,6 @@
evaluate_train_metrics,
get_auto_select_plot,
get_forecast_plots,
load_pkl,
write_pkl,
)

from ..const import (
Expand Down Expand Up @@ -493,13 +493,11 @@ def _save_report(
enable_print()

report_path = os.path.join(unique_output_dir, self.spec.report_filename)
with open(report_local_path) as f1:
with fsspec.open(
report_path,
"w",
**storage_options,
) as f2:
f2.write(f1.read())
write_file(
local_filename=report_local_path,
remote_filename=report_path,
storage_options=storage_options,
)

# forecast csv report
# todo: add test data into forecast.csv
Expand Down Expand Up @@ -576,7 +574,9 @@ def _save_report(
# Round to 4 decimal places before writing
global_expl_rounded = self.formatted_global_explanation.copy()
global_expl_rounded = global_expl_rounded.apply(
lambda col: np.round(col, 4) if np.issubdtype(col.dtype, np.number) else col
lambda col: np.round(col, 4)
if np.issubdtype(col.dtype, np.number)
else col
)
if self.spec.generate_explanation_files:
write_data(
Expand All @@ -598,7 +598,9 @@ def _save_report(
# Round to 4 decimal places before writing
local_expl_rounded = self.formatted_local_explanation.copy()
local_expl_rounded = local_expl_rounded.apply(
lambda col: np.round(col, 4) if np.issubdtype(col.dtype, np.number) else col
lambda col: np.round(col, 4)
if np.issubdtype(col.dtype, np.number)
else col
)
if self.spec.generate_explanation_files:
write_data(
Expand Down
24 changes: 10 additions & 14 deletions ads/opctl/operator/lowcode/forecast/model/neuralprophet.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
from ads.opctl.operator.lowcode.common.utils import (
disable_print,
enable_print,
)
from ads.opctl.operator.lowcode.forecast.utils import (
_select_plot_list,
load_pkl,
write_pkl,
)
from ads.opctl.operator.lowcode.forecast.utils import _select_plot_list

from ..const import DEFAULT_TRIALS, SupportedModels
from ..operator_config import ForecastOperatorConfig
Expand Down Expand Up @@ -159,20 +157,18 @@ def _train_model(self, i, s_id, df, model_kwargs):
upper_bound=self.get_horizon(forecast[upper_bound_col_name]).values,
lower_bound=self.get_horizon(forecast[lower_bound_col_name]).values,
)
core_columns = set(forecast.columns) - set(
[
"y",
"yhat1",
upper_bound_col_name,
lower_bound_col_name,
"future_regressors_additive",
"future_regressors_multiplicative",
]
)
core_columns = set(forecast.columns) - {
"y",
"yhat1",
upper_bound_col_name,
lower_bound_col_name,
"future_regressors_additive",
"future_regressors_multiplicative",
}
exog_variables = set(
filter(lambda x: x.startswith("future_regressor_"), list(core_columns))
)
combine_terms = list(core_columns - exog_variables - set(["ds"]))
combine_terms = list(core_columns - exog_variables - {"ds"})
temp_df = (
forecast[list(core_columns)]
.rename({"ds": "Date"}, axis=1)
Expand Down
42 changes: 13 additions & 29 deletions ads/opctl/operator/lowcode/forecast/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
#!/usr/bin/env python

# Copyright (c) 2023, 2024 Oracle and/or its affiliates.
# Copyright (c) 2023, 2025 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import logging
import os
from typing import Set

import cloudpickle
import fsspec
import numpy as np
import pandas as pd
import report_creator as rc
Expand All @@ -21,7 +19,6 @@
r2_score,
)

from ads.common.object_storage_details import ObjectStorageDetails
from ads.dataset.label_encoder import DataFrameLabelEncoder
from ads.opctl import logger
from ads.opctl.operator.lowcode.forecast.const import ForecastOutputColumns
Expand Down Expand Up @@ -170,26 +167,6 @@ def _build_metrics_per_horizon(
return metrics_df


def load_pkl(filepath):
storage_options = {}
if ObjectStorageDetails.is_oci_path(filepath):
storage_options = default_signer()

with fsspec.open(filepath, "rb", **storage_options) as f:
return cloudpickle.load(f)
return None


def write_pkl(obj, filename, output_dir, storage_options):
pkl_path = os.path.join(output_dir, filename)
with fsspec.open(
pkl_path,
"wb",
**storage_options,
) as f:
cloudpickle.dump(obj, f)


def _build_metrics_df(y_true, y_pred, series_id):
if len(y_true) == 0 or len(y_pred) == 0:
return pd.DataFrame()
Expand Down Expand Up @@ -251,7 +228,10 @@ def evaluate_train_metrics(output):


def _select_plot_list(fn, series_ids, target_category_column):
blocks = [rc.Widget(fn(s_id=s_id), label=s_id if target_category_column else None) for s_id in series_ids]
blocks = [
rc.Widget(fn(s_id=s_id), label=s_id if target_category_column else None)
for s_id in series_ids
]
return rc.Select(blocks=blocks) if len(blocks) > 1 else blocks[0]


Expand All @@ -264,8 +244,10 @@ def get_auto_select_plot(backtest_results):
back_test_csv_columns = backtest_results.columns.tolist()
back_test_column = "backtest"
metric_column = "metric"
models = [x for x in back_test_csv_columns if x not in [back_test_column, metric_column]]
for i, column in enumerate(models):
models = [
x for x in back_test_csv_columns if x not in [back_test_column, metric_column]
]
for column in models:
fig.add_trace(
go.Scatter(
x=backtest_results[back_test_column],
Expand All @@ -283,7 +265,7 @@ def get_forecast_plots(
horizon,
test_data=None,
ci_interval_width=0.95,
target_category_column=None
target_category_column=None,
):
def plot_forecast_plotly(s_id):
fig = go.Figure()
Expand Down Expand Up @@ -380,7 +362,9 @@ def plot_forecast_plotly(s_id):
)
return fig

return _select_plot_list(plot_forecast_plotly, forecast_output.list_series_ids(), target_category_column)
return _select_plot_list(
plot_forecast_plotly, forecast_output.list_series_ids(), target_category_column
)


def convert_target(target: str, target_col: str):
Expand Down
Loading