From 972fda618c86896b5d91b3f9630b5141c3911cbe Mon Sep 17 00:00:00 2001
From: Stephen Salinas
Date: Fri, 26 Feb 2016 13:16:41 -0500
Subject: [PATCH] faster logfetch, ditch unpack step

---
 scripts/logfetch/cat.py                 | 27 +++++++-----
 scripts/logfetch/grep.py                | 41 ++++++++++--------
 scripts/logfetch/live_logs.py           | 56 +++++++------------------
 scripts/logfetch/logfetch_base.py       | 56 +++++++++----------------
 scripts/logfetch/s3_logs.py             | 39 ++++++-----------
 scripts/logfetch/search.py              | 11 ++---
 scripts/logfetch/singularity_request.py | 18 --------
 scripts/logfetch/tail.py                | 15 +++----
 8 files changed, 98 insertions(+), 165 deletions(-)
 delete mode 100644 scripts/logfetch/singularity_request.py

diff --git a/scripts/logfetch/cat.py b/scripts/logfetch/cat.py
index f8dc352007..5e5d1299f8 100644
--- a/scripts/logfetch/cat.py
+++ b/scripts/logfetch/cat.py
@@ -1,15 +1,22 @@
 import os
 import sys
+import subprocess
+from logfetch_base import log
 from termcolor import colored
 
 def cat_files(args, all_logs):
-  if all_logs:
-    all_logs.sort()
-    for log in all_logs:
-      if not args.silent:
-        sys.stderr.write(colored(log, 'cyan') + '\n')
-      command = 'cat {0}'.format(log)
-      sys.stdout.write(os.popen(command).read() + '\n')
-  else:
-    if not args.silent:
-      sys.stderr.write(colored('No log files found\n', 'magenta'))
+  log('\n', args, False)
+  if all_logs:
+    all_logs.sort()
+    for filename in all_logs:
+      log('=> ' + colored(filename, 'cyan') + '\n', args, False)
+      if filename.endswith('.gz'):
+        cat = subprocess.Popen(['cat', filename], stdout=subprocess.PIPE)
+        content = subprocess.Popen(['zcat'], stdin=cat.stdout)
+        content.communicate()
+      else:
+        cat = subprocess.Popen(['cat', filename])
+        cat.communicate()
+      sys.stdout.write('\n')
+  else:
+    log(colored('No log files found\n', 'magenta'), args, False)
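
The rewritten cat_files above drops the separate gunzip-to-disk pass: compressed logs are inflated on the fly by piping cat into zcat. A minimal sketch of the same pattern in isolation (stream_file and the path are illustrative names, not part of this patch; assumes cat and zcat exist on the host):

    import subprocess

    def stream_file(filename):
        # .gz files are piped through zcat so they are never unpacked on disk;
        # plain files go straight to stdout via cat
        if filename.endswith('.gz'):
            cat = subprocess.Popen(['cat', filename], stdout=subprocess.PIPE)
            zcat = subprocess.Popen(['zcat'], stdin=cat.stdout)
            cat.stdout.close()  # let cat receive SIGPIPE if zcat exits early
            zcat.communicate()
        else:
            subprocess.Popen(['cat', filename]).communicate()

    stream_file('mytask-stdout.gz')  # hypothetical downloaded log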
diff --git a/scripts/logfetch/grep.py b/scripts/logfetch/grep.py
index c0a2d34c47..2a7fd26449 100644
--- a/scripts/logfetch/grep.py
+++ b/scripts/logfetch/grep.py
@@ -1,31 +1,38 @@
-import os
 import sys
+import subprocess
+from logfetch_base import log
 from termcolor import colored
 
-GREP_COMMAND_FORMAT = '{0} {1}'
 DEFAULT_GREP_COMMAND = 'grep --color=always \'{0}\''
 
 def grep_files(args, all_logs):
-  if not args.silent:
-    sys.stderr.write('\n')
+  log('\n', args, False)
   if args.grep:
     if all_logs:
       all_logs.sort()
-      for log in all_logs:
-        command = grep_command(args, log)
-        output = os.popen(command).read()
-        if output is not None and output != '':
-          if not args.silent:
-            sys.stderr.write(colored(log, 'cyan') + '\n')
-          sys.stdout.write(output)
-
-      if not args.silent:
-        sys.stderr.write(colored('Finished grep, exiting', 'green') + '\n')
+      grep_cmd = grep_command(args)
+      log(colored('\nRunning grep command ({0})\n'.format(grep_cmd), 'cyan'), args, False)
+      for filename in all_logs:
+        log('=> ' + colored(filename, 'cyan') + '\n', args, True)
+        content = subprocess.Popen(['cat', filename], stdout=subprocess.PIPE)
+        if filename.endswith('.gz'):
+          zcat = subprocess.Popen('zcat', stdin=content.stdout, stdout=subprocess.PIPE)
+          grep = subprocess.Popen(grep_cmd, stdin=zcat.stdout, shell=True)
+        else:
+          grep = subprocess.Popen(grep_cmd, stdin=content.stdout, shell=True)
+        grep.communicate()
+      log(colored('Finished grep, exiting', 'green') + '\n', args, False)
    else:
       sys.stderr.write(colored('No logs found\n', 'magenta'))
 
-def grep_command(args, filename):
+def grep_command(args):
   if 'grep' in args.grep:
-    return GREP_COMMAND_FORMAT.format(args.grep, filename)
+    return args.grep
+  else:
+    return DEFAULT_GREP_COMMAND.format(args.grep)
+
+def cat_command(filename):
+  if filename.endswith('.gz'):
+    return 'zcat {0}'.format(filename)
   else:
-    return GREP_COMMAND_FORMAT.format(DEFAULT_GREP_COMMAND.format(args.grep), filename)
+    return 'cat {0}'.format(filename)
\ No newline at end of file
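
grep_files now builds the grep command once per run and streams every file into it, instead of shelling out with os.popen per file and buffering the whole output in memory. A value of args.grep that already contains a grep invocation is passed through untouched; anything else is wrapped in DEFAULT_GREP_COMMAND. The resulting pipeline for one gzipped file, in isolation (file name illustrative):

    import subprocess

    grep_cmd = "grep --color=always 'ERROR'"  # grep_command(args) for args.grep == 'ERROR'
    cat = subprocess.Popen(['cat', 'task-stdout.gz'], stdout=subprocess.PIPE)
    zcat = subprocess.Popen('zcat', stdin=cat.stdout, stdout=subprocess.PIPE)
    # shell=True because grep_cmd is a user-supplied command string
    grep = subprocess.Popen(grep_cmd, stdin=zcat.stdout, shell=True)
    grep.communicate()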
diff --git a/scripts/logfetch/live_logs.py b/scripts/logfetch/live_logs.py
index 87cdb58f4b..1f55922f3d 100644
--- a/scripts/logfetch/live_logs.py
+++ b/scripts/logfetch/live_logs.py
@@ -1,22 +1,18 @@
 import os
-import sys
 import fnmatch
 import grequests
-from termcolor import colored
 import callbacks
-from singularity_request import get_json_response
 import logfetch_base
+from termcolor import colored
 
 DOWNLOAD_FILE_FORMAT = 'http://{0}:5051/files/download.json'
 BROWSE_FOLDER_FORMAT = '{0}/sandbox/{1}/browse'
 TASK_HISTORY_FORMAT = '{0}/history/task/{1}'
 
 def download_live_logs(args):
-  if not args.silent:
-    sys.stderr.write(colored('Finding current live log files', 'cyan') + '\n')
+  logfetch_base.log(colored('Finding current live log files', 'cyan') + '\n', args, False)
   tasks = tasks_to_check(args)
   async_requests = []
-  zipped_files = []
   all_logs = []
   callbacks.progress = 0
   tasks_check_progress = 0
@@ -25,8 +21,6 @@
     metadata = files_json(args, task)
     if 'slaveHostname' in metadata:
       uri = DOWNLOAD_FILE_FORMAT.format(metadata['slaveHostname'])
-      if args.verbose and not args.silent:
-        sys.stderr.write(colored('Finding logs in base directory on {0}'.format(metadata['slaveHostname']), 'magenta') + '\n')
       for log_file in base_directory_files(args, task, metadata):
         logfile_name = '{0}-{1}'.format(task, log_file)
         if not args.logtype or (args.logtype and logfetch_base.log_matches(log_file, args.logtype.replace('logs/', ''))):
@@ -38,15 +32,9 @@
               headers=args.headers
             )
           )
-          if logfile_name.endswith('.gz'):
-            zipped_files.append('{0}/{1}'.format(args.dest, logfile_name))
-          else:
-            all_logs.append('{0}/{1}'.format(args.dest, logfile_name.replace('.gz', '.log')))
-        elif args.logtype and args.verbose and not args.silent:
-          sys.stderr.write(colored('Excluding log {0}, doesn\'t match {1}'.format(log_file, args.logtype), 'magenta') + '\n')
-
-      if args.verbose and not args.silent:
-        sys.stderr.write(colored('Finding logs in logs directory on {0}'.format(metadata['slaveHostname']), 'magenta') + '\n')
+          all_logs.append('{0}/{1}'.format(args.dest, logfile_name))
+        elif args.logtype:
+          logfetch_base.log(colored('Excluding log {0}, doesn\'t match {1}'.format(log_file, args.logtype), 'magenta') + '\n', args, True)
       for log_file in logs_folder_files(args, task):
         logfile_name = '{0}-{1}'.format(task, log_file)
         if not args.logtype or (args.logtype and logfetch_base.log_matches(log_file, args.logtype.replace('logs/', ''))):
@@ -58,24 +46,16 @@
               headers=args.headers
             )
           )
-          if logfile_name.endswith('.gz'):
-            zipped_files.append('{0}/{1}'.format(args.dest, logfile_name))
-          else:
-            all_logs.append('{0}/{1}'.format(args.dest, logfile_name.replace('.gz', '.log')))
-        elif args.logtype and args.verbose and not args.silent:
-          sys.stderr.write(colored('Excluding log {0}, doesn\'t match {1}'.format(log_file, args.logtype), 'magenta') + '\n')
+          all_logs.append('{0}/{1}'.format(args.dest, logfile_name))
+        elif args.logtype:
+          logfetch_base.log(colored('Excluding log {0}, doesn\'t match {1}'.format(log_file, args.logtype), 'magenta') + '\n', args, True)
     tasks_check_progress += 1
     logfetch_base.update_progress_bar(tasks_check_progress, tasks_check_goal, 'Log Finder', args.silent)
 
   if async_requests:
-    if not args.silent:
-      sys.stderr.write(colored('\nStarting {0} live logs downloads\n'.format(len(async_requests)), 'cyan'))
+    logfetch_base.log(colored('\nStarting {0} live logs downloads\n'.format(len(async_requests)), 'cyan'), args, False)
     callbacks.goal = len(async_requests)
     grequests.map(async_requests, stream=True, size=args.num_parallel_fetches)
-  if zipped_files and not args.download_only:
-    if not args.silent:
-      sys.stderr.write(colored('\nUnpacking {0} log(s)\n'.format(len(zipped_files)), 'cyan'))
-    all_logs = all_logs + logfetch_base.unpack_logs(args, zipped_files)
   return all_logs
 
 def tasks_to_check(args):
@@ -86,7 +66,7 @@
 
 def task_history(args, task):
   uri = TASK_HISTORY_FORMAT.format(logfetch_base.base_uri(args), task)
-  return get_json_response(uri, args)
+  return logfetch_base.get_json_response(uri, args)
 
 def task_still_running(args, task, history):
   try:
@@ -97,11 +77,11 @@
 
 def files_json(args, task):
   uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task)
-  return get_json_response(uri, args, {}, True)
+  return logfetch_base.get_json_response(uri, args, {}, True)
 
 def logs_folder_files(args, task):
   uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task)
-  files_json = get_json_response(uri, args, {'path' : '{0}/logs'.format(task)}, True)
+  files_json = logfetch_base.get_json_response(uri, args, {'path' : '{0}/logs'.format(task)}, True)
   if 'files' in files_json:
     files = files_json['files']
     return [f['name'] for f in files if logfetch_base.is_in_date_range(args, f['mtime'])]
@@ -123,20 +103,16 @@ def valid_logfile(args, fileData):
 
 def should_download(args, filename, task):
   if args.use_cache and already_downloaded(args, filename):
-    if args.verbose and not args.silent:
-      sys.stderr.write(colored('Using cached version of file {0}\n'.format(filename), 'magenta'))
+    logfetch_base.log(colored('Using cached version of file {0}\n'.format(filename), 'magenta'), args, True)
     return False
 
   if filename.endswith('.gz') and already_downloaded(args, filename):
-    if args.verbose and not args.silent:
-      sys.stderr.write(colored('Using cached version of file {0}, zipped file has not changed\n'.format(filename), 'magenta'))
+    logfetch_base.log(colored('Using cached version of file {0}, zipped file has not changed\n'.format(filename), 'magenta'), args, True)
     return False
 
   history = task_history(args, task)
   if not task_still_running(args, task, history) and already_downloaded(args, filename) and file_not_too_old(args, history, filename):
-    if args.verbose and not args.silent:
-      sys.stderr.write(colored('Using cached version of file {0}, {1}, file has not changed\n'.format(filename, history['taskUpdates'][-1]['taskState']), 'magenta'))
+    logfetch_base.log(colored('Using cached version of file {0}, {1}, file has not changed\n'.format(filename, history['taskUpdates'][-1]['taskState']), 'magenta'), args, True)
   else:
-    if args.verbose and not args.silent:
-      sys.stderr.write(colored('Will download file {0}, version on the server is newer than cached version\n'.format(filename), 'magenta'))
+    logfetch_base.log(colored('Will download file {0}, version on the server is newer than cached version\n'.format(filename), 'magenta'), args, True)
   return True
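
download_live_logs now appends each .gz path to all_logs unchanged instead of queueing it in zipped_files for a later unpack pass; the files themselves are still fetched in parallel via grequests. The download pattern in isolation (URL and callback body are illustrative, not from this patch):

    import grequests

    def on_done(response, **kwargs):
        # stream response.content to a file under args.dest here
        pass

    reqs = [grequests.AsyncRequest('GET', 'http://slave:5051/files/download.json', callback=on_done)]
    grequests.map(reqs, stream=True, size=5)  # size mirrors args.num_parallel_fetches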
diff --git a/scripts/logfetch/logfetch_base.py b/scripts/logfetch/logfetch_base.py
index 1e51a50402..e908221d68 100644
--- a/scripts/logfetch/logfetch_base.py
+++ b/scripts/logfetch/logfetch_base.py
@@ -2,50 +2,16 @@
 import sys
 import gzip
 import fnmatch
+import requests
 from datetime import datetime, timedelta
 from termcolor import colored
-from singularity_request import get_json_response
 
+ERROR_STATUS_FORMAT = 'Singularity responded with an invalid status code ({0})'
 BASE_URI_FORMAT = '{0}{1}'
 ALL_REQUESTS = '/requests'
 REQUEST_TASKS_FORMAT = '/history/request/{0}/tasks'
 ACTIVE_TASKS_FORMAT = '/history/request/{0}/tasks/active'
 
-def unpack_logs(args, logs):
-  successful = []
-  goal = len(logs)
-  progress = 0
-  for zipped_file in logs:
-    try:
-      if os.path.isfile(zipped_file):
-        if args.verbose and not args.silent:
-          sys.stderr.write(colored('Starting unpack of {0}'.format(zipped_file), 'magenta') + '\n')
-        file_in = gzip.open(zipped_file, 'rb')
-        unzipped = zipped_file.replace('.gz', '.log')
-        file_out = open(unzipped, 'wb')
-        file_out.write(file_in.read())
-        file_out.close()
-        file_in.close
-        os.remove(zipped_file)
-        progress += 1
-        if args.verbose and not args.silent:
-          sys.stderr.write(colored('Unpacked log {0}/{1}'.format(progress, goal), 'green') + colored(zipped_file, 'white') + '\n')
-        else:
-          update_progress_bar(progress, goal, 'Unpack', args.silent)
-        successful.append(unzipped)
-      else:
-        progress += 1
-        update_progress_bar(progress, goal, 'Unpack', args.silent)
-    except Exception as e:
-      print e
-      if os.path.isfile(zipped_file):
-        os.remove(zipped_file)
-      sys.stderr.write(colored('Could not unpack {0}'.format(zipped_file), 'red') + '\n')
-      continue
-  if not args.silent:
-    sys.stderr.write('\n')
-  return successful
-
 def base_uri(args):
   if not args.singularity_uri_base:
     exit("Specify a base uri for Singularity (-u)")
@@ -62,7 +28,7 @@ def tasks_for_requests(args):
     tasks = tasks[0:args.task_count] if hasattr(args, 'task_count') else tasks
     all_tasks = all_tasks + tasks
   if not all_tasks:
-    sys.stderr.write(colored('No tasks found, check that the request/task you are searching for exists...', 'red'))
+    log(colored('No tasks found, check that the request/task you are searching for exists...', 'red'), args, False)
     exit(1)
   return all_tasks
 
@@ -116,3 +82,19 @@
   if not silent:
     sys.stderr.write("\r{0} Progress: [".format(progress_type) + colored("{0}".format(hashes + spaces), color) + "] {0}%".format(int(round(percent * 100))))
     sys.stderr.flush()
+
+def log(message, args, verbose):
+  if (not verbose or (verbose and args.verbose)) and not args.silent:
+    sys.stderr.write(message)
+
+def get_json_response(uri, args, params={}, skip404ErrMessage=False):
+  singularity_response = requests.get(uri, params=params, headers=args.headers)
+  if singularity_response.status_code < 199 or singularity_response.status_code > 299:
+    if not (skip404ErrMessage and singularity_response.status_code == 404):
+      log('{0} params:{1}\n'.format(uri, str(params)), args, False)
+    if not (skip404ErrMessage and singularity_response.status_code == 404):
+      sys.stderr.write(colored(ERROR_STATUS_FORMAT.format(singularity_response.status_code), 'red') + '\n')
+    if not (skip404ErrMessage and singularity_response.status_code == 404):
+      log(colored(singularity_response.text, 'red') + '\n', args, False)
+    return {}
+  return singularity_response.json()
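
The new log() helper centralizes the `if args.verbose and not args.silent` guards that were previously repeated at every call site: the third argument marks a message as verbose-only, and args.silent mutes everything. A small usage sketch (Namespace stands in for the parsed CLI args):

    from argparse import Namespace
    from logfetch_base import log

    args = Namespace(verbose=False, silent=False)
    log('always written to stderr\n', args, False)       # printed
    log('written only when args.verbose\n', args, True)  # suppressed here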
diff --git a/scripts/logfetch/s3_logs.py b/scripts/logfetch/s3_logs.py
index 2b7091aa9a..2fff6c5f8d 100644
--- a/scripts/logfetch/s3_logs.py
+++ b/scripts/logfetch/s3_logs.py
@@ -5,7 +5,6 @@
 import time
 from termcolor import colored
 import callbacks
-from singularity_request import get_json_response
 
 TASK_FORMAT = '/task/{0}'
 S3LOGS_URI_FORMAT = '{0}/logs{1}'
@@ -22,43 +21,30 @@ def download_s3_logs(args):
   callbacks.progress = 0
   logs = logs_for_all_requests(args)
   async_requests = []
-  zipped_files = []
   all_logs = []
   for log_file in logs:
     filename = log_file['key'].rsplit("/", 1)[1]
     if logfetch_base.is_in_date_range(args, int(str(log_file['lastModified'])[0:-3])):
       if not args.logtype or log_matches(args, filename):
-        if args.verbose and not args.silent:
-          sys.stderr.write(colored('Including log {0}'.format(filename), 'blue') + '\n')
+        logfetch_base.log(colored('Including log {0}'.format(filename), 'blue') + '\n', args, True)
         if not already_downloaded(args.dest, filename):
           async_requests.append(
             grequests.AsyncRequest('GET', log_file['getUrl'], callback=callbacks.generate_callback(log_file['getUrl'], args.dest, filename, args.chunk_size, args.verbose, args.silent), headers=args.headers)
           )
         else:
-          if args.verbose and not args.silent:
-            sys.stderr.write(colored('Log already downloaded {0}'.format(filename), 'blue') + '\n')
-          all_logs.append('{0}/{1}'.format(args.dest, filename.replace('.gz', '.log')))
-          zipped_files.append('{0}/{1}'.format(args.dest, filename))
+          logfetch_base.log(colored('Log already downloaded {0}'.format(filename), 'blue') + '\n', args, True)
+        all_logs.append('{0}/{1}'.format(args.dest, filename))
       else:
-        if args.verbose and not args.silent:
-          sys.stderr.write(colored('Excluding {0} log does not match logtype argument {1}'.format(filename, args.logtype), 'magenta') + '\n')
+        logfetch_base.log(colored('Excluding {0} log does not match logtype argument {1}'.format(filename, args.logtype), 'magenta') + '\n', args, True)
     else:
-      if args.verbose and not args.silent:
-        sys.stderr.write(colored('Excluding {0}, not in date range'.format(filename), 'magenta') + '\n')
+      logfetch_base.log(colored('Excluding {0}, not in date range'.format(filename), 'magenta') + '\n', args, True)
   if async_requests:
-    if not args.silent:
-      sys.stderr.write(colored('Starting {0} S3 Downloads with {1} parallel fetches\n'.format(len(async_requests), args.num_parallel_fetches), 'cyan'))
+    logfetch_base.log(colored('Starting {0} S3 Downloads with {1} parallel fetches\n'.format(len(async_requests), args.num_parallel_fetches), 'cyan'), args, False)
     callbacks.goal = len(async_requests)
     grequests.map(async_requests, stream=True, size=args.num_parallel_fetches)
-    if not args.silent and not args.download_only:
-      sys.stderr.write(colored('\nUnpacking {0} S3 log(s)\n'.format(len(async_requests)), 'cyan'))
   else:
-    if not args.silent:
-      sys.stderr.write(colored('No S3 logs to download\n', 'cyan'))
-  if not args.download_only:
-    all_logs = all_logs + logfetch_base.unpack_logs(args, zipped_files)
-    if not args.silent:
-      sys.stderr.write(colored('All S3 logs up to date\n', 'cyan'))
+    logfetch_base.log(colored('No S3 logs to download\n', 'cyan'), args, False)
+  logfetch_base.log(colored('All S3 logs up to date\n', 'cyan'), args, False)
   return all_logs
 
 def already_downloaded(dest, filename):
@@ -67,21 +53,20 @@
 
 def logs_for_all_requests(args):
   s3_params = {'start': int(time.mktime(args.start.timetuple()) * 1000), 'end': int(time.mktime(args.end.timetuple()) * 1000)}
   if args.taskId:
-    return get_json_response(s3_task_logs_uri(args, args.taskId), args, s3_params)
+    return logfetch_base.get_json_response(s3_task_logs_uri(args, args.taskId), args, s3_params)
   else:
     tasks = logfetch_base.tasks_for_requests(args)
     logs = []
     tasks_progress = 0
     tasks_goal = len(tasks)
     for task in tasks:
-      s3_logs = get_json_response(s3_task_logs_uri(args, task), args, s3_params)
+      s3_logs = logfetch_base.get_json_response(s3_task_logs_uri(args, task), args, s3_params)
       logs = logs + s3_logs if s3_logs else logs
       tasks_progress += 1
       logfetch_base.update_progress_bar(tasks_progress, tasks_goal, 'S3 Log Finder', args.silent)
-    if not args.silent:
-      sys.stderr.write(colored('\nAlso searching s3 history...\n', 'cyan'))
+    logfetch_base.log(colored('\nAlso searching s3 history...\n', 'cyan'), args, False)
     for request in logfetch_base.all_requests(args):
-      s3_logs = get_json_response(s3_request_logs_uri(args, request), args, s3_params)
+      s3_logs = logfetch_base.get_json_response(s3_request_logs_uri(args, request), args, s3_params)
       logs = logs + s3_logs if s3_logs else logs
     return [dict(t) for t in set([tuple(l.items()) for l in logs])] # remove any duplicates
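
Because logs_for_all_requests collects entries per task and then again from request history, the same S3 object can show up twice; the last line dedupes the list of dicts by round-tripping each one through a hashable tuple. The idiom in isolation:

    logs = [{'key': 'a'}, {'key': 'a'}, {'key': 'b'}]
    unique = [dict(t) for t in set([tuple(l.items()) for l in logs])]
    # -> [{'key': 'a'}, {'key': 'b'}] in some order; set() does not preserve ordering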
diff --git a/scripts/logfetch/search.py b/scripts/logfetch/search.py
index 1764984dea..35b44bbe23 100644
--- a/scripts/logfetch/search.py
+++ b/scripts/logfetch/search.py
@@ -1,8 +1,7 @@
 import os
 import re
-import sys
 import fnmatch
-import logfetch_base
+from logfetch_base import log, is_in_date_range
 from termcolor import colored
 
 def find_cached_logs(args):
@@ -10,19 +9,17 @@
   log_fn_match = get_matcher(args)
   for filename in os.listdir(args.dest):
     if fnmatch.fnmatch(filename, log_fn_match) and in_date_range(args, filename):
-      if args.verbose and not args.silent:
-        sys.stderr.write(colored('Including log {0}\n'.format(filename), 'blue'))
+      log(colored('Including log {0}\n'.format(filename), 'blue'), args, True)
       matching_logs.append('{0}/{1}'.format(args.dest, filename))
     else:
-      if args.verbose and not args.silent:
-        sys.stderr.write(colored('Excluding log {0}, not in date range\n'.format(filename), 'magenta'))
+      log(colored('Excluding log {0}, not in date range\n'.format(filename), 'magenta'), args, True)
   return matching_logs
 
 
 def in_date_range(args, filename):
   timestamps = re.findall(r"-\d{13}-", filename)
   if timestamps:
-    return logfetch_base.is_in_date_range(args, int(str(timestamps[-1]).replace("-", "")[0:-3]))
+    return is_in_date_range(args, int(str(timestamps[-1]).replace("-", "")[0:-3]))
   else:
     return True
 
diff --git a/scripts/logfetch/singularity_request.py b/scripts/logfetch/singularity_request.py
deleted file mode 100644
index a3d1fc1d98..0000000000
--- a/scripts/logfetch/singularity_request.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import sys
-import requests
-from termcolor import colored
-
-ERROR_STATUS_FORMAT = 'Singularity responded with an invalid status code ({0})'
-
-def get_json_response(uri, args, params={}, skip404ErrMessage=False):
-  singularity_response = requests.get(uri, params=params, headers=args.headers)
-  if singularity_response.status_code < 199 or singularity_response.status_code > 299:
-    if not args.silent and not (skip404ErrMessage and singularity_response.status_code == 404):
-      sys.stderr.write('{0} params:{1}\n'.format(uri, str(params)))
-    if not (skip404ErrMessage and singularity_response.status_code == 404):
-      sys.stderr.write(colored(ERROR_STATUS_FORMAT.format(singularity_response.status_code), 'red') + '\n')
-    if not args.silent and not (skip404ErrMessage and singularity_response.status_code == 404):
-      sys.stderr.write(colored(singularity_response.text, 'red') + '\n')
-    return {}
-  return singularity_response.json()
-
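
in_date_range above pulls the 13-digit millisecond timestamp out of a cached filename and truncates it to whole seconds ([0:-3]) before the range check. Worked through for a hypothetical cached name:

    import re

    filename = 'mytask-1456510601000-stdout.log'  # hypothetical
    ts = re.findall(r"-\d{13}-", filename)[-1]    # '-1456510601000-'
    int(str(ts).replace("-", "")[0:-3])           # 1456510601, epoch seconds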
diff --git a/scripts/logfetch/tail.py b/scripts/logfetch/tail.py
index a7e45136c6..054261f25b 100644
--- a/scripts/logfetch/tail.py
+++ b/scripts/logfetch/tail.py
@@ -8,6 +8,7 @@
 from grep import grep_command
 from termcolor import colored
 from singularity_request import get_json_response
+from logfetch_base import log
 
 TAIL_LOG_FORMAT = '{0}/sandbox/{1}/read'
 READ_INTERVAL = 5
@@ -21,13 +22,10 @@ def start_tail(args):
     tasks = [str(t) for t in logfetch_base.tasks_for_requests(args)]
   else:
     tasks = [args.taskId]
-  if args.verbose:
-    sys.stderr.write(colored('Tailing logs for tasks:\n', 'green'))
-    if not args.silent:
-      for t in tasks:
-        sys.stderr.write(colored('{0}\n'.format(t), 'yellow'))
-  if not args.silent:
-    sys.stderr.write(colored('ctrl+c to exit\n', 'cyan'))
+  log(colored('Tailing logs for tasks:\n', 'green'), args, True)
+  for t in tasks:
+    log(colored('{0}\n'.format(t), 'yellow'), args, True)
+  log(colored('ctrl+c to exit\n', 'cyan'), args, False)
   try:
     threads = []
     for task in tasks:
@@ -39,8 +37,7 @@
       if not t.isAlive:
         break
   except KeyboardInterrupt:
-    if not args.silent:
-      sys.stderr.write(colored('Stopping tail', 'magenta') + '\n')
+    log(colored('Stopping tail', 'magenta') + '\n', args, False)
     sys.exit(0)
 
 def logs_folder_files(args, task):
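
With the unpack step gone everywhere, downloaded .gz files stay compressed in args.dest and each consumer inflates them on demand. A rough end-to-end sketch, assuming the logfetch scripts are importable from the current directory and substituting a hand-built Namespace for the parsed CLI args (paths are hypothetical):

    from argparse import Namespace
    import cat, grep

    args = Namespace(silent=False, verbose=True, grep='ERROR')
    logs = ['dest/task1-stdout.gz', 'dest/task2-stdout.log']
    grep.grep_files(args, logs)  # cat | zcat | grep for the .gz entry
    cat.cat_files(args, logs)    # prints both, inflating the .gz on the fly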