"""
Step Functions activity worker for a SageMaker notebook instance.

Polls the activity named after this notebook instance, runs the notebook named in each
task's input (after stripping cells that carry any of the requested tags), saves the
executed copy, and reports success or failure back to Step Functions.

Run with ``-l`` to poll the ``local-`` variant of the activity and log to a separate folder.
"""
import boto3
import json
import nbformat
from nbconvert.preprocessors import ExecutePreprocessor, CellExecutionError, TagRemovePreprocessor
import botocore.exceptions
import re
import os
import sys
from nbconvert.exporters import NotebookExporter
from traitlets.config import Config
from datetime import datetime
import logging

actpref = "local-" if "-l" in sys.argv else ""
logdir = '/home/ec2-user/SageMaker/' + actpref + 'stepfunctionlogs/'
os.makedirs(logdir, exist_ok=True)
logging.basicConfig(filename=logdir + datetime.now().strftime('%Y%m%d') + '.log', level=logging.DEBUG)

# Folder that notebook paths received from the state machine are resolved against.
NOTEBOOK_ROOT = "/home/ec2-user/SageMaker"
# Region of the Step Functions activity this worker polls.
ACTIVITY_REGION = "eu-west-1"
# SendTaskFailure rejects a `cause` longer than this many characters.
MAX_FAILURE_CAUSE_LENGTH = 32768
# ANSI/ECMA-48 escape sequences (terminal colouring, e.g. in Jupyter tracebacks).
# CSI = ESC '[' parameter bytes (0x30-0x3F), intermediate bytes (0x20-0x2F), final byte (0x40-0x7E).
ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')


def get_instance_name():
    """
    Get the name of the current Sagemaker instance
    :return: name of the instance (as seen in aws), read from the SageMaker resource metadata file
    """
    metadata_path = '/opt/ml/metadata/resource-metadata.json'
    with open(metadata_path, 'r', encoding='utf-8') as metadata_file:
        metadata = json.load(metadata_file)
    return metadata['ResourceName']


def get_activity(activity_arn, notebook_name):
    """
    Polls the step function until the activity_arn has a task. Is a blocking function.
    :activity_arn: The arn of the activity to poll
    :notebook_name: worker name reported to Step Functions
    :return: the response aws returns; contains {taskToken, input} when a task was received
    """
    task = step.get_activity_task(activityArn=activity_arn, workerName=notebook_name)
    logging.debug("Task received")
    logging.debug(task)
    return task


def get_notebook(input):
    """
    Resolve a notebook path from the task input against /home/ec2-user/SageMaker
    :input: notebook path relative to the SageMaker folder (a leading '/' is optional)
    :return: the notebook's full path
    """
    # lstrip so both "/dir/nb.ipynb" and "dir/nb.ipynb" resolve under NOTEBOOK_ROOT.
    full_path = os.path.join(NOTEBOOK_ROOT, input.lstrip('/'))
    logging.debug("Location:%s", os.path.dirname(full_path))
    return full_path


def remove_cells(full_path, skip_cell_list):
    """
    Write a copy of the notebook without the cells tagged with any tag in skip_cell_list.
    The copy goes to <notebook dir>/nb_output/<YYYYMMDD>/<name>_<YYYYMMDD_HHMMSS>.ipynb
    :full_path: absolute path to the source notebook
    :skip_cell_list: iterable of cell tags whose cells are removed
    :return: path of the written copy
    :raises: any error from reading the notebook, converting it, or writing the copy
    """
    logging.debug("remove_cells%s", ' '.join(skip_cell_list))
    c = Config()
    c.TagRemovePreprocessor.enabled = True
    c.TagRemovePreprocessor.remove_cell_tags = set(skip_cell_list)
    exporter = NotebookExporter(config=c)
    exporter.register_preprocessor(TagRemovePreprocessor(config=c), True)

    ipynb = nbformat.read(full_path, as_version=4)
    body, _resources = exporter.from_notebook_node(ipynb)

    # splitext keeps dots inside the name ("a.b.ipynb" -> "a.b").
    nb_name = os.path.splitext(os.path.basename(full_path))[0]
    logging.debug("notebook name%s", nb_name)
    # One timestamp, so the date folder and the file timestamp always agree.
    now = datetime.now()
    output_dir = os.path.join(os.path.dirname(full_path), 'nb_output', now.strftime('%Y%m%d'))
    output_nb_path = os.path.join(output_dir, nb_name + "_" + now.strftime('%Y%m%d_%H%M%S') + ".ipynb")
    logging.debug("output path:%s", output_nb_path)
    os.makedirs(output_dir, exist_ok=True)
    with open(output_nb_path, mode='w', encoding='utf-8') as f:
        f.write(body)
    return output_nb_path


def run_notebook(output_nb_path):
    """
    Execute the notebook in place and save the executed result (with outputs) back to disk.
    :output_nb_path: absolute path to the notebook to run
    :return: the exception raised while reading/executing, or None if the run succeeded
    """
    error = None
    nb = None
    try:
        logging.debug("run notebook")
        with open(output_nb_path, encoding='utf-8') as notebook:
            nb = nbformat.read(notebook, as_version=4)
        kernel_name = nb['metadata']['kernelspec']['name']
        # The SageMaker "conda_python3" kernel is run with the "python" kernelspec here.
        kernel = "python" if kernel_name == "conda_python3" else kernel_name
        logging.debug("kernal is %s", kernel)
        ep = ExecutePreprocessor(timeout=1500, kernel_name=kernel)
        ep.preprocess(nb)  # mutates nb, filling in cell outputs
    except Exception as err:
        error = err
    finally:
        # Save whatever ran (including partial outputs of a failed run) for inspection.
        if nb is not None:
            try:
                with open(output_nb_path, mode='w', encoding='utf-8') as output_notebook:
                    nbformat.write(nb, output_notebook)
            except Exception:
                logging.exception("Could not save executed notebook %s", output_nb_path)
    return error


def clean_error(err):
    """
    Remove colouring and convert an error to a string
    :err: an Exception (or any object) to clean up
    :return: str(err) with ANSI escape sequences removed
    """
    return ANSI_ESCAPE.sub('', str(err))


def send_result(token, error):
    """
    Send the result of a task back to Step Functions
    :token: the taskToken aws uses to identify the activity task
    :error: None (or the string "None") if the run succeeded, otherwise the failure cause
    :return: None
    """
    if error is None or error == "None":
        logging.debug("send success")
        step.send_task_success(taskToken=token, output='{}')
    else:
        cause = str(error)
        logging.debug("send failure")
        logging.error(cause)
        step.send_task_failure(taskToken=token, cause=cause[:MAX_FAILURE_CAUSE_LENGTH])


def _parse_tags(tags_to_skip):
    """Split a comma-separated tag string into stripped, non-empty tag names."""
    return [tag.strip() for tag in (tags_to_skip or '').split(',') if tag.strip()]


def _process_task(task_input):
    """
    Run the notebook described by one task's input.
    :task_input: raw JSON string {"notebook": ..., "tags_to_skip": "tag1,tag2"}
    :return: None on success, otherwise a cleaned failure message
    """
    notebook = None  # fresh per task so messages never name a previous task's notebook
    try:
        parsed = json.loads(task_input)
        notebook = get_notebook(parsed['notebook'])
        skip_cell_list = _parse_tags(parsed.get('tags_to_skip'))
        logging.debug(skip_cell_list)
        cell_remove = remove_cells(notebook, skip_cell_list)
        logging.debug(cell_remove)
        result = run_notebook(cell_remove)
        logging.debug(result)
        return None if result is None else clean_error(result)
    except Exception as err:
        # Top-level boundary: malformed input, missing files, conversion errors, etc. are
        # reported as task failures instead of killing the worker.
        logging.exception("Task failed")
        return "Notebook: " + str(notebook) + " " + clean_error(err)


step = boto3.client('stepfunctions')
notebook_name = get_instance_name()
client = boto3.client("sts")
account_id = client.get_caller_identity()["Account"]
activity_arn = 'arn:aws:states:' + ACTIVITY_REGION + ':' + account_id + ':activity:' + actpref + notebook_name
logging.debug(activity_arn)
print(activity_arn)

running = True
while running:
    try:
        logging.debug("waiting for activity")
        task = get_activity(activity_arn, notebook_name)
    except botocore.exceptions.ReadTimeoutError:
        continue
    # A poll that received no task carries no input.
    if 'input' not in task:
        continue
    outcome = _process_task(task['input'])
    try:
        send_result(task['taskToken'], outcome)
    except botocore.exceptions.ClientError:
        # e.g. the task already timed out; keep the worker alive for the next task.
        logging.exception("Could not report result to Step Functions")
logging.debug("Finished")