-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwfsequencer.py
181 lines (134 loc) · 5.91 KB
/
wfsequencer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#! /usr/bin/env python
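"""
wfsequencer: provides the ``sequencer`` class, which runs the sequence of
steps configured in ruleMap.json (e.g. iRODS registration, PID creation,
replication, DublinCore and WF catalog metadata) on each digital object.
"""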
import os
import sys
import json
import collections
#
# implement rules sequence
#
class sequencer(object):
    """Run the configured sequence of rules/processes on a digital object.

    The step order and the step -> method mapping are read from
    ``ruleMap.json`` located next to this module:

    * ``ruleMap['SEQUENCE']``   maps a step key to a rule name; steps run in
      step-key order (see ``_order_steps``).
    * ``ruleMap['RULE_MAP']``   maps a rule name to the name of a method of
      this class that is invoked for that step.
    * ``ruleMap['RULE_PATHS']`` maps rule identifiers (``TEST_RULE``, ``PID``,
      ``REPLICA``, ``REGISTER``) to rule paths handed to ``irods``.

    ``doSequence`` must be called before any step method: the step methods
    read ``self.digitObjProperty``, which ``doSequence`` sets.
    """

    def __init__(self, config, log, irods, mongo, WFcollector, dublinCore):
        """Store the collaborators and load the rule map.

        :param config: configuration object (stored; not used by this class).
        :param log: logger, used through its ``info`` and ``error`` methods.
        :param irods: iRODS helper providing ``doRegister``, ``_ruleExec``,
            ``rulePIDsingle``, ``ruleReplication`` and ``ruleRegistration``.
        :param mongo: mongo handle forwarded to ``dublinCore.processDCmeta``.
        :param WFcollector: object providing ``collectMetadata(file)``.
        :param dublinCore: object providing ``processDCmeta(...)``.
        :raises IOError: if ruleMap.json cannot be opened.
        :raises ValueError: if ruleMap.json is not valid JSON.
        :raises KeyError: if ruleMap.json has no ``SEQUENCE`` entry.
        """
        print("sequencer ")
        # ruleMap.json lives in the same directory as this module.
        cfg_dir = os.path.dirname(os.path.realpath(__file__))
        with open(os.path.join(cfg_dir, 'ruleMap.json'), "r") as rlmp:
            self.ruleMap = json.load(rlmp)
        self.config = config
        self.log = log
        self.irods = irods
        self.mongo = mongo
        self.WFcollector = WFcollector
        self.dublinCore = dublinCore
        self.steps_definition = self._order_steps(self.ruleMap['SEQUENCE'])

    @staticmethod
    def _order_steps(sequence):
        """Return ``sequence`` as an OrderedDict sorted by step key.

        JSON object keys are always strings, so a plain ``sorted`` would run
        step "10" before step "2". Keys that parse as integers are compared
        numerically and placed first; any other keys keep plain string order
        after them. (Assumes integer-like keys are step numbers -- confirm
        against ruleMap.json.)
        """
        def step_key(item):
            key = item[0]
            try:
                # The original string is the tie-breaker, e.g. for "01" vs "1".
                return (0, int(key), key)
            except ValueError:
                return (1, 0, key)

        return collections.OrderedDict(sorted(sequence.items(), key=step_key))

    # ..................................... iREG_INGESTION -
    def register(self):
        """Register the current digital object into iRODS.

        Failures of ``irods.doRegister`` are logged and swallowed.
        """
        prop = self.digitObjProperty
        self.log.info("iREG on iRODS of : " + prop['file'])
        try:
            self.irods.doRegister(prop['dirname'], prop['collname'], prop['filename'])
        except Exception as ex:
            self.log.error("Could not execute a doRegister ")
            self.log.error(ex)

    # ..................................... TEST_RULE -
    def testRule(self):
        """Execute the test rule (no parameters) at RULE_PATHS['TEST_RULE'].

        :returns: the result of ``irods._ruleExec``, or None if it raised
            (the failure is logged).
        """
        rule_path = self.ruleMap['RULE_PATHS']['TEST_RULE']
        self.log.info("exec TEST rule on : " + self.digitObjProperty['file'])
        try:
            return self.irods._ruleExec(rule_path)
        except Exception as ex:
            self.log.error("Could not execute a rule")
            self.log.error(ex)
            return None

    # ..................................... PID -
    def PidRule(self):
        """Make a PID for the current object via ``irods.rulePIDsingle``.

        :returns: whatever ``irods.rulePIDsingle`` returns. Exceptions
            propagate to the caller (``doSequence`` logs them).
        """
        self.log.info("call PID rule on : " + self.digitObjProperty['file'])
        return self.irods.rulePIDsingle(self.digitObjProperty['object_path'],
                                        self.ruleMap['RULE_PATHS']['PID'])

    # ..................................... REPLICATION -
    def ReplicationRule(self):
        """Replicate the current object from object_path to target_path.

        :returns: whatever ``irods.ruleReplication`` returns. Exceptions
            propagate to the caller (``doSequence`` logs them).
        """
        prop = self.digitObjProperty
        self.log.info("call REPLICATION rule on : " + prop['file'])
        self.log.info("call REP rule object_path : " + prop['object_path'])
        self.log.info("call REP rule target_path : " + prop['target_path'])
        return self.irods.ruleReplication(prop['object_path'], prop['target_path'],
                                          self.ruleMap['RULE_PATHS']['REPLICA'])

    # ..................................... REGISTRATION_REPLICA -
    def RegistrationRule(self):
        """Run the REGISTER rule on the current object's object/target paths.

        :returns: whatever ``irods.ruleRegistration`` returns. Exceptions
            propagate to the caller (``doSequence`` logs them).
        """
        prop = self.digitObjProperty
        self.log.info("call REGISTRATION_REPLICA rule on : " + prop['file'])
        return self.irods.ruleRegistration(prop['object_path'], prop['target_path'],
                                           self.ruleMap['RULE_PATHS']['REGISTER'])

    # ..................................... DUBLINCORE_META -
    def DublinCoreMeta(self):
        """Process DublinCore metadata via ``dublinCore.processDCmeta``.

        Failures are logged and swallowed.
        """
        prop = self.digitObjProperty
        self.log.info("call process DUBLIN CORE meta of : " + prop['file'])
        try:
            self.dublinCore.processDCmeta(self.mongo, self.irods, prop['collname'],
                                          prop['start_time'], prop['file'],
                                          prop['datastations'])
            self.log.info(" DUBLIN CORE for digitalObject: " + prop['object_path'] + " is: OK")
        except Exception as ex:
            self.log.error("Could not process DublinCore metadata")
            self.log.error(ex)

    # ..................................... WFCATALOG_META -
    def WFCatalogMeta(self):
        """Collect WF catalog metadata via ``WFcollector.collectMetadata``.

        Failures are logged and swallowed.
        """
        prop = self.digitObjProperty
        self.log.info("called collect WF CATALOG METADATA of : " + prop['file'])
        try:
            self.WFcollector.collectMetadata(prop['file'])
            self.log.info(" WF METADATA for digitalObject: " + prop['object_path'] + " is: OK")
        except Exception as ex:
            self.log.error("Could not compute WF metadata")
            self.log.error(ex)

    def doSequence(self, digitObjProperty):
        """Run every configured step, in order, on one digital object.

        :param digitObjProperty: dict describing the object. This method reads
            ``file``, ``collname``, ``dirname`` and ``filename``; the step
            methods read further keys (``object_path``, ``target_path``,
            ``start_time``, ``datastations``).

        Each step's rule name is mapped through ``ruleMap['RULE_MAP']`` to a
        method name, which is then called. A failing step is logged and the
        remaining steps still run.
        """
        self.digitObjProperty = digitObjProperty
        self.log.info(" --- --- START SEQUENCE FOR : " + digitObjProperty['file'] + "\n")
        self.log.info("collname: " + digitObjProperty["collname"])
        self.log.info("dirname: " + digitObjProperty["dirname"])
        self.log.info("filename: " + digitObjProperty["filename"])
        for rule_name in self.steps_definition.values():
            try:
                method_name = self.ruleMap['RULE_MAP'][rule_name]
                self.log.info(method_name)
                getattr(self, method_name)()
            except Exception as ex:
                self.log.error(" Sequence error, could not execute rule: " + rule_name)
                self.log.error(ex)