diff --git a/scripts/README.md b/scripts/README.md
index bb5ecb8f93..6853aaa7f2 100644
--- a/scripts/README.md
+++ b/scripts/README.md
@@ -14,22 +14,28 @@ pip install singularity-logfetch
 - Any arguments specified in the log file can be overriden on the command line
 - You can store a number of configuration files for different clusters in the config directory (`~/.logfetch` by default) and choose which config to use with the -c option
 
+#Logfetch and Logcat
+Two commands exist for downloading logs.
+- `logfetch` will download and optionally output a grep command for the logs
+- `logcat` will download logs and pipe the contents to stdout
+
 ##Options
 |Flags|Description|Default|
 |:---:|:---------|:-----:|
-|-f , --conf_folder|Folder to look for configuration files|`~/.logfetch`|
-|-c , --conf_file|configuration file to use(path relative to conf_folder)|default|
-|-t , --taskId|TaskId to fetch logs for|
-|-r , --requestId|RequestId to fetch logs for|
-|--task-count|Number of recent tasks (belonging to a request) to fetch live logs (on machine not s3)|1|
-|-d , --deployId|DeployId to fetch logs for (Must also specify requestId when using this option)|
-|--dest|Destination folder for downloaded log files|`~/.logfetch_cache`|
+|-f , --conf-folder|Folder to look for configuration files|`~/.logfetch`|
+|-c , --conf-file|Configuration file to use (path relative to conf_folder)|default|
+|-t , --task-id|Task Id to fetch logs for|
+|-r , --request-id|Request Id to fetch logs for|
+|-tc, --task-count|Number of recent tasks (belonging to a request) to fetch live logs (on machine not s3)|1|
+|-d , --deploy-id|Deploy Id to fetch logs for (Must also specify request-id when using this option)|
+|-o, --dest|Destination folder for download output|`~/.logfetch_cache`|
 |-n --num-parallel-fetches|Max number of log fetches to make at once|5
-|-cs, --chunk_size|Chunk size for writing responses to file system|8192
-|-u, --singularity-uri-base|Base url for singularity (e.g. `localhost:8080/singularity/v2/api`), This MUST be set|
+|-cs, --chunk-size|Chunk size for writing responses to file system|8192
+|-u, --singularity-uri-base|Base url for singularity (e.g. `localhost:8080/singularity/v2/api`)|Must be set!|
 |-s , --start-days|Search for logs no older than this many days|7
 |-e , --end-days|Search for logs no newer than this many days| None (today)
-|-g, --grep|Grep string for searching log files|
+|-g, --grep|Grep string for searching log files (only for `logfetch`)|
+|-l, --logtype|Glob matcher for type of log file to download|None (match all)|
 
 ##Grep and Log Files
 When the `-g` option is set, the log fetcher will grep the downloaded files for the provided regex.
@@ -64,9 +70,14 @@ When the `-g` option is set, the log fetcher will grep the downloaded files for
 
 - Don't search, just download logs
 
-`logfetch -r 'My_Jobs_Id'`
+`logfetch -r 'My_Request_Id'`
+
+- Only get logs that match a glob or logfile name with the `-l` option
 
-##Tailing Logs
+`logfetch -r 'My_Request_Id' -l '*.out'`
+`logfetch -r 'My_Request_Id' -l 'access.log'`
+
+#Logtail
 You can tail live log files using `logtail`. Just provide the request, task, or request and deploy along with a log file path.
 
 For example, to tail the `service.log` file for all tasks for a request named `MyRequest`, you would use the command:
@@ -76,3 +87,17 @@ For example, to tail the `service.log` file for all tasks for a request named `M
 
 - The path for the log file is relative to the base path for that task's sandbox. For example, to tail a file in `(sandbox path)/logs/access.log`, the argument to -l would be `logs/access.log`
 You can also provide the `-g` option which will provide the grep string to the singularity API and search the results. You cannot provide a full grep command as in some of the above examples, just a string to match on.
+
+##Options
+|Flags|Description|Default|
+|:---:|:---------|:-----:|
+|-f , --conf-folder|Folder to look for configuration files|`~/.logfetch`|
+|-c , --conf-file|Configuration file to use (path relative to conf_folder)|default|
+|-t , --task-id|Task Id to fetch logs for|
+|-r , --request-id|Request Id to fetch logs for|
+|-d , --deploy-id|Deploy Id to fetch logs for (Must also specify request-id when using this option)|
+|-u, --singularity-uri-base|Base url for singularity (e.g. `localhost:8080/singularity/v2/api`)|Must be set!|
+|-g, --grep|Grep string for searching log files|
+|-l, --logfile|Log file path to tail (ie logs/access.log)|Must be set!|
+|-v, --verbose|Extra output about the task id associated with logs in the output|False|
+
diff --git a/scripts/logfetch/callbacks.py b/scripts/logfetch/callbacks.py
index a25c72f765..ff7b4851de 100644
--- a/scripts/logfetch/callbacks.py
+++ b/scripts/logfetch/callbacks.py
@@ -10,7 +10,7 @@ def callback(response, **kwargs):
     with open(path, 'wb') as f:
       for chunk in response.iter_content(chunk_size):
         f.write(chunk)
-    sys.stderr.write(colored('finished downloading {0}'.format(path), 'green') + '\n')
+    sys.stderr.write(colored('Downloaded ', 'green') + colored(path, 'white') + '\n')
 
   return callback
 
diff --git a/scripts/logfetch/cat.py b/scripts/logfetch/cat.py
new file mode 100644
index 0000000000..4c3f247ad0
--- /dev/null
+++ b/scripts/logfetch/cat.py
@@ -0,0 +1,24 @@
+import os
+import sys
+from termcolor import colored
+
+CAT_COMMAND_FORMAT = 'xargs -n {0} cat < {1}'
+def cat_files(args, all_logs):
+  if all_logs:
+    catlist_filename = '{0}/.catlist'.format(args.dest)
+    create_catlist(args, all_logs, catlist_filename)
+    command = CAT_COMMAND_FORMAT.format(len(all_logs), catlist_filename)
+    sys.stdout.write(os.popen(command).read() + '\n')
+  else:
+    sys.stderr.write(colored('No log files found\n', 'magenta'))
+
+def create_catlist(args, all_logs, catlist_filename):
+  catlist_file = open(catlist_filename, 'wb')
+  for log in all_logs:
+    catlist_file.write('{0}\n'.format(log))
+  catlist_file.close()
+
+def remove_catlist(catlist_filename):
+  if os.path.isfile(catlist_filename):
+    os.remove(catlist_filename)
+
diff --git a/scripts/logfetch/entrypoint.py b/scripts/logfetch/entrypoint.py
index c58aa96b64..99aac68dae 100644
--- a/scripts/logfetch/entrypoint.py
+++ b/scripts/logfetch/entrypoint.py
@@ -3,35 +3,52 @@
 import sys
 import os
 from termcolor import colored
-
 from fake_section_head import FakeSectionHead
 from live_logs import download_live_logs
 from s3_logs import download_s3_logs
 from tail import start_tail
 from grep import grep_files
+from cat import cat_files
 
 CONF_READ_ERR_FORMAT = 'Could not load config from {0} due to {1}'
 DEFAULT_CONF_DIR = os.path.expanduser('~/.logfetch')
 DEFAULT_CONF_FILE = 'default'
-DEFAULT_PARALLEL_FETCHES = 5
+DEFAULT_PARALLEL_FETCHES = 10
 DEFAULT_CHUNK_SIZE = 8192
 DEFAULT_DEST = os.path.expanduser('~/.logfetch_cache')
 DEFAULT_TASK_COUNT = 10
 DEFAULT_DAYS = 7
 
-def exit(reason):
-  sys.stderr.write(colored(reason, 'red') + '\n')
+def exit(reason, color='red'):
+  sys.stderr.write(colored(reason, color) + '\n')
   sys.exit(1)
 
 def tail_logs(args):
-  start_tail(args)
+  try:
+    start_tail(args)
+  except KeyboardInterrupt:
+    exit('Stopping logtail...', 'magenta')
 
 def fetch_logs(args):
-  check_dest(args)
-  all_logs = []
-  all_logs += download_s3_logs(args)
-  all_logs += download_live_logs(args)
-  grep_files(args, all_logs)
+  try:
+    check_dest(args)
+    all_logs = []
+    all_logs += download_s3_logs(args)
+    all_logs += download_live_logs(args)
+    grep_files(args, all_logs)
+  except KeyboardInterrupt:
+    exit('Stopping logfetch...', 'magenta')
+
+def cat_logs(args):
+  try:
+    check_dest(args)
+    all_logs = []
+    all_logs += download_s3_logs(args)
+    all_logs += download_live_logs(args)
+    cat_files(args, all_logs)
+  except KeyboardInterrupt:
+    exit('Stopping logcat...', 'magenta')
+
 
 def check_dest(args):
   if not os.path.exists(args.dest):
@@ -39,8 +56,8 @@ def fetch():
   conf_parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
-  conf_parser.add_argument("-f", "--conf_folder", help="specify a folder for config files to live")
-  conf_parser.add_argument("-c", "--conf_file", help="Specify config file within the conf folder", metavar="FILE")
+  conf_parser.add_argument("-f", "--conf-folder", dest='conf_folder', help="specify a folder for config files to live")
+  conf_parser.add_argument("-c", "--conf-file", dest='conf_file', help="Specify config file within the conf folder", metavar="FILE")
   args, remaining_argv = conf_parser.parse_known_args()
   conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
   conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
@@ -61,36 +78,87 @@ def fetch():
     sys.stderr.write(CONF_READ_ERR_FORMAT.format(conf_file, err) + '\n')
 
   parser = argparse.ArgumentParser(parents=[conf_parser], description="Fetch log files from Singularity. One can specify either a TaskId, RequestId and DeployId, or RequestId",
-                                   prog="log_fetcher")
+                                   prog="logfetch")
 
   parser.set_defaults(**defaults)
-  parser.add_argument("-t", "--taskId", help="TaskId of task to fetch logs for", metavar="taskId")
-  parser.add_argument("-r", "--requestId", help="RequestId of request to fetch logs for", metavar="requestId")
-  parser.add_argument("--task-count", help="Number of recent tasks per request to fetch logs from", metavar="taskCount")
-  parser.add_argument("-d", "--deployId", help="DeployId of task to fetch logs for", metavar="deployId")
-  parser.add_argument("--dest", help="Destination directory", metavar="DIR")
-  parser.add_argument("-n", "--num-parallel-fetches", help="Number of fetches to make at once", type=int, metavar="INT")
-  parser.add_argument("-cs", "--chunk-size", help="Chunk size for writing from response to filesystem", type=int, metavar="INT")
-  parser.add_argument("-u", "--singularity-uri-base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)", metavar="URI")
-  parser.add_argument("-s", "--start-days", help="Search for logs no older than this many days", type=int, metavar="start_days")
-  parser.add_argument("-e", "--end-days", help="Search for logs no new than this many days (defaults to None/today)", type=int, metavar="end_days")
-  parser.add_argument("-g", "--grep", help="Regex to grep for (normal grep syntax) or a full grep command", metavar='grep')
+  parser.add_argument("-t", "--task-id", dest="taskId", help="TaskId of task to fetch logs for")
+  parser.add_argument("-r", "--request-id", dest="requestId", help="RequestId of request to fetch logs for (can be a glob)")
+  parser.add_argument("-tc","--task-count", dest="task_count", help="Number of recent tasks per request to fetch logs from")
+  parser.add_argument("-d", "--deploy-id", dest="deployId", help="DeployId of task to fetch logs for (can be a glob)")
+  parser.add_argument("-o", "--dest", dest="dest", help="Destination directory")
+  parser.add_argument("-n", "--num-parallel-fetches", dest="num_parallel_fetches", help="Number of fetches to make at once", type=int)
+  parser.add_argument("-cs", "--chunk-size", dest="chunk_size", help="Chunk size for writing from response to filesystem", type=int)
+  parser.add_argument("-u", "--singularity-uri-base", dest="singularity_uri_base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)")
+  parser.add_argument("-s", "--start-days", dest="start_days", help="Search for logs no older than this many days", type=int)
+  parser.add_argument("-e", "--end-days", dest="end_days", help="Search for logs no newer than this many days (defaults to None/today)", type=int)
+  parser.add_argument("-l", "--log-type", dest="logtype", help="Logfile type to download (ie 'access.log'), can be a glob (ie *.log)")
+  parser.add_argument("-g", "--grep", dest="grep", help="Regex to grep for (normal grep syntax) or a full grep command")
 
   args = parser.parse_args(remaining_argv)
 
   if args.deployId and not args.requestId:
-    exit("Must specify requestId (-r) when specifying deployId")
+    exit("Must specify request-id (-r) when specifying deploy-id")
   elif not args.requestId and not args.deployId and not args.taskId:
-    exit('Must specify one of\n -t taskId\n -r requestId and -d deployId\n -r requestId')
+    exit('Must specify one of\n -t task-id\n -r request-id and -d deploy-id\n -r request-id')
 
   args.dest = os.path.expanduser(args.dest)
 
   fetch_logs(args)
 
+def cat():
+  conf_parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
+  conf_parser.add_argument("-f", "--conf-folder", dest="conf_folder", help="specify a folder for config files to live")
+  conf_parser.add_argument("-c", "--conf-file", dest="conf_file", help="Specify config file within the conf folder", metavar="FILE")
+  args, remaining_argv = conf_parser.parse_known_args()
+  conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
+  conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
+  config = ConfigParser.SafeConfigParser()
+
+  defaults = {
+    "num_parallel_fetches" : DEFAULT_PARALLEL_FETCHES,
+    "chunk_size" : DEFAULT_CHUNK_SIZE,
+    "dest" : DEFAULT_DEST,
+    "task_count" : DEFAULT_TASK_COUNT,
+    "start_days" : DEFAULT_DAYS
+  }
+
+  try:
+    config.readfp(FakeSectionHead(open(os.path.expanduser(conf_file))))
+    defaults.update(dict(config.items("Defaults")))
+  except Exception, err:
+    sys.stderr.write(CONF_READ_ERR_FORMAT.format(conf_file, err) + '\n')
+
+  parser = argparse.ArgumentParser(parents=[conf_parser], description="Fetch log files from Singularity and cat to stdout. One can specify either a TaskId, RequestId and DeployId, or RequestId",
+                                   prog="logcat")
+
+  parser.set_defaults(**defaults)
+  parser.add_argument("-t", "--task-id", dest="taskId", help="TaskId of task to fetch logs for")
+  parser.add_argument("-r", "--request-id", dest="requestId", help="RequestId of request to fetch logs for (can be a glob)")
+  parser.add_argument("-tc","--task-count", dest="task_count", help="Number of recent tasks per request to fetch logs from")
+  parser.add_argument("-d", "--deploy-id", dest="deployId", help="DeployId of tasks to fetch logs for (can be a glob)")
+  parser.add_argument("-o", "--dest", dest="dest", help="Destination directory")
+  parser.add_argument("-n", "--num-parallel-fetches", dest="num_parallel_fetches", help="Number of fetches to make at once", type=int)
+  parser.add_argument("-cs", "--chunk-size", dest="chunk_size", help="Chunk size for writing from response to filesystem", type=int)
+  parser.add_argument("-u", "--singularity-uri-base", dest="singularity_uri_base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)")
+  parser.add_argument("-s", "--start-days", dest="start_days", help="Search for logs no older than this many days", type=int)
+  parser.add_argument("-e", "--end-days", dest="end_days", help="Search for logs no newer than this many days (defaults to None/today)", type=int)
+  parser.add_argument("-l", "--logtype", dest="logtype", help="Logfile type to download (ie 'access.log'), can be a glob (ie *.log)")
+
+  args = parser.parse_args(remaining_argv)
+
+  if args.deployId and not args.requestId:
+    exit("Must specify request-id (-r) when specifying deploy-id")
+  elif not args.requestId and not args.deployId and not args.taskId:
+    exit('Must specify one of\n -t task-id\n -r request-id and -d deploy-id\n -r request-id')
+
+  args.dest = os.path.expanduser(args.dest)
+
+  cat_logs(args)
+
 def tail():
   conf_parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
-  conf_parser.add_argument("-f", "--conf_folder", help="specify a folder for config files to live")
-  conf_parser.add_argument("-c", "--conf_file", help="Specify config file within the conf folder", metavar="FILE")
+  conf_parser.add_argument("-f", "--conf-folder", dest="conf_folder", help="specify a folder for config files to live")
+  conf_parser.add_argument("-c", "--conf-file", dest="conf_file", help="Specify config file within the conf folder", metavar="FILE")
   args, remaining_argv = conf_parser.parse_known_args()
   conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
   conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
@@ -105,23 +173,23 @@ def tail():
     sys.stderr.write(CONF_READ_ERR_FORMAT.format(conf_file, err) + '\n')
 
   parser = argparse.ArgumentParser(parents=[conf_parser], description="Tail log files from Singularity.
One can specify either a TaskId, RequestId and DeployId, or RequestId", - prog="log_fetcher") + prog="logtail") parser.set_defaults(**defaults) - parser.add_argument("-t", "--taskId", help="TaskId of task to fetch logs for", metavar="taskId") - parser.add_argument("-r", "--requestId", help="RequestId of request to fetch logs for", metavar="requestId") - parser.add_argument("-d", "--deployId", help="DeployId of task to fetch logs for", metavar="deployId") - parser.add_argument("-u", "--singularity-uri-base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)", metavar="URI") - parser.add_argument("-g", "--grep", help="String to grep for", metavar='grep') - parser.add_argument("-l", "--logfile", help="Logfile path/name to tail (ie 'logs/access.log')", metavar="logfile") - parser.add_argument("-v", "--verbose", help="more verbose output", action='store_true') + parser.add_argument("-t", "--task-id", dest="taskId", help="TaskId of task to fetch logs for") + parser.add_argument("-r", "--request-id", dest="requestId", help="RequestId of request to fetch logs for (can be a glob)") + parser.add_argument("-d", "--deploy-id", dest="deployId", help="DeployId of tasks to fetch logs for (can be a glob)") + parser.add_argument("-u", "--singularity-uri-base", dest="singularity_uri_base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)") + parser.add_argument("-g", "--grep", dest="grep", help="String to grep for") + parser.add_argument("-l", "--logfile", dest="logfile", help="Logfile path/name to tail (ie 'logs/access.log')") + parser.add_argument("-v", "--verbose", dest="verbose", help="more verbose output", action='store_true') args = parser.parse_args(remaining_argv) if args.deployId and not args.requestId: - exit("Must specify requestId (-r) when specifying deployId") + exit("Must specify request-id (-r) when specifying deploy-id") elif not args.requestId and not args.deployId and not args.taskId: - exit('Must specify one of\n -t taskId\n -r requestId and -d deployId\n -r requestId') + exit('Must specify one of\n -t task-id\n -r request-id and -d deploy-id\n -r request-id') elif not args.logfile: exit("Must specify logfile to tail (-l)") diff --git a/scripts/logfetch/grep.py b/scripts/logfetch/grep.py index 7f0356cc85..087dfc993a 100644 --- a/scripts/logfetch/grep.py +++ b/scripts/logfetch/grep.py @@ -2,18 +2,21 @@ import sys from termcolor import colored -GREP_COMMAND_FORMAT = 'xargs -n {0} {1} < {2}' +GREP_COMMAND_FORMAT = 'while read file; do {0} "$file"; done < {1}' DEFAULT_GREP_COMMAND = 'grep --color=always \'{0}\'' def grep_files(args, all_logs): if args.grep: - greplist_filename = '{0}/.greplist'.format(args.dest) - create_greplist(args, all_logs, greplist_filename) - command = grep_command(args, all_logs, greplist_filename) - sys.stderr.write(colored('Running "{0}" this might take a minute'.format(command), 'blue') + '\n') - sys.stdout.write(os.popen(command).read() + '\n') - remove_greplist(greplist_filename) - sys.stderr.write(colored('Finished grep, exiting', 'green') + '\n') + if all_logs: + greplist_filename = '{0}/.greplist'.format(args.dest) + create_greplist(args, all_logs, greplist_filename) + command = grep_command(args, all_logs, greplist_filename) + sys.stderr.write(colored('Running "{0}" this might take a minute'.format(command), 'blue') + '\n') + sys.stdout.write(os.popen(command).read() + '\n') + remove_greplist(greplist_filename) + sys.stderr.write(colored('Finished grep, exiting', 'green') + '\n') + else: + 
sys.stderr.write(colored('No logs found\n', 'magenta')) def create_greplist(args, all_logs, greplist_filename): greplist_file = open(greplist_filename, 'wb') @@ -27,6 +30,6 @@ def remove_greplist(greplist_filename): def grep_command(args, all_logs, greplist_filename): if 'grep' in args.grep: - return GREP_COMMAND_FORMAT.format(len(all_logs), args.grep, greplist_filename) + return GREP_COMMAND_FORMAT.format(args.grep, greplist_filename) else: - return GREP_COMMAND_FORMAT.format(len(all_logs), DEFAULT_GREP_COMMAND.format(args.grep), greplist_filename) + return GREP_COMMAND_FORMAT.format(DEFAULT_GREP_COMMAND.format(args.grep), greplist_filename) diff --git a/scripts/logfetch/live_logs.py b/scripts/logfetch/live_logs.py index 37e2635502..213c6c17d8 100644 --- a/scripts/logfetch/live_logs.py +++ b/scripts/logfetch/live_logs.py @@ -1,7 +1,6 @@ -import os import sys +import fnmatch import grequests -from glob import glob from termcolor import colored from callbacks import generate_callback from singularity_request import get_json_response @@ -15,50 +14,50 @@ def download_live_logs(args): async_requests = [] zipped_files = [] all_logs = [] - sys.stderr.write(colored('Removing old service.log files', 'blue') + '\n') - for f in glob('{0}/*service.log'.format(args.dest)): - os.remove(f) - sys.stderr.write(colored('Downloading current live log files', 'blue') + '\n') + sys.stderr.write(colored('Finding current live log files', 'cyan') + '\n') for task in tasks: metadata = files_json(args, task) - uri = DOWNLOAD_FILE_FORMAT.format(metadata['slaveHostname']) - service_log = '{0}-service.log'.format(task) - tail_log = '{0}-tail_of_finished_service.log'.format(task) - async_requests.append( - grequests.AsyncRequest('GET',uri , - callback=generate_callback(uri, args.dest, service_log, args.chunk_size), - params={'path' : '{0}/{1}/service.log'.format(metadata['fullPathToRoot'], metadata['currentDirectory'])} - ) - ) - all_logs.append('{0}/{1}'.format(args.dest, service_log)) - async_requests.append( - grequests.AsyncRequest('GET',uri , - callback=generate_callback(uri, args.dest, tail_log, args.chunk_size), - params={'path' : '{0}/{1}/tail_of_finished_service.log'.format(metadata['fullPathToRoot'], metadata['currentDirectory'])} - ) - ) - all_logs.append('{0}/{1}'.format(args.dest, service_log)) - for log_file in logs_folder_files(args, task): - logfile_name = '{0}-{1}'.format(task, log_file) - async_requests.append( - grequests.AsyncRequest('GET',uri , - callback=generate_callback(uri, args.dest, logfile_name, args.chunk_size), - params={'path' : '{0}/{1}/logs/{1}'.format(metadata['fullPathToRoot'], metadata['currentDirectory'], log_file)} - ) - ) - if logfile_name.endswith('.gz'): - zipped_files.append('{0}/{1}'.format(args.dest, logfile_name)) - all_logs.append('{0}/{1}'.format(args.dest, logfile_name.replace('.gz', '.log'))) + if 'slaveHostname' in metadata: + uri = DOWNLOAD_FILE_FORMAT.format(metadata['slaveHostname']) + for log_file in base_directory_files(args, task, metadata): + logfile_name = '{0}-{1}'.format(task, log_file) + if (args.logtype and logfetch_base.log_matches(log_file, args.logtype)) or not args.logtype: + async_requests.append( + grequests.AsyncRequest('GET',uri , + callback=generate_callback(uri, args.dest, logfile_name, args.chunk_size), + params={'path' : '{0}/{1}/{2}'.format(metadata['fullPathToRoot'], metadata['currentDirectory'], log_file)} + ) + ) + if logfile_name.endswith('.gz'): + zipped_files.append('{0}/{1}'.format(args.dest, logfile_name)) + 
all_logs.append('{0}/{1}'.format(args.dest, logfile_name.replace('.gz', '.log'))) - grequests.map(async_requests, stream=True, size=args.num_parallel_fetches) - logfetch_base.unpack_logs(zipped_files) + for log_file in logs_folder_files(args, task): + logfile_name = '{0}-{1}'.format(task, log_file) + if (args.logtype and logfetch_base.log_matches(log_file, args.logtype)) or not args.logtype: + async_requests.append( + grequests.AsyncRequest('GET',uri , + callback=generate_callback(uri, args.dest, logfile_name, args.chunk_size), + params={'path' : '{0}/{1}/logs/{2}'.format(metadata['fullPathToRoot'], metadata['currentDirectory'], log_file)} + ) + ) + if logfile_name.endswith('.gz'): + zipped_files.append('{0}/{1}'.format(args.dest, logfile_name)) + all_logs.append('{0}/{1}'.format(args.dest, logfile_name.replace('.gz', '.log'))) + + if async_requests: + sys.stderr.write(colored('Starting live logs downloads\n', 'cyan')) + grequests.map(async_requests, stream=True, size=args.num_parallel_fetches) + if zipped_files: + sys.stderr.write(colored('Unpacking logs\n', 'cyan')) + logfetch_base.unpack_logs(zipped_files) return all_logs def tasks_to_check(args): if args.taskId: return [args.taskId] else: - return logfetch_base.tasks_for_request(args) + return logfetch_base.tasks_for_requests(args) def files_json(args, task): uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task) @@ -72,3 +71,17 @@ def logs_folder_files(args, task): return [f['name'] for f in files if logfetch_base.is_in_date_range(args, f['mtime'])] else: return [f['path'].rsplit('/')[-1] for f in files_json if logfetch_base.is_in_date_range(args, f['mtime'])] + +def base_directory_files(args, task, files_json): + if 'files' in files_json: + files = files_json['files'] + return [f['name'] for f in files if valid_logfile(args, f)] + else: + return [f['path'].rsplit('/')[-1] for f in files_json if valid_logfile(args, f)] + +def valid_logfile(args, fileData): + is_in_range = logfetch_base.is_in_date_range(args, fileData['mtime']) + not_a_directory = not fileData['mode'].startswith('d') + is_a_logfile = fnmatch.fnmatch(fileData['name'], '*.log') or fnmatch.fnmatch(fileData['name'], '*.out') or fnmatch.fnmatch(fileData['name'], '*.err') + return is_in_range and not_a_directory and is_a_logfile + diff --git a/scripts/logfetch/logfetch_base.py b/scripts/logfetch/logfetch_base.py index d4af0e9da4..e05ca005db 100644 --- a/scripts/logfetch/logfetch_base.py +++ b/scripts/logfetch/logfetch_base.py @@ -1,11 +1,13 @@ import os import sys import gzip +import fnmatch from datetime import datetime from termcolor import colored from singularity_request import get_json_response BASE_URI_FORMAT = '{0}{1}' +ALL_REQUESTS = '/requests' REQUEST_TASKS_FORMAT = '/history/request/{0}/tasks' ACTIVE_TASKS_FORMAT = '/history/request/{0}/tasks/active' @@ -20,8 +22,10 @@ def unpack_logs(logs): file_out.close() file_in.close os.remove(zipped_file) - sys.stderr.write(colored('Unpacked {0}'.format(zipped_file), 'green') + '\n') + sys.stderr.write(colored('Unpacked ', 'green') + colored(zipped_file, 'white') + '\n') except: + if os.path.isfile(zipped_file): + os.remove(zipped_file) sys.stderr.write(colored('Could not unpack {0}'.format(zipped_file), 'red') + '\n') continue @@ -29,23 +33,27 @@ def base_uri(args): if not args.singularity_uri_base: exit("Specify a base uri for Singularity (-u)") uri_prefix = "" if args.singularity_uri_base.startswith(("http://", "https://")) else "http://" - uri = BASE_URI_FORMAT.format(uri_prefix, args.singularity_uri_base) 
- return uri + return BASE_URI_FORMAT.format(uri_prefix, args.singularity_uri_base) -def tasks_for_request(args): - if args.requestId and args.deployId: - tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args) if (task["taskId"]["deployId"] == args.deployId)] - else: - tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args)] - if hasattr(args, 'task_count'): - tasks = tasks[0:args.task_count] - return tasks +def tasks_for_requests(args): + all_tasks = [] + for request in all_requests(args): + if args.requestId and args.deployId: + tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args, request) if log_matches(task["taskId"]["deployId"], args.deployId)] + else: + tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args, request)] + tasks = tasks[0:args.task_count] if hasattr(args, 'task_count') else tasks + all_tasks = all_tasks + tasks + return all_tasks + +def log_matches(inputString, pattern): + return fnmatch.fnmatch(inputString, pattern) or fnmatch.fnmatch(inputString, pattern + '*.gz') -def all_tasks_for_request(args): - uri = '{0}{1}'.format(base_uri(args), ACTIVE_TASKS_FORMAT.format(args.requestId)) +def all_tasks_for_request(args, request): + uri = '{0}{1}'.format(base_uri(args), ACTIVE_TASKS_FORMAT.format(request)) active_tasks = get_json_response(uri) if hasattr(args, 'start_days'): - uri = '{0}{1}'.format(base_uri(args), REQUEST_TASKS_FORMAT.format(args.requestId)) + uri = '{0}{1}'.format(base_uri(args), REQUEST_TASKS_FORMAT.format(request)) historical_tasks = get_json_response(uri) if len(historical_tasks) == 0: return active_tasks @@ -56,16 +64,18 @@ def all_tasks_for_request(args): else: return active_tasks +def all_requests(args): + uri = '{0}{1}'.format(base_uri(args), ALL_REQUESTS) + requests = get_json_response(uri) + included_requests = [] + for request in requests: + if fnmatch.fnmatch(request['request']['id'], args.requestId): + included_requests.append(request['request']['id']) + return included_requests + def is_in_date_range(args, timestamp): timedelta = datetime.utcnow() - datetime.utcfromtimestamp(timestamp) if args.end_days: - if timedelta.days > args.start_days or timedelta.days <= args.end_days: - return False - else: - return True + return False if timedelta.days > args.start_days or timedelta.days <= args.end_days else True else: - if timedelta.days > args.start_days: - return False - else: - return True - + return False if timedelta.days > args.start_days else True diff --git a/scripts/logfetch/s3_logs.py b/scripts/logfetch/s3_logs.py index ec813fbf4b..cc51927712 100644 --- a/scripts/logfetch/s3_logs.py +++ b/scripts/logfetch/s3_logs.py @@ -2,69 +2,54 @@ import sys import re import grequests -from datetime import datetime -from termcolor import colored - import logfetch_base -from singularity_request import get_json_response +from termcolor import colored from callbacks import generate_callback +from singularity_request import get_json_response TASK_FORMAT = '/task/{0}' -DEPLOY_FORMAT = '/request/{0}/deploy/{1}' -REQUEST_FORMAT = '/request/{0}' S3LOGS_URI_FORMAT = '{0}/logs{1}' def download_s3_logs(args): - sys.stderr.write(colored('Checking for S3 log files', 'blue') + '\n') - logs = get_json_response(singularity_s3logs_uri(args)) + sys.stderr.write(colored('Checking for S3 log files', 'cyan') + '\n') + logs = logs_for_all_requests(args) async_requests = [] all_logs = [] for log_file in logs: filename = log_file['key'].rsplit("/", 1)[1] - full_log_path = '{0}/{1}'.format(args.dest, 
filename.replace('.gz', '.log')) - full_gz_path = '{0}/{1}'.format(args.dest, filename) - if in_date_range(args, filename): - if not (os.path.isfile(full_log_path) or os.path.isfile(full_gz_path)): + if logfetch_base.is_in_date_range(args, time_from_filename(filename)): + if not already_downloaded(args.dest, filename): async_requests.append( - grequests.AsyncRequest('GET', log_file['getUrl'], - callback=generate_callback(log_file['getUrl'], args.dest, filename, args.chunk_size) - ) + grequests.AsyncRequest('GET', log_file['getUrl'], callback=generate_callback(log_file['getUrl'], args.dest, filename, args.chunk_size)) ) all_logs.append('{0}/{1}'.format(args.dest, filename.replace('.gz', '.log'))) - grequests.map(async_requests, stream=True, size=args.num_parallel_fetches) + if async_requests: + sys.stderr.write(colored('Starting S3 Downloads', 'cyan')) + grequests.map(async_requests, stream=True, size=args.num_parallel_fetches) zipped_files = ['{0}/{1}'.format(args.dest, log_file['key'].rsplit("/", 1)[1]) for log_file in logs] + sys.stderr.write(colored('Unpacking S3 logs\n', 'cyan')) logfetch_base.unpack_logs(zipped_files) - sys.stderr.write(colored('All S3 logs up to date', 'blue') + '\n') + sys.stderr.write(colored('All S3 logs up to date', 'cyan') + '\n') return all_logs -def in_date_range(args, filename): - timedelta = datetime.utcnow() - time_from_filename(filename) - if args.end_days: - if timedelta.days > args.start_days or timedelta.days <= args.end_days: - return False - else: - return True +def already_downloaded(dest, filename): + return (os.path.isfile('{0}/{1}'.format(dest, filename.replace('.gz', '.log'))) or os.path.isfile('{0}/{1}'.format(dest, filename))) + +def logs_for_all_requests(args): + if args.taskId: + return get_json_response(singularity_s3logs_uri(args, args.taskId)) else: - if timedelta.days > args.start_days: - return False - else: - return True + tasks = logfetch_base.tasks_for_requests(args) + logs = [] + for task in tasks: + s3_logs = get_json_response(singularity_s3logs_uri(args, task)) + logs = logs + s3_logs if s3_logs else logs + return logs def time_from_filename(filename): time_string = re.search('(\d{13})', filename).group(1) - return datetime.utcfromtimestamp(int(time_string[0:-3])) - - -def singularity_s3logs_uri(args): - if args.taskId: - singularity_path = TASK_FORMAT.format(args.taskId) - elif args.deployId and args.requestId: - singularity_path = DEPLOY_FORMAT.format(args.requestId, args.deployId) - elif args.requestId: - singularity_path = REQUEST_FORMAT.format(args.requestId) - else: - exit("Specify one of taskId, requestId and deployId, or requestId") - singularity_uri = S3LOGS_URI_FORMAT.format(logfetch_base.base_uri(args), singularity_path) + return int(time_string[0:-3]) - return singularity_uri +def singularity_s3logs_uri(args, idString): + return S3LOGS_URI_FORMAT.format(logfetch_base.base_uri(args), TASK_FORMAT.format(idString)) diff --git a/scripts/logfetch/singularity_request.py b/scripts/logfetch/singularity_request.py index 9e3bf815de..029917e93c 100644 --- a/scripts/logfetch/singularity_request.py +++ b/scripts/logfetch/singularity_request.py @@ -1,5 +1,6 @@ import sys import requests +from termcolor import colored ERROR_STATUS_FORMAT = 'Singularity responded with an invalid status code ({0})' @@ -8,6 +9,6 @@ def get_json_response(uri, params={}): if singularity_response.status_code < 199 or singularity_response.status_code > 299: sys.stderr.write(uri + '\n') sys.stderr.write(str(params) + '\n') - 
exit(ERROR_STATUS_FORMAT.format(singularity_response.status_code)) + sys.stderr.write(colored(ERROR_STATUS_FORMAT.format(singularity_response.status_code), 'red') + '\n') return singularity_response.json() diff --git a/scripts/logfetch/tail.py b/scripts/logfetch/tail.py index 1e62e7875c..df50a90a50 100644 --- a/scripts/logfetch/tail.py +++ b/scripts/logfetch/tail.py @@ -12,13 +12,13 @@ def start_tail(args): if args.requestId: sys.stderr.write('Fetching tasks\n') - tasks = [str(t) for t in logfetch_base.tasks_for_request(args)] + tasks = [str(t) for t in logfetch_base.tasks_for_requests(args)] else: tasks = [args.taskId] if args.verbose: sys.stderr.write(colored('Tailing logs for tasks:\n', 'green')) for t in tasks: - sys.stderr.write(colored('{0}\n'.format(t), 'blue')) + sys.stderr.write(colored('{0}\n'.format(t), 'yellow')) sys.stderr.write(colored('ctrl+c to exit\n', 'cyan')) try: threads = [] @@ -31,7 +31,7 @@ def start_tail(args): if not t.isAlive: break except KeyboardInterrupt: - sys.stderr.write(colored('Stopping tail', 'cyan')) + sys.stderr.write(colored('Stopping tail', 'magenta')) sys.exit(0) class LogStreamer(threading.Thread): @@ -73,6 +73,7 @@ def fetch_new_log_data(self, uri, path, offset, args, task): if args.grep: params['grep'] = args.grep response = requests.get(uri, params=params).json() - prefix = '({0}) =>'.format(task) if args.verbose else '' - sys.stdout.write('{0}{1}\n'.format(colored(prefix, 'blue'), response['data'])) + prefix = '({0}) =>\n'.format(task) if args.verbose else '' + if response['data'] != '': + sys.stdout.write('{0}{1}'.format(colored(prefix, 'cyan'), response['data'])) return offset + len(response['data'].encode('utf-8')) diff --git a/scripts/setup.py b/scripts/setup.py index 4c886da43e..57c807db49 100644 --- a/scripts/setup.py +++ b/scripts/setup.py @@ -10,7 +10,7 @@ setup( name='singularity-logfetch', - version='0.0.8', + version='0.0.9', description='Singularity log fetching and searching', author="HubSpot", author_email='singularity-users@googlegroups.com', @@ -22,7 +22,8 @@ entry_points={ 'console_scripts':[ 'logfetch=logfetch.entrypoint:fetch', - 'logtail=logfetch.entrypoint:tail' + 'logtail=logfetch.entrypoint:tail', + 'logcat=logfetch.entrypoint:cat' ], } )
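
The `-l`/`--logtype` filtering introduced in this change ultimately reduces to `fnmatch` globbing in `logfetch_base.log_matches`, which also accepts a gzipped rotation of a matching name. A minimal sketch of that check (the function body mirrors `log_matches` from this diff; the file names below are hypothetical examples, not taken from the change):

```python
import fnmatch

def log_matches(inputString, pattern):
  # Same logic as logfetch_base.log_matches in this diff: a file matches if it
  # fits the glob directly, or fits the glob with a trailing .gz (rotated logs).
  return fnmatch.fnmatch(inputString, pattern) or fnmatch.fnmatch(inputString, pattern + '*.gz')

# Hypothetical file names, mirroring the README examples for -l:
print(log_matches('access.log', '*.log'))                      # True
print(log_matches('access.log-2014-05-01.gz', 'access.log'))   # True (gzipped rotation)
print(log_matches('service.out', '*.log'))                     # False
```

With that behavior, `logfetch -r 'My_Request_Id' -l '*.out'` from the README should also pick up gzipped rotations of the matching `.out` files.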
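One behavioral note on the grep change: `GREP_COMMAND_FORMAT` now loops over the `.greplist` with `while read` instead of passing every path to a single `xargs` invocation, so each downloaded file is grepped on its own. A rough sketch of the command it builds, using the format strings from this diff (the `ERROR` pattern and the cache path below are made-up values for illustration):

```python
# Constants copied from grep.py in this diff; inputs are hypothetical.
GREP_COMMAND_FORMAT = 'while read file; do {0} "$file"; done < {1}'
DEFAULT_GREP_COMMAND = 'grep --color=always \'{0}\''

command = GREP_COMMAND_FORMAT.format(DEFAULT_GREP_COMMAND.format('ERROR'), '/home/me/.logfetch_cache/.greplist')
print(command)
# while read file; do grep --color=always 'ERROR' "$file"; done < /home/me/.logfetch_cache/.greplist
```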