#!/usr/bin/env python3
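"""Send HTCondor queue ClassAds to Elasticsearch.

Reads ClassAds either from HTCondor collectors (--collectors) or from
files on disk, filters them down to a fixed set of attributes, and
bulk-ingests them into an ES index with a daily date suffix.
"""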
import glob
import logging
from argparse import ArgumentParser
from datetime import datetime, timezone
from functools import partial

from rest_tools.client import ClientCredentialsAuth
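
# command-line interface: positional args are collector addresses
# (with --collectors) or file globs of ClassAd dumps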
parser = ArgumentParser(description='send condor queue ClassAds to Elasticsearch')
parser.add_argument('-a', '--address', help='elasticsearch address')
parser.add_argument('-n', '--indexname', default='condor',
                    help='index name (default condor)')
parser.add_argument('--dailyindex', default=False, action='store_true',
                    help='use a daily index pattern')
parser.add_argument('-y', '--dry-run', default=False, action='store_true',
                    help='query jobs, but do not ingest into ES')
parser.add_argument('--collectors', default=False, action='store_true',
                    help='args are collector addresses, not files')
parser.add_argument('--client_id', default=None, help='oauth2 client id')
parser.add_argument('--client_secret', default=None, help='oauth2 client secret')
parser.add_argument('--token_url', default=None, help='oauth2 realm token url')
parser.add_argument('positionals', nargs='+',
                    help='condor history files or collector addresses')
options = parser.parse_args()
if not options.positionals:
    parser.error('no condor history files or collectors')
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')
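
# import the condor modules after logging is configured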
import htcondor
from condor_utils import *

# daily index is mandatory here: always append the current date
# (the --dailyindex flag is ignored)
now = datetime.now(timezone.utc)
options.indexname += '-' + now.strftime("%Y.%m.%d")

# whitelist of ClassAd attributes to keep
keys = {
    'RequestCpus', 'Requestgpus', 'RequestMemory', 'RequestDisk',
    'NumJobStarts', 'NumShadowStarts',
    'GlobalJobId', '@timestamp', 'queue_time', 'Owner',
    'JobStatus', 'MATCH_EXP_JOBGLIDEIN_ResourceName',
    'IceProdDataset', 'IceProdTaskName',
}

def es_generator(entries):
    """Filter ClassAds and shape them into ES bulk-index actions."""
    for data in entries:
        add_classads(data)
        data = {k: data[k] for k in keys if k in data}  # apply the key filter
        data['_index'] = options.indexname
        # deterministic document id built from the job id and timestamp
        data['_id'] = data['GlobalJobId'].replace('#', '-').replace('.', '-') + data['@timestamp']
        yield data

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
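
# normalize the Elasticsearch address into a full URL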
prefix = 'http'
address = options.address
if '://' in address:
    prefix, address = address.split('://')
url = '{}://{}'.format(prefix, address)
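
# fetch an OAuth2 bearer token via client credentials when configured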
token = None
if None not in (options.token_url, options.client_secret, options.client_id):
    api = ClientCredentialsAuth(address='https://elasticsearch.icecube.aq',
                                token_url=options.token_url,
                                client_secret=options.client_secret,
                                client_id=options.client_id)
    token = api.make_access_token()

logging.info('connecting to ES at %s', url)
es = Elasticsearch(hosts=[url],
                   timeout=5000,
                   bearer_auth=token)

# bulk importer with retries; on --dry-run, consume the generator
# (still exercising the queries and filtering) without ingesting
if options.dry_run:
    def es_import(gen):
        return sum(1 for _ in gen), []
else:
    es_import = partial(bulk, es, max_retries=20, initial_backoff=10, max_backoff=3600)

failed = False
if options.collectors:
    # positionals are collector addresses
    for coll_address in options.positionals:
        try:
            gen = es_generator(read_from_collector(coll_address))
            success, _ = es_import(gen)
        except htcondor.HTCondorIOError as e:
            failed = e
            logging.error('Condor error', exc_info=True)
else:
    # positionals are file globs of ClassAd dumps
    for path in options.positionals:
        for filename in glob.iglob(path):
            gen = es_generator(read_from_file(filename))
            success, _ = es_import(gen)
            logging.info('finished processing %s', filename)

# re-raise the last collector error so the process exits non-zero
if failed:
    raise failed