diff --git a/mcm/json_layer/chained_request.py b/mcm/json_layer/chained_request.py index cc1c01c9..b732e1ce 100644 --- a/mcm/json_layer/chained_request.py +++ b/mcm/json_layer/chained_request.py @@ -450,7 +450,25 @@ def flow_to_next_step(self, input_dataset='', block_black_list=None, block_white #current_campaign -> next_campaign ##determine whether we have an input dataset for the next request - if len(current_request.get_attribute('reqmgr_name')): + if len(current_request.get_attribute('output_dataset')): + input_dataset = current_request.get_attribute('output_dataset')[-1] + + if "use_datatier" in mcm_f.get_attribute('request_parameters'): + ## N.B. this does not function for TaskChain handling without further development + use_datatier=mcm_f.get_attribute('request_parameters')["use_datatier"] + outputs=current_request.get_attribute('output_dataset') + possible = filter(lambda ds : ds.split('/')[-1] == use_datatier, outputs) + if len(possible)==0: + raise self.ChainedRequestCannotFlowException(self.get_attribute('_id'), + "There isn't a possible corresponding output datatiers (%s) for the requested datatier (%s)" %( map(lambda ds : ds.split('/')[-1], outputs), + use_datatier, + )) + + else: + input_dataset = possible[0] + + elif len(current_request.get_attribute('reqmgr_name')): + ## the later check pre-dates the inclusion of output_dataset as a member of request object last_wma = current_request.get_attribute('reqmgr_name')[-1] if 'content' in last_wma and 'pdmv_dataset_name' in last_wma['content']: input_dataset = last_wma['content']['pdmv_dataset_name'] diff --git a/mcm/json_layer/request.py b/mcm/json_layer/request.py index c498553c..9f1f05b2 100644 --- a/mcm/json_layer/request.py +++ b/mcm/json_layer/request.py @@ -757,9 +757,10 @@ def get_tier(self,i): tiers = s['datatier'] if isinstance(tiers, str): tiers = tiers.split(',') - return tiers + return list(reversed(tiers)) def get_tiers(self): + ## the output is an ordered list of tiers, the last item being the 
main produced tier r_tiers=[] keeps = self.get_attribute('keep_output') for (i, s) in enumerate(self.get_attribute('sequences')): @@ -1538,7 +1539,14 @@ def inspect_submitted(self): 'message' : 'No output dataset have been recognized'}) saved = db.save(self.json()) return not_good - ds_for_accounting = collected[0] + ## order according to the expected tiers : the last is the most important + ## does not function if two outputs of the same tier are expected + recollected=[] + for tier in tiers_expected: + recollected.extend( filter( lambda dn : dn.split('/')[-1]==tier, collected) ) + collected=recollected + + ds_for_accounting = collected[-1] ## the last one is the one of importance if not 'pdmv_dataset_statuses' in wma_r_N['content'] or not ds_for_accounting in wma_r_N['content']['pdmv_dataset_statuses']: counted = wma_r_N['content']['pdmv_evts_in_DAS']+ wma_r_N['content']['pdmv_open_evts_in_DAS'] if not counted: diff --git a/mcm/rest_api/ChainedRequestActions.py b/mcm/rest_api/ChainedRequestActions.py index a66a1208..e8eee63b 100644 --- a/mcm/rest_api/ChainedRequestActions.py +++ b/mcm/rest_api/ChainedRequestActions.py @@ -650,7 +650,17 @@ def request_to_tasks( r , base, depend): task_dict.update({"SplittingAlgo" : "EventAwareLumiBased", "InputFromOutputModule" : ts[-1]['output_'], "InputTask" : ts[-1]['TaskName']}) + task_dict['output_'] = "%soutput"%(r.get_attribute('sequences')[si]['eventcontent'][0]) + task_dict['datatiers_'] = r.get_attribute('sequences')[si]['datatier'] + task_dict['outputs_'] = ["%soutput"%(ec) for ec in r.get_attribute('sequences')[si]['eventcontent']] + task_dict['use_datatier_']=None + if r.get_attribute('flown_with'): + fdb = database('flows') + f = fdb.get( r.get_attribute('flown_with') ) + if "use_datatier" in f["request_parameters"]: + task_dict['use_datatier_']=f["request_parameters"]["use_datatier"] + task_dict['priority_'] = r.get_attribute('priority') task_dict['request_type_'] = r.get_wmagent_type() ts.append(task_dict) @@ -704,9 
+714,14 @@ def request_to_tasks( r , base, depend): for (r,item) in tasktree.items(): for n in item['next']: + ## do the connecting of task outputs tasktree[n]['dict'][0].update({"InputFromOutputModule" : item['dict'][-1]['output_'], - "InputTask" : item['dict'][-1]['TaskName']}) - + "InputTask" : item['dict'][-1]['TaskName']}) + ## do a special treatment if the flow was requested to use a specific datatier + if tasktree[n]['dict'][0]['use_datatier_']: + use_datatier=tasktree[n]['dict'][0]['use_datatier_'] + index_datatier=item['dict'][-1]['datatiers_'].index(use_datatier) + tasktree[n]['dict'][0]["InputFromOutputModule"] = item['dict'][-1]['outputs_'][index_datatier] wma={ "RequestType" : "TaskChain",