diff --git a/ingestion_tools/docs/enqueue_runs.md b/ingestion_tools/docs/enqueue_runs.md
index 68e31efcf..11b7ff346 100644
--- a/ingestion_tools/docs/enqueue_runs.md
+++ b/ingestion_tools/docs/enqueue_runs.md
@@ -152,6 +152,8 @@ python3 enqueue_runs.py sync --exclude '*' --include 'Annotations/*.json' --s3-p
 | --include-deposition | null | Look for deposition metadata with the deposition ids passed here. This helps sync the deposition data. |
 | --no-sync-dataset | False | Skip syncing datasets. This is useful when we want to only update deposition data |
 
+## Retrieving logs from job runs
+Use the `fetch_enqueue_runs_logs.py` script to retrieve logs for jobs run with `enqueue_runs.py`; it sorts them into success and failed directories based on each execution's status. To provide the execution ARNs that `fetch_enqueue_runs_logs.py` needs, run `enqueue_runs.py` with an output log file specified via the `--execution-machine-log` flag. See `fetch_enqueue_runs_logs.md` for more details.
 
 ## Building and pushing up a dev/test image:
 
diff --git a/ingestion_tools/docs/fetch_enqueue_runs_logs.md b/ingestion_tools/docs/fetch_enqueue_runs_logs.md
new file mode 100644
index 000000000..bc2319c52
--- /dev/null
+++ b/ingestion_tools/docs/fetch_enqueue_runs_logs.md
@@ -0,0 +1,47 @@
+# fetch_enqueue_runs_logs.py
+
+This script fetches logs for the jobs run with `enqueue_runs.py`. It inspects the AWS Step Functions executions, retrieves their logs from AWS CloudWatch Logs, and categorizes them into success and failed directories based on each execution's status.
+
+## One-time Setup
+Make sure you have at least Python 3.11 installed. If you need to work with multiple versions of Python, [pyenv](https://github.com/pyenv/pyenv) can help with that.
+
+Before running the script, ensure all the required packages are installed in a virtualenv:
+```bash
+cd ingestion_tools
+python3 -m venv .venv # create a virtualenv
+source .venv/bin/activate # activate the virtualenv
+python3 -m pip install poetry # Install the poetry package manager
+poetry install # Use poetry to install this package's dependencies
+```
+
+## Setting Up AWS Profile
+Before running the script, set the `AWS_PROFILE` environment variable to the name of the AWS profile you want to use:
+
+`export AWS_PROFILE=your-profile-name`
+
+## Script Usage
+To generate the input for this script, run `enqueue_runs.py` with an output log file specified via the `--execution-machine-log` flag. This produces a file containing the execution ARNs of the jobs that were started, which can then be passed to `fetch_enqueue_runs_logs.py` to fetch their logs (see the end-to-end example after the argument list below).
+
+### Command-Line Arguments
+
+`execution-arn`: One or more AWS Step Functions execution ARNs. If multiple ARNs are provided, separate them with spaces.
+
+`--input-file`: Path to a file containing a list of execution ARNs, one per line.
+
+`--output-dir`: Directory to save the fetched logs. Defaults to `./fetch-logs`.
+
+`--profile`: AWS profile to use. If not provided, your default profile will be used.
+
+`--failed-only`: Fetch logs only for failed executions.
+
+`--links-only`: Only retrieve links to the CloudWatch logs instead of fetching the logs themselves.
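+
+Putting the two scripts together, a typical end-to-end flow looks like the sketch below. The `queue` subcommand and the `<your usual queue options>` placeholder are illustrative only — use whichever `enqueue_runs.py` command and options your ingestion actually needs (see `enqueue_runs.md`); the part this workflow depends on is the `--execution-machine-log` flag:
+
+```bash
+# 1. Enqueue jobs and record the Step Functions execution ARNs they start.
+python3 enqueue_runs.py queue <your usual queue options> --execution-machine-log execution_arns.txt
+
+# 2. Fetch the CloudWatch logs for those executions into success/ and failed/ subdirectories.
+python fetch_enqueue_runs_logs.py --input-file execution_arns.txt --output-dir ./fetch-logs
+```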
+ +## Examples: + +Fetch logs for specific execution ARNs: + +`python fetch_enqueue_runs_logs.py arn:aws:states:us-west-2:123456789012:execution:StateMachineName:execution1 arn:aws:states:us-west-2:123456789012:execution:StateMachineName:execution2` + +Fetch logs using an input file containing execution ARNs: + +`python fetch_enqueue_runs_logs.py --input-file execution_arns.txt --output-dir /tmp/fetch-logs` diff --git a/ingestion_tools/scripts/enqueue_runs.py b/ingestion_tools/scripts/enqueue_runs.py index 6e9e887b8..2ab8177c5 100644 --- a/ingestion_tools/scripts/enqueue_runs.py +++ b/ingestion_tools/scripts/enqueue_runs.py @@ -1,5 +1,6 @@ import json import logging +import os import os.path import re import time @@ -58,6 +59,14 @@ def enqueue_common_options(func): help="Specify docker image tag, defaults to 'main'", ), ) + options.append( + click.option( + "--execution-machine-log", + type=str, + default=None, + help="Log execution machine ARNs to this file", + ), + ) options.append(click.option("--memory", type=int, default=None, help="Specify memory allocation override")) options.append(click.option("--parallelism", required=True, type=int, default=20)) for option in options: @@ -65,16 +74,26 @@ def enqueue_common_options(func): return func +def create_execution_machine_log_file(filename): + abs_path = os.path.abspath(filename) + if os.path.exists(abs_path): + os.remove(abs_path) + logger.warning("Removing existing %s file.", filename) + + os.makedirs(os.path.dirname(abs_path), exist_ok=True) + + def handle_common_options(ctx, kwargs): ctx.obj = { "environment": kwargs["environment"], "ecr_repo": kwargs["ecr_repo"], "ecr_tag": kwargs["ecr_tag"], + "execution_machine_log": kwargs["execution_machine_log"], "memory": kwargs["memory"], "parallelism": kwargs["parallelism"], **get_aws_env(kwargs["environment"]), } - enqueue_common_keys = ["environment", "ecr_repo", "ecr_tag", "memory", "parallelism"] + enqueue_common_keys = ["environment", "ecr_repo", "ecr_tag", "execution_machine_log", "memory", "parallelism"] # Make sure to remove these common options from the list of args processed by commands. for opt in enqueue_common_keys: del kwargs[opt] @@ -98,6 +117,7 @@ def run_job( ecr_repo: str, ecr_tag: str, memory: int | None = None, + execution_machine_log: str | None = None, **kwargs, # Ignore any the extra vars this method doesn't need. 
): if not memory: @@ -128,12 +148,18 @@ def run_job( service_name="stepfunctions", ) - return client.start_execution( + execution = client.start_execution( stateMachineArn=state_machine_arn, name=execution_name, input=json.dumps(sfn_input_json), ) + if execution_machine_log is not None: + with open(execution_machine_log, "a") as f: + f.write(execution["executionArn"] + "\n") + + return execution + def get_aws_env(environment): # Learn more about our AWS environment @@ -278,6 +304,9 @@ def db_import( if not ctx.obj.get("memory"): ctx.obj["memory"] = 4000 + if ctx.obj.get("execution_machine_log") is not None: + create_execution_machine_log_file(ctx.obj.get("execution_machine_log")) + futures = [] with ProcessPoolExecutor(max_workers=ctx.obj["parallelism"]) as workerpool: for dataset_id, _ in get_datasets( @@ -388,6 +417,9 @@ def queue( filter_datasets = [re.compile(pattern) for pattern in kwargs.get("filter_dataset_name", [])] exclude_datasets = [re.compile(pattern) for pattern in kwargs.get("exclude_dataset_name", [])] + if ctx.obj.get("execution_machine_log") is not None: + create_execution_machine_log_file(ctx.obj.get("execution_machine_log")) + # Always iterate over depostions, datasets and runs. for deposition in DepositionImporter.finder(config): print(f"Processing deposition: {deposition.name}") @@ -565,6 +597,9 @@ def sync( for param, value in OrderedSyncFilters._options: new_args.append(f"--{param.name} '{value}'") + if ctx.obj.get("execution_machine_log") is not None: + create_execution_machine_log_file(ctx.obj.get("execution_machine_log")) + if sync_dataset: entities = get_datasets( input_bucket, diff --git a/ingestion_tools/scripts/fetch_enqueue_runs_logs.py b/ingestion_tools/scripts/fetch_enqueue_runs_logs.py new file mode 100755 index 000000000..25bff2a68 --- /dev/null +++ b/ingestion_tools/scripts/fetch_enqueue_runs_logs.py @@ -0,0 +1,144 @@ +import logging +import os +import re +from concurrent.futures import ThreadPoolExecutor +from typing import Tuple + +import click +from boto3 import Session + +logger = logging.getLogger("db_import") +logging.basicConfig(level=logging.INFO) + +LOG_GROUP_NAME = "/aws/batch/job" + + +def get_log_stream(session: Session, execution_arn: str) -> Tuple[bool, str]: + client = session.client("stepfunctions") + response = client.get_execution_history( + executionArn=execution_arn, + ) + + # get the last task scheduled with a log stream + history = response["events"] + history.sort(key=lambda x: x["timestamp"]) + history = [ + event + for event in history + if event["type"] == "TaskScheduled" and "LogStreamName" in event["taskScheduledEventDetails"]["parameters"] + ] + if len(history) == 0: + logger.error("Skipping, no log stream found for %s", execution_arn) + return False, None + last_task_submitted = history[-1] + parameters = last_task_submitted["taskScheduledEventDetails"]["parameters"] + failed = re.search(r'"Status":"FAILED"', parameters, re.IGNORECASE) + return failed, re.search(r'"LogStreamName":"([a-zA-Z-/0-9]*)"', parameters).group(1) + + +def get_log_events(session: Session, log_group_name, log_stream_name): + client = session.client("logs") + response = client.get_log_events( + logGroupName=log_group_name, + logStreamName=log_stream_name, + startFromHead=True, + ) + events = response["events"] + return [event["message"] for event in events] + + +def process_arn(arn: str, session: Session, output_dir: str, failed_only: bool, links_only: bool) -> str: + log_stream_failed, log_stream_name = get_log_stream(session, arn) + if not 
log_stream_name: + logger.warning("No log stream found for %s, possibly still running", arn) + return + + result = "FAILED" if log_stream_failed else "SUCCESS" + logger.info("%s: %s", result, arn) + output_file = ( + output_dir + ("failed/" if log_stream_failed else "success/") + arn.replace("/", "_").replace(":", "_") + ".log" + ) + + if links_only: + link = f"https://console.aws.amazon.com/cloudwatch/home?region={session.region_name}#logEventViewer:group={LOG_GROUP_NAME};stream={log_stream_name}" + logger.info("Link: %s", link) + return result + + if failed_only and not log_stream_failed: + return result + + logs = get_log_events(session, LOG_GROUP_NAME, log_stream_name) + if os.path.exists(output_file): + logger.warning("Removing existing %s", output_file) + os.remove(output_file) + logger.info("Writing to %s", output_file) + with open(output_file, "w") as f: + f.write("\n".join(logs)) + + return result + + +@click.command() +@click.argument("execution-arn", type=str, nargs=-1) +@click.option("--input-file", type=str, help="A file containing a list of execution ARNs.") +@click.option("--output-dir", type=str, default="./fetch-logs", help="The directory to save the logs to.") +@click.option("--profile", type=str, default=None, help="The AWS profile to use.") +@click.option("--failed-only", is_flag=True, help="Only fetch logs for failed executions.") +@click.option("--links-only", is_flag=True, help="Only get CloudWatch log links, not the logs themselves.") +def main(execution_arn: list[str], input_file: str, output_dir: str, profile: str, failed_only: bool, links_only: bool): + input_execution_arn = execution_arn + + if not execution_arn and not input_file: + logger.error("Please provide at least one execution ARN.") + return + + if input_file and execution_arn: + logger.error("Please provide either execution ARNs or an execution ARN file, not both.") + return + + if input_file: + if not os.path.exists(input_file): + logger.error("The provided execution ARN file does not exist.") + return + + with open(input_file, "r") as f: + input_execution_arn = f.read().splitlines() + + if output_dir[-1] != "/": + output_dir += "/" + + # setup output directory + if not links_only: + if not os.path.exists(output_dir): + os.makedirs(output_dir) + if not os.path.exists(f"{output_dir}failed"): + os.makedirs(f"{output_dir}failed") + if not os.path.exists(f"{output_dir}success"): + os.makedirs(f"{output_dir}success") + + input_execution_arn = list(set(input_execution_arn)) + session = Session(region_name=input_execution_arn[0].split(":")[3], profile_name=profile) + + failed_count = 0 + successful_count = 0 + + # fetch logs, multithreaded + with ThreadPoolExecutor() as executor: + results = executor.map( + lambda arn: process_arn(arn, session, output_dir, failed_only, links_only), + input_execution_arn, + ) + for result in results: + if result == "FAILED": + failed_count += 1 + elif result == "SUCCESS": + successful_count += 1 + + logger.info("====================================") + logger.info("TOTAL FAILED: %d/%d", failed_count, len(input_execution_arn)) + logger.info("TOTAL SUCCEEDED %d/%d", successful_count, len(input_execution_arn)) + logger.info("TOTAL SKIPPED: %d", len(input_execution_arn) - failed_count - successful_count) + + +if __name__ == "__main__": + main()