Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suggestion for output-datatier-branching in flow #665

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion mcm/json_layer/chained_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,25 @@ def flow_to_next_step(self, input_dataset='', block_black_list=None, block_white
#current_campaign -> next_campaign

##determine whether we have an input dataset for the next request
if len(current_request.get_attribute('reqmgr_name')):
if len(current_request.get_attribute('output_dataset')):
input_dataset = current_request.get_attribute('output_dataset')[-1]

if "use_datatier" in mcm_f.get_attribute('request_parameters'):
## N.B. this does not function for TaskChain handling without further development
use_datatier=mcm_f.get_attribute('request_parameters')["use_datatier"]
outputs=current_request.get_attribute('output_dataset')
possible = filter(lambda ds : ds.split('/')[-1] == use_datatier, outputs)
if len(possible)!=0:
raise self.ChainedRequestCannotFlowException(self.get_attribute('_id'),
"There isn't a possible corresponding output datatiers (%s) for the requested datatier (%s)" %( map(lambda ds : ds.split('/')[-1], outputs),
use_datatier,
))

else:
input_dataset = possible[0]

elif len(current_request.get_attribute('reqmgr_name')):
## the later check pre-dates the inclusion of output_dataset as a member of request object
last_wma = current_request.get_attribute('reqmgr_name')[-1]
if 'content' in last_wma and 'pdmv_dataset_name' in last_wma['content']:
input_dataset = last_wma['content']['pdmv_dataset_name']
Expand Down
12 changes: 10 additions & 2 deletions mcm/json_layer/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,9 +757,10 @@ def get_tier(self,i):
tiers = s['datatier']
if isinstance(tiers, str):
tiers = tiers.split(',')
return tiers
return reversed(tiers)

def get_tiers(self):
## the output is an ordered list of tiers, the last item being the main produced tiers
r_tiers=[]
keeps = self.get_attribute('keep_output')
for (i, s) in enumerate(self.get_attribute('sequences')):
Expand Down Expand Up @@ -1538,7 +1539,14 @@ def inspect_submitted(self):
'message' : 'No output dataset have been recognized'})
saved = db.save(self.json())
return not_good
ds_for_accounting = collected[0]
## order according the expected tiers : the last is the more important
## does not function if two output of the same tier are expected
recollected=[]
for tier in tiers_expected:
recollected.extend( filter( lambda dn : dn.split('/')[-1]==tier, collected) )
collected=recollected

ds_for_accounting = collected[-1] ## the last one if the one of importance
if not 'pdmv_dataset_statuses' in wma_r_N['content'] or not ds_for_accounting in wma_r_N['content']['pdmv_dataset_statuses']:
counted = wma_r_N['content']['pdmv_evts_in_DAS']+ wma_r_N['content']['pdmv_open_evts_in_DAS']
if not counted:
Expand Down
19 changes: 17 additions & 2 deletions mcm/rest_api/ChainedRequestActions.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,17 @@ def request_to_tasks( r , base, depend):
task_dict.update({"SplittingAlgo" : "EventAwareLumiBased",
"InputFromOutputModule" : ts[-1]['output_'],
"InputTask" : ts[-1]['TaskName']})

task_dict['output_'] = "%soutput"%(r.get_attribute('sequences')[si]['eventcontent'][0])
task_dict['datatiers_'] = r.get_attribute('sequences')[si]['datatier']
task_dict['outputs_'] = ["%soutput"%(ec) for ec in r.get_attribute('sequences')[si]['eventcontent']]
task_dict['use_datatier_']=None
if r.get_attribute('flown_with'):
fdb = database('flows')
f = fdb.get( r.get_attribute('flown_with') )
if "use_datatier" in f["request_parameters"]:
task_dict['use_datatier_']=f["request_parameters"]["use_datatier"]

task_dict['priority_'] = r.get_attribute('priority')
task_dict['request_type_'] = r.get_wmagent_type()
ts.append(task_dict)
Expand Down Expand Up @@ -704,9 +714,14 @@ def request_to_tasks( r , base, depend):

for (r,item) in tasktree.items():
for n in item['next']:
## do the connecting of tasks outputs
tasktree[n]['dict'][0].update({"InputFromOutputModule" : item['dict'][-1]['output_'],
"InputTask" : item['dict'][-1]['TaskName']})

"InputTask" : item['dict'][-1]['TaskName']})
## do a special treatment if the flow was requested to use a specific datatier
if tasktree[n]['dict'][0]['use_datatier_']:
use_datatier=tasktree[n]['dict'][0]['use_datatier_']
index_datatier=item['dict'][-1]['datatiers_'].index(use_datatier)
tasktree[n]['dict'][0]["InputFromOutputModule"] = item['dict'][-1]['outputs_'[index_datatier]

wma={
"RequestType" : "TaskChain",
Expand Down