Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix current issues with forecasting #1283

Merged
merged 11 commits into from
Oct 19, 2023
52 changes: 45 additions & 7 deletions evadb/executor/create_function_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import hashlib
import os
import pickle
import re
from pathlib import Path
from typing import Dict, List

Expand Down Expand Up @@ -163,6 +164,16 @@ def handle_sklearn_function(self):
self.node.metadata,
)

def convert_to_numeric(self, x):
x = re.sub("[^0-9.]", "", str(x))
xzdandy marked this conversation as resolved.
Show resolved Hide resolved
try:
return int(x)
except ValueError:
try:
return float(x)
except ValueError:
return x

def handle_ultralytics_function(self):
"""Handle Ultralytics functions"""
try_to_import_ultralytics()
Expand All @@ -184,7 +195,12 @@ def handle_ultralytics_function(self):

def handle_forecasting_function(self):
"""Handle forecasting functions"""
os.environ["CUDA_VISIBLE_DEVICES"] = ""
save_old_cuda_env = None
if "CUDA_VISIBLE_DEVICES" in os.environ:
save_old_cuda_env = os.environ["CUDA_VISIBLE_DEVICES"]
os.environ["CUDA_VISIBLE_DEVICES"] = save_old_cuda_env.split(",")[0]
else:
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
aggregated_batch_list = []
child = self.children[0]
for batch in child.exec():
Expand Down Expand Up @@ -308,7 +324,7 @@ def handle_forecasting_function(self):
model_args["input_size"] = 2 * horizon
model_args["early_stop_patience_steps"] = 20
else:
model_args["config"] = {
model_args_config = {
"input_size": 2 * horizon,
"early_stop_patience_steps": 20,
}
Expand All @@ -320,7 +336,13 @@ def handle_forecasting_function(self):
if "auto" not in arg_map["model"].lower():
model_args["hist_exog_list"] = exogenous_columns
else:
model_args["config"]["hist_exog_list"] = exogenous_columns
model_args_config["hist_exog_list"] = exogenous_columns

def get_optuna_config(trial):
return model_args_config

model_args["config"] = get_optuna_config
model_args["backend"] = "optuna"

model_args["h"] = horizon

Expand Down Expand Up @@ -394,13 +416,26 @@ def handle_forecasting_function(self):
]
if len(existing_model_files) == 0:
logger.info("Training, please wait...")
for column in data.columns:
if column != "ds" and column != "unique_id":
data[column] = data.apply(
lambda x: self.convert_to_numeric(x[column]), axis=1
)
if library == "neuralforecast":
model.fit(df=data, val_size=horizon)
model.save(model_path, overwrite=True)
else:
# The following lines of code helps eliminate the math error encountered in statsforecast when only one datapoint is available in a time series
for col in data["unique_id"].unique():
if len(data[data["unique_id"] == col]) == 1:
data = data._append(
[data[data["unique_id"] == col]], ignore_index=True
)

xzdandy marked this conversation as resolved.
Show resolved Hide resolved
model.fit(df=data[["ds", "y", "unique_id"]])
f = open(model_path, "wb")
pickle.dump(model, f)
f.close()
f = open(model_path, "wb")
pickle.dump(model, f)
f.close()
elif not Path(model_path).exists():
model_path = os.path.join(model_dir, existing_model_files[-1])

Expand All @@ -422,7 +457,10 @@ def handle_forecasting_function(self):
FunctionMetadataCatalogEntry("library", library),
]

os.environ.pop("CUDA_VISIBLE_DEVICES", None)
if save_old_cuda_env is not None:
os.environ["CUDA_VISIBLE_DEVICES"] = save_old_cuda_env
else:
os.environ.pop("CUDA_VISIBLE_DEVICES", None)
xzdandy marked this conversation as resolved.
Show resolved Hide resolved

return (
self.node.name,
Expand Down
16 changes: 11 additions & 5 deletions evadb/functions/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,22 @@ def setup(
horizon: int,
library: str,
):
f = open(model_path, "rb")
loaded_model = pickle.load(f)
f.close()
self.library = library
if "neuralforecast" in self.library:
from neuralforecast import NeuralForecast

loaded_model = NeuralForecast.load(path=model_path)
self.model_name = model_name[4:] if "Auto" in model_name else model_name
else:
f = open(model_path, "rb")
xzdandy marked this conversation as resolved.
Show resolved Hide resolved
loaded_model = pickle.load(f)
f.close()
self.model_name = model_name
self.model = loaded_model
self.model_name = model_name
self.predict_column_rename = predict_column_rename
self.time_column_rename = time_column_rename
self.id_column_rename = id_column_rename
self.horizon = int(horizon)
self.library = library

def forward(self, data) -> pd.DataFrame:
if self.library == "statsforecast":
Expand Down