From cbb87e72d988a9a39effe8c9def7f8868427bd51 Mon Sep 17 00:00:00 2001 From: Shashank Suman Date: Fri, 3 Nov 2023 02:27:18 -0400 Subject: [PATCH] added job history for job scheduler --- evadb/catalog/catalog_manager.py | 52 ++++++++++- evadb/catalog/models/job_catalog.py | 2 + evadb/catalog/models/job_history_catalog.py | 59 ++++++++++++ evadb/catalog/models/utils.py | 24 +++++ evadb/catalog/services/job_catalog_service.py | 6 +- .../services/job_history_catalog_service.py | 92 +++++++++++++++++++ evadb/catalog/sql_config.py | 1 + evadb/executor/create_job_executor.py | 4 +- evadb/utils/job_scheduler.py | 59 ++++++++---- .../long/test_job_scheduler.py | 81 ++++++++++++++++ .../short/test_create_job_executor.py | 8 +- test/unit_tests/parser/test_parser.py | 1 - test/unit_tests/utils/test_job_scheduler.py | 60 ++++++++++++ 13 files changed, 417 insertions(+), 32 deletions(-) create mode 100644 evadb/catalog/services/job_history_catalog_service.py create mode 100644 test/integration_tests/long/test_job_scheduler.py create mode 100644 test/unit_tests/utils/test_job_scheduler.py diff --git a/evadb/catalog/catalog_manager.py b/evadb/catalog/catalog_manager.py index d3e6ead01..62ad30a59 100644 --- a/evadb/catalog/catalog_manager.py +++ b/evadb/catalog/catalog_manager.py @@ -41,6 +41,7 @@ FunctionMetadataCatalogEntry, IndexCatalogEntry, JobCatalogEntry, + JobHistoryCatalogEntry, TableCatalogEntry, drop_all_tables_except_catalog, init_db, @@ -64,6 +65,7 @@ ) from evadb.catalog.services.index_catalog_service import IndexCatalogService from evadb.catalog.services.job_catalog_service import JobCatalogService +from evadb.catalog.services.job_history_catalog_service import JobHistoryCatalogService from evadb.catalog.services.table_catalog_service import TableCatalogService from evadb.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig from evadb.expression.function_expression import FunctionExpression @@ -89,6 +91,9 @@ def __init__(self, db_uri: str): self._sql_config.session 
) self._job_catalog_service = JobCatalogService(self._sql_config.session) + self._job_history_catalog_service = JobHistoryCatalogService( + self._sql_config.session + ) self._table_catalog_service = TableCatalogService(self._sql_config.session) self._column_service = ColumnCatalogService(self._sql_config.session) self._function_service = FunctionCatalogService(self._sql_config.session) @@ -231,7 +236,7 @@ def insert_job_catalog_entry( active: bool, next_schedule_run: datetime, ) -> JobCatalogEntry: - """A new entry is persisted in the job catalog." + """A new entry is persisted in the job catalog. Args: name: job name @@ -305,6 +310,51 @@ def update_job_catalog_entry( job_name, next_scheduled_run, active ) + "Job history catalog services" + + def insert_job_history_catalog_entry( + self, + job_id: str, + job_name: str, + execution_start_time: datetime, + execution_end_time: datetime, + ) -> JobCatalogEntry: + """A new entry is persisted in the job history catalog. + + Args: + job_id: job id for the execution entry + job_name: job name for the execution entry + execution_start_time: job execution start time + execution_end_time: job execution end time + """ + job_history_entry = self._job_history_catalog_service.insert_entry( + job_id, job_name, execution_start_time, execution_end_time + ) + + return job_history_entry + + def get_job_history_by_job_id(self, job_id: int) -> list[JobHistoryCatalogEntry]: + """Returns all the entries present for this job_id in the history. + + Args: + job_id: the id of the job whose history should be fetched + """ + return self._job_history_catalog_service.get_entry_by_job_id(job_id) + + def update_job_history_end_time( + self, job_id: int, execution_start_time: datetime, execution_end_time: datetime + ) -> list[JobHistoryCatalogEntry]: + """Updates the execution_end_time for this job history matching job_id and execution_start_time. 
+ + Args: + job_id: id of the job whose history entry should be updated + execution_start_time: the start time for the job history entry + execution_end_time: the end time for the job history entry + """ + return self._job_history_catalog_service.update_entry_end_time( + job_id, execution_start_time, execution_end_time + ) + "Table catalog services" def insert_table_catalog_entry( diff --git a/evadb/catalog/models/job_catalog.py b/evadb/catalog/models/job_catalog.py index da50c4a6a..07a66f622 100644 --- a/evadb/catalog/models/job_catalog.py +++ b/evadb/catalog/models/job_catalog.py @@ -17,6 +17,7 @@ import json from sqlalchemy import Boolean, Column, DateTime, Index, Integer, String +from sqlalchemy.orm import relationship from evadb.catalog.models.base_model import BaseModel from evadb.catalog.models.utils import JobCatalogEntry @@ -56,6 +57,7 @@ class JobCatalog(BaseModel): ) _next_run_index = Index("_next_run_index", _next_scheduled_run) + _job_history_catalog = relationship("JobHistoryCatalog", cascade="all, delete") def __init__( self, diff --git a/evadb/catalog/models/job_history_catalog.py b/evadb/catalog/models/job_history_catalog.py index 2338a33c1..c6c21ba3f 100644 --- a/evadb/catalog/models/job_history_catalog.py +++ b/evadb/catalog/models/job_history_catalog.py @@ -12,3 +12,62 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import datetime + +from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, UniqueConstraint + +from evadb.catalog.models.base_model import BaseModel +from evadb.catalog.models.utils import JobHistoryCatalogEntry + + +class JobHistoryCatalog(BaseModel): + """The `JobHistoryCatalog` stores the execution history of jobs. + `_row_id:` an autogenerated unique identifier. + `_job_id:` job id. + `_job_name:` job name. 
+ `_execution_start_time:` start time of this run + `_execution_end_time:` end time for this run + `_created_at:` entry creation time + `_updated_at:` entry last update time + """ + + __tablename__ = "job_history_catalog" + + _job_id = Column( + "job_id", Integer, ForeignKey("job_catalog._row_id", ondelete="CASCADE") + ) + _job_name = Column("job_name", String(100)) + _execution_start_time = Column("execution_start_time", DateTime) + _execution_end_time = Column("execution_end_time", DateTime) + _created_at = Column("created_at", DateTime, default=datetime.datetime.now) + _updated_at = Column( + "updated_at", + DateTime, + default=datetime.datetime.now, + onupdate=datetime.datetime.now, + ) + + __table_args__ = (UniqueConstraint("job_id", "execution_start_time"), {}) + + def __init__( + self, + job_id: int, + job_name: str, + execution_start_time: datetime, + execution_end_time: datetime, + ): + self._job_id = job_id + self._job_name = job_name + self._execution_start_time = execution_start_time + self._execution_end_time = execution_end_time + + def as_dataclass(self) -> "JobHistoryCatalogEntry": + return JobHistoryCatalogEntry( + row_id=self._row_id, + job_id=self._job_id, + job_name=self._job_name, + execution_start_time=self._execution_start_time, + execution_end_time=self._execution_end_time, + created_at=self._created_at, + updated_at=self._updated_at, + ) diff --git a/evadb/catalog/models/utils.py b/evadb/catalog/models/utils.py index 8292e9aaf..2ebb1ac78 100644 --- a/evadb/catalog/models/utils.py +++ b/evadb/catalog/models/utils.py @@ -305,3 +305,27 @@ def display_format(self): "created_at": self.created_at, "updated_at": self.updated_at, } + + +@dataclass(unsafe_hash=True) +class JobHistoryCatalogEntry: + """Dataclass representing an entry in the `JobHistoryCatalog`.""" + + job_id: int + job_name: str + execution_start_time: datetime + execution_end_time: datetime + created_at: datetime + updated_at: datetime + row_id: int = None + + def 
display_format(self): + return { + "row_id": self.row_id, + "job_id": self.job_name, + "job_name": self.job_name, + "execution_start_time": self.execution_start_time, + "execution_end_time": self.execution_end_time, + "created_at": self.created_at, + "updated_at": self.updated_at, + } diff --git a/evadb/catalog/services/job_catalog_service.py b/evadb/catalog/services/job_catalog_service.py index 0267ff049..ca3474b0d 100644 --- a/evadb/catalog/services/job_catalog_service.py +++ b/evadb/catalog/services/job_catalog_service.py @@ -108,9 +108,9 @@ def get_all_overdue_jobs(self) -> list: self.model._active == true(), ) ) - ).all() - entry = [row.as_dataclass() for row in entries] - return entry + ).scalars().all() + entries = [row.as_dataclass() for row in entries] + return entries def get_next_executable_job(self, only_past_jobs: bool) -> JobCatalogEntry: """Get the oldest job that is ready to be triggered by trigger time diff --git a/evadb/catalog/services/job_history_catalog_service.py b/evadb/catalog/services/job_history_catalog_service.py new file mode 100644 index 000000000..04333c791 --- /dev/null +++ b/evadb/catalog/services/job_history_catalog_service.py @@ -0,0 +1,92 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import datetime +import json + +from sqlalchemy import and_, true +from sqlalchemy.orm import Session +from sqlalchemy.sql.expression import select + +from evadb.catalog.models.job_history_catalog import JobHistoryCatalog +from evadb.catalog.models.utils import JobHistoryCatalogEntry +from evadb.catalog.services.base_service import BaseService +from evadb.utils.errors import CatalogError +from evadb.utils.logging_manager import logger + + +class JobHistoryCatalogService(BaseService): + def __init__(self, db_session: Session): + super().__init__(JobHistoryCatalog, db_session) + + def insert_entry( + self, + job_id: str, + job_name: str, + execution_start_time: datetime, + execution_end_time: datetime, + ) -> JobHistoryCatalogEntry: + try: + job_history_catalog_obj = self.model( + job_id=job_id, + job_name=job_name, + execution_start_time=execution_start_time, + execution_end_time=execution_end_time, + ) + job_history_catalog_obj = job_history_catalog_obj.save(self.session) + + except Exception as e: + logger.exception( + f"Failed to insert entry into job history catalog with exception {str(e)}" + ) + raise CatalogError(e) + + return job_history_catalog_obj.as_dataclass() + + def get_entry_by_job_id(self, job_id: int) -> list[JobHistoryCatalogEntry]: + """ + Get all the job history catalog entry with given job id. 
+ Arguments: + job_id (int): Job id + Returns: + list[JobHistoryCatalogEntry]: all history catalog entries for given job id + """ + entries = self.session.execute( + select(self.model).filter(self.model._job_id == job_id) + ).scalars().all() + entries = [row.as_dataclass() for row in entries] + return entries + + def update_entry_end_time( + self, job_id: int, execution_start_time: datetime, execution_end_time: datetime + ): + """Update the execution_end_time of the entry as per the provided values + Arguments: + job_id (int): id of the job whose history entry should be updated + + execution_start_time (datetime): the start time for the job history entry + + execution_end_time (datetime): the end time for the job history entry + Returns: + None + """ + job_history_entry = ( + self.session.query(self.model).filter( + and_(self.model._job_id == job_id, self.model._execution_start_time == execution_start_time) + ).first() + ) + if job_history_entry: + job_history_entry._execution_end_time = execution_end_time + self.session.commit() diff --git a/evadb/catalog/sql_config.py b/evadb/catalog/sql_config.py index 2ef4a67ef..fed6630f3 100644 --- a/evadb/catalog/sql_config.py +++ b/evadb/catalog/sql_config.py @@ -39,6 +39,7 @@ "function_cost_catalog", "function_metadata_catalog", "job_catalog", + "job_history_catalog", ] # Add all keywords that are restricted by EvaDB diff --git a/evadb/executor/create_job_executor.py b/evadb/executor/create_job_executor.py index ea3013a03..8dcf87d85 100644 --- a/evadb/executor/create_job_executor.py +++ b/evadb/executor/create_job_executor.py @@ -56,7 +56,7 @@ def _get_repeat_time_interval_seconds( self, repeat_interval: int, repeat_period: str ) -> int: unit_to_seconds = { - "second": 1, + "seconds": 1, "minute": 60, "minutes": 60, "min": 60, @@ -71,7 +71,7 @@ def _get_repeat_time_interval_seconds( } assert (repeat_period is None) or ( repeat_period in unit_to_seconds - ), "repeat period should be one of these values: minute | 
minutes | min | hour | hours | day | days | week | weeks | month | months" + ), "repeat period should be one of these values: seconds | minute | minutes | min | hour | hours | day | days | week | weeks | month | months" repeat_interval = 1 if repeat_interval is None else repeat_interval return repeat_interval * unit_to_seconds.get(repeat_period, 0) diff --git a/evadb/utils/job_scheduler.py b/evadb/utils/job_scheduler.py index 650e37d7f..ee306c368 100644 --- a/evadb/utils/job_scheduler.py +++ b/evadb/utils/job_scheduler.py @@ -25,11 +25,6 @@ class JobScheduler: - # read jobs with next trigger in the past - # execute the task with oldest trigger date - # update the next trigger date and TODO: update job history in one transaction - # sleep till next wakeup time - def __init__(self, evadb: EvaDBDatabase) -> None: config_object = parse_config_yml() self.jobs_config = ( @@ -42,21 +37,34 @@ def __init__(self, evadb: EvaDBDatabase) -> None: def _update_next_schedule_run(self, job_catalog_entry: JobCatalogEntry) -> bool: job_end_time = job_catalog_entry.end_time active_status = False - if job_catalog_entry.repeat_interval > 0: + if job_catalog_entry.repeat_interval and job_catalog_entry.repeat_interval > 0: next_trigger_time = datetime.datetime.now() + datetime.timedelta( seconds=job_catalog_entry.repeat_interval ) if next_trigger_time < job_end_time: active_status = True + next_trigger_time = ( + next_trigger_time if active_status else job_catalog_entry.next_scheduled_run + ) self._evadb.catalog().update_job_catalog_entry( job_catalog_entry.name, - next_trigger_time - if active_status - else job_catalog_entry.next_scheduled_run, + next_trigger_time, active_status, ) - return active_status + return active_status, next_trigger_time + + def _get_sleep_time(self, next_job_entry: JobCatalogEntry) -> int: + sleep_time = self.jobs_config["poll_interval"] + if next_job_entry: + sleep_time = min( + sleep_time, + ( + next_job_entry.next_scheduled_run - datetime.datetime.now() + 
).total_seconds(), + ) + sleep_time = max(0, sleep_time) + return sleep_time def _scan_and_execute_jobs(self): while True: @@ -67,6 +75,17 @@ def _scan_and_execute_jobs(self): ), None, ): + execution_time = datetime.datetime.now() + + # insert a job history record to mark start of this execution + self._evadb.catalog().insert_job_history_catalog_entry( + next_executable_job.row_id, + next_executable_job.name, + execution_time, + None, + ) + + # execute the queries of the job execution_results = [ execute_query(self._evadb, query) for query in next_executable_job.queries @@ -74,19 +93,23 @@ def _scan_and_execute_jobs(self): logger.debug( f"Exection result for job: {next_executable_job.name} results: {execution_results}" ) + + # update the next trigger time for this job self._update_next_schedule_run(next_executable_job) + # update the previously inserted job history record with end time + self._evadb.catalog().update_job_history_end_time( + next_executable_job.row_id, + execution_time, + datetime.datetime.now(), + ) + next_executable_job = self._evadb.catalog().get_next_executable_job( only_past_jobs=False ) - if next_executable_job.next_scheduled_run > datetime.datetime.now(): - sleep_time = min( - self.jobs_config["poll_interval"], - ( - next_executable_job.next_scheduled_run - - datetime.datetime.now() - ).total_seconds(), - ) + + sleep_time = self._get_sleep_time(next_executable_job) + if sleep_time > 0: logger.debug( f"Job scheduler process sleeping for {sleep_time} seconds" ) diff --git a/test/integration_tests/long/test_job_scheduler.py b/test/integration_tests/long/test_job_scheduler.py new file mode 100644 index 000000000..106d823a2 --- /dev/null +++ b/test/integration_tests/long/test_job_scheduler.py @@ -0,0 +1,81 @@ +import unittest +import os +import time + +from mock import ANY, MagicMock +from datetime import datetime, timedelta +from test.util import get_evadb_for_testing, shutdown_ray + +from evadb.catalog.models.utils import JobCatalogEntry +from 
evadb.utils.job_scheduler import JobScheduler +from evadb.executor.executor_utils import ExecutorError +from evadb.server.command_handler import execute_query_fetch_all +from evadb.interfaces.relational.db import EvaDBConnection + +class JobSchedulerIntegrationTests(unittest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + @classmethod + def setUpClass(cls): + cls.evadb = get_evadb_for_testing() + # reset the catalog manager before running each test + cls.evadb.catalog().reset() + cls.job_name_1 = "test_async_job_1" + cls.job_name_2 = "test_async_job_2" + + @classmethod + def tearDownClass(cls): + shutdown_ray() + + execute_query_fetch_all(cls.evadb, f"DROP JOB IF EXISTS {cls.job_name_1};") + execute_query_fetch_all(cls.evadb, f"DROP JOB IF EXISTS {cls.job_name_2};") + + + def create_jobs(self): + datetime_format = "%Y-%m-%d %H:%M:%S" + start_time = (datetime.now() - timedelta(seconds=10)).strftime(datetime_format) + end_time = (datetime.now() + timedelta(seconds=60)).strftime(datetime_format) + + create_csv_query = """CREATE TABLE IF NOT EXISTS MyCSV ( + id INTEGER UNIQUE, + frame_id INTEGER, + video_id INTEGER + ); + """ + job_1_query = f"""CREATE JOB IF NOT EXISTS {self.job_name_1} AS ( + SELECT * FROM MyCSV; + ) + START '{start_time}' + END '{end_time}' + EVERY 4 seconds; + """ + job_2_query = f"""CREATE JOB IF NOT EXISTS {self.job_name_2} AS ( + SHOW FUNCTIONS; + ) + START '{start_time}' + END '{end_time}' + EVERY 2 seconds; + """ + + execute_query_fetch_all(self.evadb, create_csv_query) + execute_query_fetch_all(self.evadb, job_1_query) + execute_query_fetch_all(self.evadb, job_2_query) + + def test_should_execute_the_scheduled_jobs(self): + self.create_jobs() + connection = EvaDBConnection(self.evadb, MagicMock(), MagicMock()) + + # start the job scheduler + connection.start_jobs() + + # let the job scheduler run for 15 seconds + time.sleep(15) + connection.stop_jobs() + + job_1_execution_count = 
len(self.evadb.catalog().get_job_history_by_job_id(1)) + job_2_execution_count = len(self.evadb.catalog().get_job_history_by_job_id(2)) + + self.assertGreater(job_2_execution_count, job_1_execution_count) + self.assertGreater(job_2_execution_count, 2) + self.assertGreater(job_1_execution_count, 2) \ No newline at end of file diff --git a/test/integration_tests/short/test_create_job_executor.py b/test/integration_tests/short/test_create_job_executor.py index 6103051ac..e7554b802 100644 --- a/test/integration_tests/short/test_create_job_executor.py +++ b/test/integration_tests/short/test_create_job_executor.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os import unittest from datetime import datetime from test.util import get_evadb_for_testing, shutdown_ray @@ -27,17 +26,13 @@ def setUpClass(cls): cls.evadb = get_evadb_for_testing() # reset the catalog manager before running each test cls.evadb.catalog().reset() - cls.db_path = f"{os.path.dirname(os.path.abspath(__file__))}/testing.db" cls.job_name = "test_async_job" - execute_query_fetch_all(cls.evadb, f"DROP JOB IF EXISTS {cls.job_name};") @classmethod def tearDownClass(cls): shutdown_ray() execute_query_fetch_all(cls.evadb, f"DROP JOB IF EXISTS {cls.job_name};") - if os.path.exists(cls.db_path): - os.remove(cls.db_path) def test_create_job_should_add_the_entry(self): queries = [ @@ -69,8 +64,7 @@ def test_create_job_should_add_the_entry(self): job_entry.start_time, datetime.strptime(start, datetime_format) ) self.assertEqual(job_entry.end_time, datetime.strptime(end, date_format)) - self.assertEqual(job_entry.repeat_interval, repeat_interval) - self.assertEqual(job_entry.repeat_period, repeat_period) + self.assertEqual(job_entry.repeat_interval, 2 * 7 * 24 * 60 * 60) self.assertEqual(job_entry.active, True) self.assertEqual(len(job_entry.queries), 
len(queries)) diff --git a/test/unit_tests/parser/test_parser.py b/test/unit_tests/parser/test_parser.py index fdc5b382e..97d98092b 100644 --- a/test/unit_tests/parser/test_parser.py +++ b/test/unit_tests/parser/test_parser.py @@ -1156,7 +1156,6 @@ def test_create_job(self): parser = Parser() job_stmt = parser.parse(job_query)[0] - print(job_stmt) self.assertEqual(job_stmt.job_name, "my_job") self.assertEqual(len(job_stmt.queries), 2) self.assertTrue(isinstance(job_stmt.queries[0], CreateFunctionStatement)) diff --git a/test/unit_tests/utils/test_job_scheduler.py b/test/unit_tests/utils/test_job_scheduler.py new file mode 100644 index 000000000..4faff1be1 --- /dev/null +++ b/test/unit_tests/utils/test_job_scheduler.py @@ -0,0 +1,60 @@ +import unittest +from mock import MagicMock +from datetime import datetime, timedelta + +from evadb.catalog.models.utils import JobCatalogEntry +from evadb.utils.job_scheduler import JobScheduler + +class JobSchedulerTests(unittest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def get_dummy_job_catalog_entry(self, active, job_name, next_run): + return JobCatalogEntry( + name = job_name, + queries = None, + start_time = None, + end_time = None, + repeat_interval = None, + active = active, + next_scheduled_run = next_run, + created_at = None, + updated_at = None, + ) + + def test_sleep_time_calculation(self): + past_job = self.get_dummy_job_catalog_entry(True, "past_job", datetime.now() - timedelta(seconds=10)) + future_job = self.get_dummy_job_catalog_entry(True, "future_job", datetime.now() + timedelta(seconds=20)) + + job_scheduler = JobScheduler(MagicMock()) + + self.assertEqual(job_scheduler._get_sleep_time(past_job), 0) + self.assertGreaterEqual(job_scheduler._get_sleep_time(future_job), 10) + self.assertEqual(job_scheduler._get_sleep_time(None), 30) + + + def test_update_next_schedule_run(self): + future_time = datetime.now() + timedelta(seconds=1000) + job_scheduler = 
JobScheduler(MagicMock()) + job_entry = self.get_dummy_job_catalog_entry(True, "job", datetime.now()) + + # job which runs just once + job_entry.end_time = future_time + status, next_run = job_scheduler._update_next_schedule_run(job_entry) + self.assertEqual(status, False, "status for one time job should be false") + + # recurring job with valid end date + job_entry.end_time = future_time + job_entry.repeat_interval = 120 + expected_next_run = datetime.now() + timedelta(seconds=120) + status, next_run = job_scheduler._update_next_schedule_run(job_entry) + self.assertEqual(status, True, "status for recurring time job should be true") + self.assertGreaterEqual(next_run, expected_next_run) + + # recurring job with expired end date + job_entry.end_time = datetime.now() + timedelta(seconds=60) + job_entry.repeat_interval = 120 + expected_next_run = datetime.now() + timedelta(seconds=120) + status, next_run = job_scheduler._update_next_schedule_run(job_entry) + self.assertEqual(status, False, "status for rexpired ecurring time job should be false") + self.assertLessEqual(next_run, datetime.now()) \ No newline at end of file