Skip to content

Commit

Permalink
LITE-28651: Adding common methods to send ppr and check task status f…
Browse files Browse the repository at this point in the history
…rom task manager.
  • Loading branch information
akodelia committed Sep 13, 2023
1 parent c67a6e5 commit 364c835
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 16 deletions.
7 changes: 7 additions & 0 deletions connect_ext_ppr/models/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,10 @@ class PPRStatusChoices(str, enum.Enum):
processing = 'processing'
ready = 'ready'
failed = 'failed'


class CBCTaskLogStatus(str, enum.Enum):
success = 's'
running = 'r'
failed = 'f'
not_started = 'n'
3 changes: 3 additions & 0 deletions connect_ext_ppr/models/task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime

import sqlalchemy as db
from sqlalchemy.orm import relationship

from connect_ext_ppr.db import Model
from connect_ext_ppr.models.enums import TasksStatusChoices, TaskTypesChoices
Expand Down Expand Up @@ -33,6 +34,8 @@ class Task(Model):
aborted_at = db.Column(db.DateTime(), nullable=True)
aborted_by = db.Column(db.String(20), nullable=True)

dr_instance = relationship(DeploymentRequest, foreign_keys="Task.deployment_request")

@transition('status', target=STATUSES.aborted, sources=[STATUSES.pending])
def abort(self, by):
self.aborted_at = datetime.utcnow()
Expand Down
54 changes: 48 additions & 6 deletions connect_ext_ppr/tasks_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import time
from datetime import datetime
from io import BufferedReader

from sqlalchemy.orm import joinedload
from sqlalchemy.orm import joinedload, selectinload

from connect_ext_ppr.db import get_db_ctx_manager
from connect_ext_ppr.models.enums import CBCTaskLogStatus
from connect_ext_ppr.models.enums import (
DeploymentRequestStatusChoices,
DeploymentStatusChoices,
Expand All @@ -14,15 +17,47 @@
from connect_ext_ppr.models.task import Task


def validate_ppr():
class TaskException(Exception):
pass


def _send_ppr(cbc_service, file: BufferedReader):

parsed_ppr = cbc_service.parse_ppr(file)

if 'error' in parsed_ppr.keys():
raise TaskException(parsed_ppr.get('message'))

tracking_id = cbc_service.apply_ppr(file)

if not tracking_id:
raise TaskException('Some error ocurred trying to upload ppr.')

return tracking_id


def _check_cbc_task_status(cbc_service, tracking_id):
task_log = cbc_service.search_task_logs_by_name(tracking_id)[0]
while task_log['status'] in (CBCTaskLogStatus.not_started, CBCTaskLogStatus.running):
time.sleep(10)
task_log = cbc_service.search_task_logs_by_name(tracking_id)[0]

if task_log['status'] == CBCTaskLogStatus.success:
return True

raise TaskException(f'Something went wrong with task: {tracking_id}')


def validate_ppr(deployment_request):
return True


def apply_ppr_and_delegate_to_marketplaces():
def apply_ppr_and_delegate_to_marketplaces(deployment_request):

return True


def delegate_to_l2():
def delegate_to_l2(deployment_request):
return True


Expand All @@ -44,12 +79,17 @@ def execute_tasks(db, tasks):
db.commit()

try:
was_succesfull = TASK_PER_TYPE.get(task.type)()
was_succesfull = TASK_PER_TYPE.get(task.type)(task.dr_instance)
task.status = TasksStatusChoices.done
if not was_succesfull:
task.status = TasksStatusChoices.error
except TaskException as ex:
was_succesfull = False
task.error_message = str(ex)
task.status = TasksStatusChoices.error
except Exception:
was_succesfull = False
task.error_message = 'Unexpected error'
task.status = TasksStatusChoices.error

task.finished_at = datetime.utcnow()
Expand Down Expand Up @@ -80,7 +120,9 @@ def main_process(deployment_request_id, config):
db.add(deployment_request)
db.commit()

tasks = db.query(Task).filter_by(
tasks = db.query(Task).options(
selectinload(Task.dr_instance),
).filter_by(
deployment_request=deployment_request.id,
).order_by(Task.id).all()

Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def _build_deployment(


@pytest.fixture
def deployment_request_factory(dbsession):
def deployment_request_factory(dbsession, deployment_factory):
def _build_deployment_request(
deployment=None,
ppr=None,
Expand All @@ -178,7 +178,7 @@ def _build_deployment_request(
finished_at=None,
):
if not deployment:
deployment = deployment_factory(id='DPLR-123-123-123')
deployment = deployment_factory()

if not ppr:
ppr = PPRVersion(id=f'PPR-{random.randint(1000, 9999)}', product_version=1)
Expand Down
40 changes: 32 additions & 8 deletions tests/test_tasks_manager.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,56 @@
import copy

import pytest
from sqlalchemy import null

from connect_ext_ppr.models.deployment import Deployment, DeploymentRequest
from connect_ext_ppr.models.enums import (
CBCTaskLogStatus,
DeploymentRequestStatusChoices,
DeploymentStatusChoices,
TasksStatusChoices,
TaskTypesChoices,
)
from connect_ext_ppr.models.task import Task
from connect_ext_ppr.tasks_manager import (
_check_cbc_task_status,
_send_ppr,
apply_ppr_and_delegate_to_marketplaces,
delegate_to_l2,
main_process,
validate_ppr,
)


def test_apply_ppr_and_delegate_to_marketplaces():
assert apply_ppr_and_delegate_to_marketplaces()
def test_apply_ppr_and_delegate_to_marketplaces(deployment_request_factory):
assert apply_ppr_and_delegate_to_marketplaces(deployment_request_factory())


def test_delegate_to_l2(deployment_request_factory):
assert delegate_to_l2(deployment_request_factory())


def test_validate_ppr(deployment_request_factory):
assert validate_ppr(deployment_request_factory())


def test_delegate_to_l2():
assert delegate_to_l2()
def test__send_ppr(parse_ppr_success_response, sample_ppr_file, mocker):
cbc_service = mocker.Mock()
cbc_service.parse_ppr.return_value = parse_ppr_success_response
cbc_service.apply_ppr.return_value = 100
_send_ppr(cbc_service, sample_ppr_file)


def test_validate_ppr():
assert validate_ppr()
def test__check_cbc_task_status(task_logs_response, mocker):
not_started_log = copy.deepcopy(task_logs_response)
not_started_log[0]['status'] = CBCTaskLogStatus.not_started
cbc_service = mocker.Mock()
cbc_service.search_task_logs_by_name.side_effect = [
not_started_log,
task_logs_response,
]
with mocker.patch('connect_ext_ppr.tasks_manager.time.sleep', return_value=None):
_check_cbc_task_status(cbc_service, 100)


def test_main_process(
Expand Down Expand Up @@ -144,7 +168,7 @@ def test_main_process_ends_w_error(
my_mock = mocker.Mock()

def mock_get(key):
return lambda: key != type_function_to_mock
return lambda dr: key != type_function_to_mock
my_mock.get = mock_get

mocker.patch('connect_ext_ppr.tasks_manager.TASK_PER_TYPE', my_mock)
Expand Down Expand Up @@ -345,7 +369,7 @@ def test_main_process_ends_w_task_exception(
def mock_get(key):
if key == type_function_to_mock:
raise Exception('Unexpected Error')
return lambda: True
return lambda dr: True

my_mock.get = mock_get

Expand Down

0 comments on commit 364c835

Please # to comment.