faster logfetch, ditch unpack step
ssalinas committed Feb 26, 2016
1 parent 2cdee1d commit 972fda6
Showing 8 changed files with 98 additions and 165 deletions.
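The overall change: instead of downloading `.gz` logs, unpacking them to `.log` files on disk, and only then reading them, logfetch now streams files directly and pipes gzipped logs through `zcat` on the fly. A minimal sketch of that pipeline, for orientation only (the function name and the default grep command are illustrative, not part of this commit):

```python
import subprocess

def stream_log(filename, grep_cmd="grep --color=always 'ERROR'"):
    # Read the file as-is from disk.
    content = subprocess.Popen(['cat', filename], stdout=subprocess.PIPE)
    if filename.endswith('.gz'):
        # Decompress on the fly instead of unpacking to a .log file first.
        source = subprocess.Popen(['zcat'], stdin=content.stdout, stdout=subprocess.PIPE)
    else:
        source = content
    # Feed the (decompressed) stream to grep; shell=True so a user-supplied
    # grep command line is honored verbatim, as grep.py does below.
    grep = subprocess.Popen(grep_cmd, stdin=source.stdout, shell=True)
    grep.communicate()
```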
27 changes: 17 additions & 10 deletions scripts/logfetch/cat.py
@@ -1,15 +1,22 @@
import os
import sys
import subprocess
from logfetch_base import log
from termcolor import colored

def cat_files(args, all_logs):
if all_logs:
all_logs.sort()
for log in all_logs:
if not args.silent:
sys.stderr.write(colored(log, 'cyan') + '\n')
command = 'cat {0}'.format(log)
sys.stdout.write(os.popen(command).read() + '\n')
else:
if not args.silent:
sys.stderr.write(colored('No log files found\n', 'magenta'))
log('\n', args, False)
if all_logs:
all_logs.sort()
for filename in all_logs:
log('=> ' + colored(filename, 'cyan') + '\n', args, False)
if filename.endswith('.gz'):
cat = subprocess.Popen(['cat', filename], stdout=subprocess.PIPE)
content = subprocess.Popen(['zcat'], stdin=cat.stdout)
content.communicate()
else:
cat = subprocess.Popen(['cat', filename])
cat.communicate()
sys.stdout.write('\n')
else:
log(colored('No log files found\n', 'magenta'), args, False)
41 changes: 24 additions & 17 deletions scripts/logfetch/grep.py
@@ -1,31 +1,38 @@
import os
import sys
import subprocess
from logfetch_base import log
from termcolor import colored

GREP_COMMAND_FORMAT = '{0} {1}'
DEFAULT_GREP_COMMAND = 'grep --color=always \'{0}\''

def grep_files(args, all_logs):
if not args.silent:
sys.stderr.write('\n')
log('\n', args, False)
if args.grep:
if all_logs:
all_logs.sort()
for log in all_logs:
command = grep_command(args, log)
output = os.popen(command).read()
if output is not None and output != '':
if not args.silent:
sys.stderr.write(colored(log, 'cyan') + '\n')
sys.stdout.write(output)

if not args.silent:
sys.stderr.write(colored('Finished grep, exiting', 'green') + '\n')
grep_cmd = grep_command(args)
log(colored('\nRunning grep command ({0})\n'.format(grep_cmd), 'cyan'), args, False)
for filename in all_logs:
log('=> ' + colored(filename, 'cyan') + '\n', args, True)
content = subprocess.Popen(['cat', filename], stdout=subprocess.PIPE)
if filename.endswith('.gz'):
zcat = subprocess.Popen('zcat', stdin=content.stdout, stdout=subprocess.PIPE)
grep = subprocess.Popen(grep_cmd, stdin=zcat.stdout, shell=True)
else:
grep = subprocess.Popen(grep_cmd, stdin=content.stdout, shell=True)
grep.communicate()
log(colored('Finished grep, exiting', 'green') + '\n', args, False)
else:
sys.stderr.write(colored('No logs found\n', 'magenta'))

def grep_command(args, filename):
def grep_command(args):
if 'grep' in args.grep:
return GREP_COMMAND_FORMAT.format(args.grep, filename)
return args.grep
else:
return DEFAULT_GREP_COMMAND.format(args.grep)

def cat_command(filename):
if filename.endswith('.gz'):
return 'zcat {0}'.format(filename)
else:
return GREP_COMMAND_FORMAT.format(DEFAULT_GREP_COMMAND.format(args.grep), filename)
return 'cat {0}'.format(filename)
56 changes: 16 additions & 40 deletions scripts/logfetch/live_logs.py
@@ -1,22 +1,18 @@
import os
import sys
import fnmatch
import grequests
from termcolor import colored
import callbacks
from singularity_request import get_json_response
import logfetch_base
from termcolor import colored

DOWNLOAD_FILE_FORMAT = 'http://{0}:5051/files/download.json'
BROWSE_FOLDER_FORMAT = '{0}/sandbox/{1}/browse'
TASK_HISTORY_FORMAT = '{0}/history/task/{1}'

def download_live_logs(args):
if not args.silent:
sys.stderr.write(colored('Finding current live log files', 'cyan') + '\n')
logfetch_base.log(colored('Finding current live log files', 'cyan') + '\n', args, False)
tasks = tasks_to_check(args)
async_requests = []
zipped_files = []
all_logs = []
callbacks.progress = 0
tasks_check_progress = 0
@@ -25,8 +21,6 @@ def download_live_logs(args):
metadata = files_json(args, task)
if 'slaveHostname' in metadata:
uri = DOWNLOAD_FILE_FORMAT.format(metadata['slaveHostname'])
if args.verbose and not args.silent:
sys.stderr.write(colored('Finding logs in base directory on {0}'.format(metadata['slaveHostname']), 'magenta') + '\n')
for log_file in base_directory_files(args, task, metadata):
logfile_name = '{0}-{1}'.format(task, log_file)
if not args.logtype or (args.logtype and logfetch_base.log_matches(log_file, args.logtype.replace('logs/', ''))):
@@ -38,15 +32,9 @@ def download_live_logs(args):
headers=args.headers
)
)
if logfile_name.endswith('.gz'):
zipped_files.append('{0}/{1}'.format(args.dest, logfile_name))
else:
all_logs.append('{0}/{1}'.format(args.dest, logfile_name.replace('.gz', '.log')))
elif args.logtype and args.verbose and not args.silent:
sys.stderr.write(colored('Excluding log {0}, doesn\'t match {1}'.format(log_file, args.logtype), 'magenta') + '\n')

if args.verbose and not args.silent:
sys.stderr.write(colored('Finding logs in logs directory on {0}'.format(metadata['slaveHostname']), 'magenta') + '\n')
all_logs.append('{0}/{1}'.format(args.dest, logfile_name))
elif args.logtype:
logfetch_base.log(colored('Excluding log {0}, doesn\'t match {1}'.format(log_file, args.logtype), 'magenta') + '\n', args, True)
for log_file in logs_folder_files(args, task):
logfile_name = '{0}-{1}'.format(task, log_file)
if not args.logtype or (args.logtype and logfetch_base.log_matches(log_file, args.logtype.replace('logs/', ''))):
@@ -58,24 +46,16 @@ def download_live_logs(args):
headers=args.headers
)
)
if logfile_name.endswith('.gz'):
zipped_files.append('{0}/{1}'.format(args.dest, logfile_name))
else:
all_logs.append('{0}/{1}'.format(args.dest, logfile_name.replace('.gz', '.log')))
elif args.logtype and args.verbose and not args.silent:
sys.stderr.write(colored('Excluding log {0}, doesn\'t match {1}'.format(log_file, args.logtype), 'magenta') + '\n')
all_logs.append('{0}/{1}'.format(args.dest, logfile_name))
elif args.logtype:
logfetch_base.log(colored('Excluding log {0}, doesn\'t match {1}'.format(log_file, args.logtype), 'magenta') + '\n', args, True)
tasks_check_progress += 1
logfetch_base.update_progress_bar(tasks_check_progress, tasks_check_goal, 'Log Finder', args.silent)

if async_requests:
if not args.silent:
sys.stderr.write(colored('\nStarting {0} live logs downloads\n'.format(len(async_requests)), 'cyan'))
logfetch_base.log(colored('\nStarting {0} live logs downloads\n'.format(len(async_requests)), 'cyan'), args, False)
callbacks.goal = len(async_requests)
grequests.map(async_requests, stream=True, size=args.num_parallel_fetches)
if zipped_files and not args.download_only:
if not args.silent:
sys.stderr.write(colored('\nUnpacking {0} log(s)\n'.format(len(zipped_files)), 'cyan'))
all_logs = all_logs + logfetch_base.unpack_logs(args, zipped_files)
return all_logs

def tasks_to_check(args):
@@ -86,7 +66,7 @@ def tasks_to_check(args):

def task_history(args, task):
uri = TASK_HISTORY_FORMAT.format(logfetch_base.base_uri(args), task)
return get_json_response(uri, args)
return logfetch_base.get_json_response(uri, args)

def task_still_running(args, task, history):
try:
@@ -97,11 +77,11 @@ def task_still_running(args, task, history):

def files_json(args, task):
uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task)
return get_json_response(uri, args, {}, True)
return logfetch_base.get_json_response(uri, args, {}, True)

def logs_folder_files(args, task):
uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task)
files_json = get_json_response(uri, args, {'path' : '{0}/logs'.format(task)}, True)
files_json = logfetch_base.get_json_response(uri, args, {'path' : '{0}/logs'.format(task)}, True)
if 'files' in files_json:
files = files_json['files']
return [f['name'] for f in files if logfetch_base.is_in_date_range(args, f['mtime'])]
@@ -123,20 +103,16 @@ def valid_logfile(args, fileData):

def should_download(args, filename, task):
if args.use_cache and already_downloaded(args, filename):
if args.verbose and not args.silent:
sys.stderr.write(colored('Using cached version of file {0}\n'.format(filename), 'magenta'))
logfetch_base.log(colored('Using cached version of file {0}\n'.format(filename), 'magenta'), args, True)
return False
if filename.endswith('.gz') and already_downloaded(args, filename):
if args.verbose and not args.silent:
sys.stderr.write(colored('Using cached version of file {0}, zipped file has not changed\n'.format(filename), 'magenta'))
logfetch_base.log(colored('Using cached version of file {0}, zipped file has not changed\n'.format(filename), 'magenta'), args, True)
return False
history = task_history(args, task)
if not task_still_running(args, task, history) and already_downloaded(args, filename) and file_not_too_old(args, history, filename):
if args.verbose and not args.silent:
sys.stderr.write(colored('Using cached version of file {0}, {1}, file has not changed\n'.format(filename, history['taskUpdates'][-1]['taskState']), 'magenta'))
logfetch_base.log(colored('Using cached version of file {0}, {1}, file has not changed\n'.format(filename, history['taskUpdates'][-1]['taskState']), 'magenta'), args, True)
else:
if args.verbose and not args.silent:
sys.stderr.write(colored('Will download file {0}, version on the server is newer than cached version\n'.format(filename), 'magenta'))
logfetch_base.log(colored('Will download file {0}, version on the server is newer than cached version\n'.format(filename), 'magenta'), args, True)

return True

56 changes: 19 additions & 37 deletions scripts/logfetch/logfetch_base.py
@@ -2,50 +2,16 @@
import sys
import gzip
import fnmatch
import requests
from datetime import datetime, timedelta
from termcolor import colored
from singularity_request import get_json_response

ERROR_STATUS_FORMAT = 'Singularity responded with an invalid status code ({0})'
BASE_URI_FORMAT = '{0}{1}'
ALL_REQUESTS = '/requests'
REQUEST_TASKS_FORMAT = '/history/request/{0}/tasks'
ACTIVE_TASKS_FORMAT = '/history/request/{0}/tasks/active'

def unpack_logs(args, logs):
successful = []
goal = len(logs)
progress = 0
for zipped_file in logs:
try:
if os.path.isfile(zipped_file):
if args.verbose and not args.silent:
sys.stderr.write(colored('Starting unpack of {0}'.format(zipped_file), 'magenta') + '\n')
file_in = gzip.open(zipped_file, 'rb')
unzipped = zipped_file.replace('.gz', '.log')
file_out = open(unzipped, 'wb')
file_out.write(file_in.read())
file_out.close()
file_in.close
os.remove(zipped_file)
progress += 1
if args.verbose and not args.silent:
sys.stderr.write(colored('Unpacked log {0}/{1}'.format(progress, goal), 'green') + colored(zipped_file, 'white') + '\n')
else:
update_progress_bar(progress, goal, 'Unpack', args.silent)
successful.append(unzipped)
else:
progress += 1
update_progress_bar(progress, goal, 'Unpack', args.silent)
except Exception as e:
print e
if os.path.isfile(zipped_file):
os.remove(zipped_file)
sys.stderr.write(colored('Could not unpack {0}'.format(zipped_file), 'red') + '\n')
continue
if not args.silent:
sys.stderr.write('\n')
return successful

def base_uri(args):
if not args.singularity_uri_base:
exit("Specify a base uri for Singularity (-u)")
@@ -62,7 +28,7 @@ def tasks_for_requests(args):
tasks = tasks[0:args.task_count] if hasattr(args, 'task_count') else tasks
all_tasks = all_tasks + tasks
if not all_tasks:
sys.stderr.write(colored('No tasks found, check that the request/task you are searching for exists...', 'red'))
log(colored('No tasks found, check that the request/task you are searching for exists...', 'red'), args, False)
exit(1)
return all_tasks

@@ -116,3 +82,19 @@ def update_progress_bar(progress, goal, progress_type, silent):
if not silent:
sys.stderr.write("\r{0} Progress: [".format(progress_type) + colored("{0}".format(hashes + spaces), color) + "] {0}%".format(int(round(percent * 100))))
sys.stderr.flush()

def log(message, args, verbose):
if (not verbose or (verbose and args.verbose)) and not args.silent:
sys.stderr.write(message)

def get_json_response(uri, args, params={}, skip404ErrMessage=False):
singularity_response = requests.get(uri, params=params, headers=args.headers)
if singularity_response.status_code < 199 or singularity_response.status_code > 299:
if not (skip404ErrMessage and singularity_response.status_code == 404):
log('{0} params:{1}\n'.format(uri, str(params)), args, False)
if not (skip404ErrMessage and singularity_response.status_code == 404):
sys.stderr.write(colored(ERROR_STATUS_FORMAT.format(singularity_response.status_code), 'red') + '\n')
if not (skip404ErrMessage and singularity_response.status_code == 404):
log(colored(singularity_response.text, 'red') + '\n', args, False)
return {}
return singularity_response.json()
39 changes: 12 additions & 27 deletions scripts/logfetch/s3_logs.py
@@ -5,7 +5,6 @@
import time
from termcolor import colored
import callbacks
from singularity_request import get_json_response

TASK_FORMAT = '/task/{0}'
S3LOGS_URI_FORMAT = '{0}/logs{1}'
@@ -22,43 +21,30 @@ def download_s3_logs(args):
callbacks.progress = 0
logs = logs_for_all_requests(args)
async_requests = []
zipped_files = []
all_logs = []
for log_file in logs:
filename = log_file['key'].rsplit("/", 1)[1]
if logfetch_base.is_in_date_range(args, int(str(log_file['lastModified'])[0:-3])):
if not args.logtype or log_matches(args, filename):
if args.verbose and not args.silent:
sys.stderr.write(colored('Including log {0}'.format(filename), 'blue') + '\n')
logfetch_base.log(colored('Including log {0}'.format(filename), 'blue') + '\n', args, True)
if not already_downloaded(args.dest, filename):
async_requests.append(
grequests.AsyncRequest('GET', log_file['getUrl'], callback=callbacks.generate_callback(log_file['getUrl'], args.dest, filename, args.chunk_size, args.verbose, args.silent), headers=args.headers)
)
else:
if args.verbose and not args.silent:
sys.stderr.write(colored('Log already downloaded {0}'.format(filename), 'blue') + '\n')
all_logs.append('{0}/{1}'.format(args.dest, filename.replace('.gz', '.log')))
zipped_files.append('{0}/{1}'.format(args.dest, filename))
logfetch_base.log(colored('Log already downloaded {0}'.format(filename), 'blue') + '\n', args, True)
all_logs.append('{0}/{1}'.format(args.dest, filename))
else:
if args.verbose and not args.silent:
sys.stderr.write(colored('Excluding {0} log does not match logtype argument {1}'.format(filename, args.logtype), 'magenta') + '\n')
logfetch_base.log(colored('Excluding {0} log does not match logtype argument {1}'.format(filename, args.logtype), 'magenta') + '\n', args, True)
else:
if args.verbose and not args.silent:
sys.stderr.write(colored('Excluding {0}, not in date range'.format(filename), 'magenta') + '\n')
logfetch_base.log(colored('Excluding {0}, not in date range'.format(filename), 'magenta') + '\n', args, True)
if async_requests:
if not args.silent:
sys.stderr.write(colored('Starting {0} S3 Downloads with {1} parallel fetches\n'.format(len(async_requests), args.num_parallel_fetches), 'cyan'))
logfetch_base.log(colored('Starting {0} S3 Downloads with {1} parallel fetches\n'.format(len(async_requests), args.num_parallel_fetches), 'cyan'), args, False)
callbacks.goal = len(async_requests)
grequests.map(async_requests, stream=True, size=args.num_parallel_fetches)
if not args.silent and not args.download_only:
sys.stderr.write(colored('\nUnpacking {0} S3 log(s)\n'.format(len(async_requests)), 'cyan'))
else:
if not args.silent:
sys.stderr.write(colored('No S3 logs to download\n', 'cyan'))
if not args.download_only:
all_logs = all_logs + logfetch_base.unpack_logs(args, zipped_files)
if not args.silent:
sys.stderr.write(colored('All S3 logs up to date\n', 'cyan'))
logfetch_base.log(colored('No S3 logs to download\n', 'cyan'), args, False)
logfetch_base.log(colored('All S3 logs up to date\n', 'cyan'), args, False)
return all_logs

def already_downloaded(dest, filename):
@@ -67,21 +53,20 @@ def already_downloaded(dest, filename):
def logs_for_all_requests(args):
s3_params = {'start': int(time.mktime(args.start.timetuple()) * 1000), 'end': int(time.mktime(args.end.timetuple()) * 1000)}
if args.taskId:
return get_json_response(s3_task_logs_uri(args, args.taskId), args, s3_params)
return logfetch_base.get_json_response(s3_task_logs_uri(args, args.taskId), args, s3_params)
else:
tasks = logfetch_base.tasks_for_requests(args)
logs = []
tasks_progress = 0
tasks_goal = len(tasks)
for task in tasks:
s3_logs = get_json_response(s3_task_logs_uri(args, task), args, s3_params)
s3_logs = logfetch_base.get_json_response(s3_task_logs_uri(args, task), args, s3_params)
logs = logs + s3_logs if s3_logs else logs
tasks_progress += 1
logfetch_base.update_progress_bar(tasks_progress, tasks_goal, 'S3 Log Finder', args.silent)
if not args.silent:
sys.stderr.write(colored('\nAlso searching s3 history...\n', 'cyan'))
logfetch_base.log(colored('\nAlso searching s3 history...\n', 'cyan'), args, False)
for request in logfetch_base.all_requests(args):
s3_logs = get_json_response(s3_request_logs_uri(args, request), args, s3_params)
s3_logs = logfetch_base.get_json_response(s3_request_logs_uri(args, request), args, s3_params)
logs = logs + s3_logs if s3_logs else logs
return [dict(t) for t in set([tuple(l.items()) for l in logs])] # remove any duplicates

