Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix current issues with forecasting #1283

Merged
merged 11 commits into from
Oct 19, 2023
76 changes: 67 additions & 9 deletions evadb/executor/create_function_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import hashlib
import locale
import os
import pickle
import re
from pathlib import Path
from typing import Dict, List

Expand Down Expand Up @@ -50,6 +53,31 @@
from evadb.utils.logging_manager import logger


# From https://stackoverflow.com/a/34333710
@contextlib.contextmanager
def set_env(**environ):
"""
Temporarily set the process environment variables.

>>> with set_env(PLUGINS_DIR='test/plugins'):
... "PLUGINS_DIR" in os.environ
True

>>> "PLUGINS_DIR" in os.environ
False

:type environ: dict[str, unicode]
:param environ: Environment variables to set
"""
old_environ = dict(os.environ)
os.environ.update(environ)
try:
yield
finally:
os.environ.clear()
os.environ.update(old_environ)


class CreateFunctionExecutor(AbstractExecutor):
def __init__(self, db: EvaDBDatabase, node: CreateFunctionPlan):
super().__init__(db, node)
Expand Down Expand Up @@ -169,6 +197,15 @@ def handle_sklearn_function(self):
self.node.metadata,
)

def convert_to_numeric(self, x):
x = re.sub("[^0-9.,]", "", str(x))
locale.setlocale(locale.LC_ALL, "")
x = float(locale.atof(x))
if x.is_integer():
return int(x)
else:
return x

def handle_xgboost_function(self):
"""Handle xgboost functions

Expand Down Expand Up @@ -245,7 +282,6 @@ def handle_ultralytics_function(self):

def handle_forecasting_function(self):
"""Handle forecasting functions"""
os.environ["CUDA_VISIBLE_DEVICES"] = ""
aggregated_batch_list = []
child = self.children[0]
for batch in child.exec():
Expand Down Expand Up @@ -369,7 +405,7 @@ def handle_forecasting_function(self):
model_args["input_size"] = 2 * horizon
model_args["early_stop_patience_steps"] = 20
else:
model_args["config"] = {
model_args_config = {
"input_size": 2 * horizon,
"early_stop_patience_steps": 20,
}
Expand All @@ -381,7 +417,13 @@ def handle_forecasting_function(self):
if "auto" not in arg_map["model"].lower():
model_args["hist_exog_list"] = exogenous_columns
else:
model_args["config"]["hist_exog_list"] = exogenous_columns
model_args_config["hist_exog_list"] = exogenous_columns

def get_optuna_config(trial):
return model_args_config

model_args["config"] = get_optuna_config
model_args["backend"] = "optuna"

model_args["h"] = horizon

Expand Down Expand Up @@ -455,13 +497,31 @@ def handle_forecasting_function(self):
]
if len(existing_model_files) == 0:
logger.info("Training, please wait...")
for column in data.columns:
if column != "ds" and column != "unique_id":
data[column] = data.apply(
lambda x: self.convert_to_numeric(x[column]), axis=1
)
if library == "neuralforecast":
model.fit(df=data, val_size=horizon)
cuda_devices_here = "0"
if "CUDA_VISIBLE_DEVICES" in os.environ:
cuda_devices_here = os.environ["CUDA_VISIBLE_DEVICES"].split(",")[0]

with set_env(CUDA_VISIBLE_DEVICES=cuda_devices_here):
model.fit(df=data, val_size=horizon)
model.save(model_path, overwrite=True)
else:
# The following lines of code helps eliminate the math error encountered in statsforecast when only one datapoint is available in a time series
for col in data["unique_id"].unique():
if len(data[data["unique_id"] == col]) == 1:
data = data._append(
[data[data["unique_id"] == col]], ignore_index=True
)

xzdandy marked this conversation as resolved.
Show resolved Hide resolved
model.fit(df=data[["ds", "y", "unique_id"]])
f = open(model_path, "wb")
pickle.dump(model, f)
f.close()
f = open(model_path, "wb")
pickle.dump(model, f)
f.close()
elif not Path(model_path).exists():
model_path = os.path.join(model_dir, existing_model_files[-1])

Expand All @@ -483,8 +543,6 @@ def handle_forecasting_function(self):
FunctionMetadataCatalogEntry("library", library),
]

os.environ.pop("CUDA_VISIBLE_DEVICES", None)

return (
self.node.name,
impl_path,
Expand Down
15 changes: 10 additions & 5 deletions evadb/functions/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,21 @@ def setup(
horizon: int,
library: str,
):
f = open(model_path, "rb")
loaded_model = pickle.load(f)
f.close()
self.library = library
if "neuralforecast" in self.library:
from neuralforecast import NeuralForecast

loaded_model = NeuralForecast.load(path=model_path)
self.model_name = model_name[4:] if "Auto" in model_name else model_name
else:
with open(model_path, "rb") as f:
loaded_model = pickle.load(f)
self.model_name = model_name
self.model = loaded_model
self.model_name = model_name
self.predict_column_rename = predict_column_rename
self.time_column_rename = time_column_rename
self.id_column_rename = id_column_rename
self.horizon = int(horizon)
self.library = library

def forward(self, data) -> pd.DataFrame:
if self.library == "statsforecast":
Expand Down