diff --git a/CHANGELOG.md b/CHANGELOG.md
index a62110b..ded7038 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ## [Unreleased]
 
+## [v0.7.0] - 2022-02-17
+
+### Added
+
+* Support for an `ABORTED` workflow state ([#44])
+
+
 ## [v0.6.2] - 2022-02-07
 
 ### Fixed
@@ -211,7 +218,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 Initial Release
 
-[Unreleased]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.6.2...main
+[Unreleased]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.7.0...main
+[v0.7.0]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.6.2...v0.7.0
 [v0.6.2]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.6.1...v0.6.2
 [v0.6.1]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.6.0...v0.6.1
 [v0.6.0]: https://github.com/cirrus-geo/cirrus-lib/compare/v0.5.1...v0.6.0
@@ -244,6 +252,7 @@ Initial Release
 [#38]: https://github.com/cirrus-geo/cirrus-lib/pull/38
 [#41]: https://github.com/cirrus-geo/cirrus-lib/pull/41
+[#44]: https://github.com/cirrus-geo/cirrus-lib/issues/44
+
 [c919fad]: https://github.com/cirrus-geo/cirrus-lib/commit/c919fadb83bb4f5cdfd082d482e25975ce12aa2c
 [02ff5e3]: https://github.com/cirrus-geo/cirrus-lib/commit/02ff5e33412026b1fedda97727eef66715a27492
-
diff --git a/src/cirrus/lib/process_payload.py b/src/cirrus/lib/process_payload.py
index 6ca0c07..a9d4a53 100644
--- a/src/cirrus/lib/process_payload.py
+++ b/src/cirrus/lib/process_payload.py
@@ -456,7 +456,7 @@ def from_statedb(cls, collections, state, since: str=None, index: str='input_sta
 
         Args:
             collections (str): String of collections (input or output depending on `index`)
-            state (str): The state (QUEUED, PROCESSING, COMPLETED, FAILED, INVALID) of StateDB Items to get
+            state (str): The state (QUEUED, PROCESSING, COMPLETED, FAILED, INVALID, ABORTED) of StateDB Items to get
             since (str, optional): Get Items since this duration ago (e.g., 10m, 8h, 1w). Defaults to None.
             index (str, optional): 'input_state' or 'output_state' Defaults to 'input_state'.
             limit ([type], optional): Max number of Items to return. Defaults to None.
@@ -496,7 +496,7 @@ def process(self, replace=False):
             # continue
             if payload['id'] in payload_ids:
                 logger.warning(f"Dropping duplicated payload {payload['id']}")
-            elif state in ['FAILED', ''] or _replace:
+            elif state in ['FAILED', 'ABORTED', ''] or _replace:
                 payload_id = payload()
                 if payload_id is not None:
                     payload_ids.append(payload_id)
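Reviewer note: the `process` change above means a payload whose last recorded state is `ABORTED` is now treated like a `FAILED` one and re-enqueued on resubmission, rather than silently dropped. The gate reduces to a small predicate; a minimal sketch (the function name is illustrative, only the state strings come from the diff):

```python
# States from which a resubmitted payload is (re)started; '' means the
# payload has no StateDB record yet. ABORTED is the v0.7.0 addition.
RETRYABLE = ['FAILED', 'ABORTED', '']

def should_enqueue(state: str, replace: bool = False) -> bool:
    """Return True if a payload in `state` should be (re)processed."""
    return state in RETRYABLE or replace

assert should_enqueue('ABORTED')                    # newly retryable
assert not should_enqueue('COMPLETED')              # still skipped
assert should_enqueue('PROCESSING', replace=True)   # replace forces a re-run
```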
diff --git a/src/cirrus/lib/statedb.py b/src/cirrus/lib/statedb.py
index d543eb2..9fa66c0 100644
--- a/src/cirrus/lib/statedb.py
+++ b/src/cirrus/lib/statedb.py
@@ -11,7 +11,7 @@
 
 # envvars
 PAYLOAD_BUCKET = os.getenv('CIRRUS_PAYLOAD_BUCKET')
-STATES = ['PROCESSING', 'COMPLETED', 'FAILED', 'INVALID']
+STATES = ['PROCESSING', 'COMPLETED', 'FAILED', 'INVALID', 'ABORTED']
 
 # logging
 logger = logging.getLogger(__name__)
@@ -123,7 +123,7 @@ def get_items_page(self, collections_workflow: str,
 
         Args:
             collections_workflow (str): /-separated list of input collections_workflow
-            state (Optional[str], optional): State of Items to get (PROCESSING, COMPLETED, FAILED, INVALID)
+            state (Optional[str], optional): State of Items to get (PROCESSING, COMPLETED, FAILED, INVALID, ABORTED)
             since (Optional[str], optional): Get Items since this amount of time in the past. Defaults to None.
             sort_ascending (Optional[bool], optional): Determines which direction the index of the results will be sorted. Defaults to False.
             sort_index (Optional[str], optional): Determines which index to use for sorting, if not applying a filter (state_updated, updated). Defaults to None.
@@ -171,7 +171,7 @@ def get_state(self, payload_id: str) -> str:
             payload_id (str): The Payload ID
 
         Returns:
-            str: Current state: PROCESSING, COMPLETED, FAILED, INVALID
+            str: Current state: PROCESSING, COMPLETED, FAILED, INVALID, ABORTED
         """
         response = self.table.get_item(Key=self.payload_id_to_key(payload_id))
         if 'Item' in response:
@@ -368,6 +368,35 @@ def set_invalid(self, payload_id: str, msg: str) -> str:
         logger.debug("set invalid", extra=key.update({'last_error': msg}))
         return response
 
+    def set_aborted(self, payload_id: str) -> str:
+        """Set this item as ABORTED
+
+        Args:
+            payload_id (str): The Cirrus Payload ID
+
+        Returns:
+            str: DynamoDB response
+        """
+        now = datetime.now(timezone.utc).isoformat()
+        key = self.payload_id_to_key(payload_id)
+
+        expr = (
+            'SET '
+            'created = if_not_exists(created, :created), '
+            'state_updated=:state_updated, updated=:updated'
+        )
+        response = self.table.update_item(
+            Key=key,
+            UpdateExpression=expr,
+            ExpressionAttributeValues={
+                ':created': now,
+                ':state_updated': f"ABORTED_{now}",
+                ':updated': now,
+            }
+        )
+        logger.debug("set aborted")
+        return response
+
     def query(self, collections_workflow: str, state: str=None, since: str=None,
               select: str='ALL_ATTRIBUTES', sort_ascending: bool=False, sort_index: str='updated', **kwargs) -> Dict:
         """Perform a single Query on a DynamoDB index
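Reviewer note: `set_aborted` follows the same storage convention as the other setters, packing the state and the transition timestamp into the single `state_updated` attribute (`ABORTED_<iso-timestamp>`). That one encoding serves two reads: recovering the bare state by splitting off the prefix, and filtering a whole state with a cheap prefix match. A minimal sketch of both, assuming that attribute layout (the helper names are illustrative, not part of the library):

```python
from datetime import datetime, timezone

from boto3.dynamodb.conditions import Key

# Illustrative helpers: state_updated stores "<STATE>_<ISO timestamp>",
# as in set_aborted above.
def encode_state_updated(state: str) -> str:
    return f"{state}_{datetime.now(timezone.utc).isoformat()}"

def decode_state(state_updated: str) -> str:
    # ISO-8601 timestamps contain no '_', so the first segment is the state
    return state_updated.split('_', 1)[0]

assert decode_state(encode_state_updated('ABORTED')) == 'ABORTED'

# Filtering by state is then a prefix match on the same attribute
# (assumes an index keyed on collections_workflow + state_updated):
condition = (
    Key('collections_workflow').eq('col1_wf1')
    & Key('state_updated').begins_with('ABORTED')
)
```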
diff --git a/tests/test_process_payload.py b/tests/test_process_payload.py
index f627d94..8503a93 100644
--- a/tests/test_process_payload.py
+++ b/tests/test_process_payload.py
@@ -21,6 +21,11 @@ def base_payload():
     return read_json_fixture('test-payload.json')
 
 
+@pytest.fixture()
+def capella_payload():
+    return read_json_fixture('capella-fixture-2.json')
+
+
 @pytest.fixture()
 def sqs_event():
     return read_json_fixture('sqs-event.json')
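Reviewer note: the new `capella_payload` fixture follows the same pattern as `base_payload`, loading a JSON file through the module's `read_json_fixture` helper, which this diff does not show. A plausible minimal sketch of such a helper, assuming fixture files live in a `fixtures/` directory next to the tests (the path and signature here are guesses, not the repo's actual code):

```python
import json
from pathlib import Path

FIXTURES_DIR = Path(__file__).parent / 'fixtures'  # assumed layout

def read_json_fixture(name: str) -> dict:
    """Load a JSON test fixture by file name."""
    return json.loads((FIXTURES_DIR / name).read_text())
```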
diff --git a/tests/test_statedb.py b/tests/test_statedb.py
index f59252e..6d18a78 100644
--- a/tests/test_statedb.py
+++ b/tests/test_statedb.py
@@ -33,6 +33,7 @@
     }
 }
 
+
 @mock_dynamodb2
 def setup_table():
     boto3.setup_default_session()
@@ -44,188 +45,233 @@ def setup_table():
     return StateDB(table_name)
 
 
-class TestClassMethods(unittest.TestCase):
+TESTKEY = {
+    'collections_workflow': 'col1_wf1',
+    'itemids': 'item1/item2'
+}
+
+
+def test_payload_id_to_key():
+    key = StateDB.payload_id_to_key(test_item['id'])
+    assert(key['collections_workflow'] == "col1_wf1")
+    assert(key['itemids'] == 'item1/item2')
+
+
+def test_key_to_payload_id():
+    payload_id = StateDB.key_to_payload_id(TESTKEY)
+    assert(payload_id == test_item['id'])
+
+
+def test_get_input_payload_url():
+    url = StateDB.get_input_payload_url(TESTKEY)
+    assert(f"{test_item['id']}/input.json" in url)
+
+
+def test_dbitem_to_item():
+    item = StateDB.dbitem_to_item(test_dbitem)
+    assert(item['payload_id'] == test_item['id'])
+    assert(item['workflow'] == 'wf1')
+    assert(item['state'] == 'QUEUED')
+
+
+def test_since_to_timedelta():
+    td = StateDB.since_to_timedelta('1d')
+    assert(td.days == 1)
+    td = StateDB.since_to_timedelta('1h')
+    assert(td.seconds == 3600)
+    td = StateDB.since_to_timedelta('10m')
+    assert(td.seconds == 600)
+
+
+NITEMS = 1000
+
+
+@pytest.fixture(scope='session')
+def state_table():
+    mock = mock_dynamodb2()
+    mock.start()
+    statedb = setup_table()
+    for i in range(NITEMS):
+        newitem = deepcopy(test_item)
+        statedb.set_processing(
+            f'{newitem["id"]}{i}',
+            execution='arn::test',
+        )
+    statedb.set_processing(
+        f'{test_item["id"]}_processing',
+        execution='arn::test',
+    )
+    statedb.set_completed(
+        f'{test_item["id"]}_completed',
+        outputs=['item1', 'item2'],
+    )
+    statedb.set_failed(
+        f'{test_item["id"]}_failed',
+        'failed',
+    )
+    statedb.set_invalid(
+        f'{test_item["id"]}_invalid',
+        'invalid',
+    )
+    statedb.set_aborted(
+        f'{test_item["id"]}_aborted',
+    )
+    yield statedb
+    for i in range(NITEMS):
+        statedb.delete_item(f'{test_item["id"]}{i}')
+    for s in STATES:
+        statedb.delete_item(f'{test_item["id"]}_{s.lower()}')
+    statedb.delete()
+    mock.stop()
+
+
+def test_get_items(state_table):
+    items = state_table.get_items(
+        test_dbitem['collections_workflow'],
+        state='PROCESSING',
+        since='1h',
+    )
+    assert(len(items) == NITEMS + 1)
+    items = state_table.get_items(
+        test_dbitem['collections_workflow'],
+        state='PROCESSING',
+        since='1h',
+        limit=1,
+    )
+    assert(len(items) == 1)
+
+
+def test_get_dbitem(state_table):
+    dbitem = state_table.get_dbitem(test_item['id'] + '0')
+    assert(dbitem['itemids'] == test_dbitem['itemids'] + '0')
+    assert(dbitem['collections_workflow'] == test_dbitem['collections_workflow'])
+    assert(dbitem['state_updated'].startswith('PROCESSING'))
+
+
+def test_get_dbitem_noitem(state_table):
+    dbitem = state_table.get_dbitem('no-collection/workflow-none/fake-id')
+    assert(dbitem is None)
+
+
+def test_get_dbitems(state_table):
+    ids = [test_item['id'] + str(i) for i in range(10)]
+    dbitems = state_table.get_dbitems(ids)
+    assert(len(dbitems) == len(ids))
+    for dbitem in dbitems:
+        assert(state_table.key_to_payload_id(dbitem) in ids)
+
+
+def test_get_dbitems_duplicates(state_table):
+    ids = [test_item['id'] + str(i) for i in range(10)]
+    ids.append(ids[0])
+    dbitems = state_table.get_dbitems(ids)
+    for dbitem in dbitems:
+        assert(state_table.key_to_payload_id(dbitem) in ids)
+
+
+def test_get_dbitems_noitems(state_table):
+    dbitems = state_table.get_dbitems(['no-collection/workflow-none/fake-id'])
+    assert(len(dbitems) == 0)
+
+
+def test_get_state(state_table):
+    for s in STATES:
+        state = state_table.get_state(test_item['id'] + f"_{s.lower()}")
+        assert(state == s)
+    state = state_table.get_state(test_item['id'] + 'nosuchitem')
+
+
+def test_get_states(state_table):
+    ids = [test_item['id'] + f"_{s.lower()}" for s in STATES]
+    states = state_table.get_states(ids)
+    assert(len(ids) == len(states))
+    for i, id in enumerate(ids):
+        assert(states[id] == STATES[i])
+
+
+def test_get_counts(state_table):
+    count = state_table.get_counts(test_dbitem['collections_workflow'])
+    assert(count == NITEMS + len(STATES))
+    for s in STATES:
+        count = state_table.get_counts(test_dbitem['collections_workflow'], state=s)
+        if s == 'PROCESSING':
+            assert(count == NITEMS + 1)
+        else:
+            assert(count == 1)
+    count = state_table.get_counts(test_dbitem['collections_workflow'], since='1h')
+
+
+def test_set_processing(state_table):
+    resp = state_table.set_processing(test_item['id'], execution='arn::test1')
+    assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
+    dbitem = state_table.get_dbitem(test_item['id'])
+    assert(StateDB.key_to_payload_id(dbitem) == test_item['id'])
+    assert(dbitem['executions'] == ['arn::test1'])
+
+
+def test_second_execution(state_table):
+    # check that processing adds new execution to list
+    resp = state_table.set_processing(test_item['id'], execution='arn::test2')
+    dbitem = state_table.get_dbitem(test_item['id'])
+    assert(len(dbitem['executions']) == 2)
+    assert(dbitem['executions'][-1] == 'arn::test2')
+
+
+def test_set_outputs(state_table):
+    resp = state_table.set_completed(test_item['id'], outputs=['output-item'])
+    assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
+    dbitem = state_table.get_dbitem(test_item['id'])
+    assert(dbitem['outputs'][0] == 'output-item')
+
+
+def test_set_completed(state_table):
+    resp = state_table.set_completed(test_item['id'])
+    assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
+    dbitem = state_table.get_dbitem(test_item['id'])
+    assert(dbitem['state_updated'].startswith('COMPLETED'))
+
+
+def test_set_failed(state_table):
+    resp = state_table.set_failed(test_item['id'], msg='test failure')
+    assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
+    dbitem = state_table.get_dbitem(test_item['id'])
+    assert(dbitem['state_updated'].startswith('FAILED'))
+    assert(dbitem['last_error'] == 'test failure')
+
+
+def test_set_completed_with_outputs(state_table):
+    resp = state_table.set_completed(test_item['id'], outputs=['output-item2'])
+    assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
+    dbitem = state_table.get_dbitem(test_item['id'])
+    assert(dbitem['state_updated'].startswith('COMPLETED'))
+    assert(dbitem['outputs'][0] == 'output-item2')
+
+
+def test_set_invalid(state_table):
+    resp = state_table.set_invalid(test_item['id'], msg='test failure')
+    assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
+    dbitem = state_table.get_dbitem(test_item['id'])
+    assert(dbitem['state_updated'].startswith('INVALID'))
+    assert(dbitem['last_error'] == 'test failure')
+
+
+def test_set_aborted(state_table):
+    resp = state_table.set_aborted(test_item['id'])
+    assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
+    dbitem = state_table.get_dbitem(test_item['id'])
+    assert(dbitem['state_updated'].startswith('ABORTED'))
+
+
+def test_delete_item(state_table):
+    state_table.delete_item(test_item['id'])
+    dbitem = state_table.get_dbitem(test_item['id'])
+    assert(dbitem is None)
-
-    testkey = {
-        'collections_workflow': 'col1_wf1',
-        'itemids': 'item1/item2'
-    }
-
-    def test_payload_id_to_key(self):
-        key = StateDB.payload_id_to_key(test_item['id'])
-        assert(key['collections_workflow'] == "col1_wf1")
-        assert(key['itemids'] == 'item1/item2')
-
-    def test_key_to_payload_id(self):
-        payload_id = StateDB.key_to_payload_id(self.testkey)
-        assert(payload_id == test_item['id'])
-
-    def test_get_input_payload_url(self):
-        url = StateDB.get_input_payload_url(self.testkey)
-        assert(f"{test_item['id']}/input.json" in url)
-
-    def test_dbitem_to_item(self):
-        item = StateDB.dbitem_to_item(test_dbitem)
-        assert(item['payload_id'] == test_item['id'])
-        assert(item['workflow'] == 'wf1')
-        assert(item['state'] == 'QUEUED')
-
-    def test_since_to_timedelta(self):
-        td = StateDB.since_to_timedelta('1d')
-        assert(td.days == 1)
-        td = StateDB.since_to_timedelta('1h')
-        assert(td.seconds == 3600)
-        td = StateDB.since_to_timedelta('10m')
-        assert(td.seconds == 600)
-
-
-class TestDbItems(unittest.TestCase):
-
-    nitems = 1000
-
-    @classmethod
-    def setUpClass(cls):
-        cls.mock = mock_dynamodb2()
-        cls.mock.start()
-        cls.statedb = setup_table()
-        for i in range(cls.nitems):
-            newitem = deepcopy(test_item)
-            cls.statedb.set_processing(newitem['id'] + str(i), execution='arn::test')
-        cls.statedb.set_processing(test_item['id'] + '_processing', execution='arn::test')
-        cls.statedb.set_completed(test_item['id'] + '_completed', outputs=['item1', 'item2'])
-        cls.statedb.set_failed(test_item['id'] + '_failed', 'failed')
-        cls.statedb.set_invalid(test_item['id'] + '_invalid', 'invalid')
-
-
-    @classmethod
-    def tearDownClass(cls):
-        for i in range(cls.nitems):
-            cls.statedb.delete_item(test_item['id'] + str(i))
-        for s in STATES:
-            cls.statedb.delete_item(test_item['id'] + f"_{s.lower()}")
-        cls.statedb.delete()
-        cls.mock.stop()
-
-    def test_set_processing(self):
-        resp = self.statedb.set_processing(test_item['id'], execution='arn::test1')
-        assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
-        dbitem = self.statedb.get_dbitem(test_item['id'])
-        assert(StateDB.key_to_payload_id(dbitem) == test_item['id'])
-        assert(dbitem['executions'] == ['arn::test1'])
-
-        # check that processing adds new execution to list
-        resp = self.statedb.set_processing(test_item['id'], execution='arn::test2')
-        dbitem = self.statedb.get_dbitem(test_item['id'])
-        assert(len(dbitem['executions']) == 2)
-        assert(dbitem['executions'][-1] == 'arn::test2')
-        self.statedb.delete_item(test_item['id'])
-        dbitem = self.statedb.get_dbitem(test_item['id'])
-        assert(dbitem is None)
-
-    def test_get_dbitem(self):
-        dbitem = self.statedb.get_dbitem(test_item['id'] + '0')
-        assert(dbitem['itemids'] == test_dbitem['itemids'] + '0')
-        assert(dbitem['collections_workflow'] == test_dbitem['collections_workflow'])
-        assert(dbitem['state_updated'].startswith('PROCESSING'))
-
-    def test_get_dbitem_noitem(self):
-        dbitem = self.statedb.get_dbitem(test_item['id'])
-        assert(dbitem is None)
-
-    def test_get_dbitems(self):
-        ids = [test_item['id'] + str(i) for i in range(10)]
-        dbitems = self.statedb.get_dbitems(ids)
-        assert(len(dbitems) == len(ids))
-        for dbitem in dbitems:
-            assert(self.statedb.key_to_payload_id(dbitem) in ids)
-
-    def test_get_dbitems_duplicates(self):
-        ids = [test_item['id'] + str(i) for i in range(10)]
-        ids.append(ids[0])
-        dbitems = self.statedb.get_dbitems(ids)
-        for dbitem in dbitems:
-            assert(self.statedb.key_to_payload_id(dbitem) in ids)
-
-    def test_get_dbitems_noitems(self):
-        #with self.assertRaises(Exception):
-        dbitems = self.statedb.get_dbitems([test_item['id']])
-        assert(len(dbitems) == 0)
-
-    def test_get_items(self):
-        items = self.statedb.get_items(test_dbitem['collections_workflow'], state='PROCESSING', since='1h')
-        assert(len(items) == self.nitems + 1)
-        items = self.statedb.get_items(test_dbitem['collections_workflow'], state='PROCESSING', since='1h', limit=1)
-        assert(len(items) == 1)
-
-    def test_get_state(self):
-        for s in STATES:
-            state = self.statedb.get_state(test_item['id'] + f"_{s.lower()}")
-            assert(state == s)
-        state = self.statedb.get_state(test_item['id'] + 'nosuchitem')
-
-    def test_get_states(self):
-        ids = [test_item['id'] + f"_{s.lower()}" for s in STATES]
-        states = self.statedb.get_states(ids)
-        assert(len(ids) == len(states))
-        for i, id in enumerate(ids):
-            assert(states[id] == STATES[i])
-
-    def test_set_processing(self):
-        resp = self.statedb.set_processing(test_item['id'], execution='testarn')
-        assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
-        dbitem = self.statedb.get_dbitem(test_item['id'])
-        assert(dbitem['state_updated'].startswith('PROCESSING'))
-        assert(dbitem['executions'] == ['testarn'])
-
-    def test_set_outputs(self):
-        resp = self.statedb.set_completed(test_item['id'], outputs=['output-item'])
-        assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
-        dbitem = self.statedb.get_dbitem(test_item['id'])
-        assert(dbitem['outputs'][0] == 'output-item')
-
-    def test_set_completed(self):
-        resp = self.statedb.set_completed(test_item['id'])
-        assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
-        dbitem = self.statedb.get_dbitem(test_item['id'])
-        assert(dbitem['state_updated'].startswith('COMPLETED'))
-
-    def test_set_failed(self):
-        resp = self.statedb.set_failed(test_item['id'], msg='test failure')
-        assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
-        dbitem = self.statedb.get_dbitem(test_item['id'])
-        assert(dbitem['state_updated'].startswith('FAILED'))
-        assert(dbitem['last_error'] == 'test failure')
-
-    def test_set_completed_with_outputs(self):
-        resp = self.statedb.set_completed(test_item['id'], outputs=['output-item2'])
-        assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
-        dbitem = self.statedb.get_dbitem(test_item['id'])
-        assert(dbitem['state_updated'].startswith('COMPLETED'))
-        assert(dbitem['outputs'][0] == 'output-item2')
-
-    def test_set_invalid(self):
-        resp = self.statedb.set_invalid(test_item['id'], msg='test failure')
-        assert(resp['ResponseMetadata']['HTTPStatusCode'] == 200)
-        dbitem = self.statedb.get_dbitem(test_item['id'])
-        assert(dbitem['state_updated'].startswith('INVALID'))
-        assert(dbitem['last_error'] == 'test failure')
-
-    def test_get_counts(self):
-        count = self.statedb.get_counts(test_dbitem['collections_workflow'])
-        assert(count == self.nitems + 4)
-        for s in STATES:
-            count = self.statedb.get_counts(test_dbitem['collections_workflow'], state=s)
-            if s == 'PROCESSING':
-                assert(count == self.nitems + 1)
-            else:
-                assert(count == 1)
-        count = self.statedb.get_counts(test_dbitem['collections_workflow'], since='1h')
-
-
-    def _test_get_counts_paging(self):
-        for i in range(5000):
-            self.statedb.set_processing(test_item['id'] + f"_{i}", execution='arn::test')
-        count = self.statedb.get_counts(test_dbitem['collections_workflow'])
-        assert(count == 1004)
-        for i in range(5000):
-            self.statedb.delete_item(test_item['id'] + f"_{i}")
+
+
+def _test_get_counts_paging(state_table):
+    for i in range(5000):
+        state_table.set_processing(test_item['id'] + f"_{i}", execution='arn::test')
+    count = state_table.get_counts(test_dbitem['collections_workflow'])
+    assert(count == 1004)
+    for i in range(5000):
+        state_table.delete_item(test_item['id'] + f"_{i}")
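Reviewer note: a quick sanity check on the updated count assertions. The session-scoped fixture seeds NITEMS generic PROCESSING items plus one item per state in STATES, so with ABORTED added the expected totals move from the old hard-coded `nitems + 4` to the state-count-aware `NITEMS + len(STATES)`:

```python
# Arithmetic behind test_get_counts, using only values from the diff.
NITEMS = 1000
STATES = ['PROCESSING', 'COMPLETED', 'FAILED', 'INVALID', 'ABORTED']

total = NITEMS + len(STATES)   # 1005: loop items plus one item per state
processing = NITEMS + 1        # 1001: loop items plus the '_processing' item
assert total == 1005 and processing == 1001
```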