-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathadOutputFileProcessor.py
136 lines (120 loc) · 5.74 KB
/
adOutputFileProcessor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python
'''
AD Output File Processor module.
'''
import os
import stat
import time
from PIL import Image
import pvaccess as pva
from .adImageProcessor import AdImageProcessor
from ..utility.floatWithUnits import FloatWithUnits
from ..utility.intWithUnits import IntWithUnits
class AdOutputFileProcessor(AdImageProcessor):
'''
Streaming framework processor class that can be used for saving Area
Detector images into files. Configuration dictionary should provide
the following settings:\n
\t\\- outputDirectory (str) : defines full path to the output directory\n
\t\\- outputFileNameFormat (str) : defines format to be used for naming output files, e.g. '{uniqueId:06}.{processorId}.tiff'\n
**AdImageProcessor(configDict)**
:Parameter: *configDict* (dict) - dictionary containing configuration parameters
'''
BYTES_IN_MEGABYTE = 1000000
DEFAULT_OUTPUT_DIRECTORY = '.'
DEFAULT_OUTPUT_FILE_NAME_FORMAT = '{uniqueId:06}.{processorId}.tiff'
def __init__(self, configDict={}):
AdImageProcessor.__init__(self,configDict)
self.outputDirectory = configDict.get('outputDirectory', self.DEFAULT_OUTPUT_DIRECTORY )
self.logger.debug('Using output directory: %s', self.outputDirectory)
if not os.path.exists(self.outputDirectory):
self.logger.debug('Creating output directory: %s', self.outputDirectory)
os.makedirs(self.outputDirectory, exist_ok=True)
self.outputFileNameFormat = configDict.get('outputFileNameFormat', self.DEFAULT_OUTPUT_FILE_NAME_FORMAT)
self.logger.debug('Using output file name format: %s', self.outputFileNameFormat)
self.nFilesSaved = 0
self.nBytesSaved = 0
self.fileProcessingTime = 0
def configure(self, configDict):
'''
Method invoked at user initiated runtime configuration changes. It
looks for 'outputDirectory' and 'outputFileNameFormat' in the configuration
dictionary and reconfigures processor behavior according to the specified
values.
:Parameter: *configDict* (dict) - dictionary containing configuration parameters
'''
if 'outputDirectory' in configDict:
outputDirectory = configDict.get('outputDirectory')
self.logger.debug('Reconfigured output directory: %s', outputDirectory)
if not os.path.exists(outputDirectory):
self.logger.debug('Creating output directory: %s', self.outputDirectory)
os.makedirs(outputDirectory, exist_ok=True)
self.outputDirectory = outputDirectory
if 'outputFileNameFormat' in configDict:
self.outputFileNameFormat = configDict.get('outputFileNameFormat')
self.logger.debug('Reconfigured output file name format: %s', self.outputFileNameFormat)
def process(self, pvObject):
'''
Method invoked every time input channel updates its PV record. It reshapes
input NtNdArray object and saves image data into output file.
:Parameter: *pvObject* (NtNdArray) - channel update object
'''
t0 = time.time()
(frameId,imageData,nx,_,_,_,_) = self.reshapeNtNdArray(pvObject)
if not nx:
self.logger.debug('Frame %s is empty', frameId)
return pvObject
filePath = os.path.join(self.outputDirectory, self.outputFileNameFormat)
filePath = filePath.format(frameId=frameId,uniqueId=frameId,objectId=frameId,processorId=self.processorId)
self.logger.debug('Saving frame %s to file %s', frameId, filePath)
im = Image.fromarray(imageData)
im.save(filePath)
self.updateOutputChannel(pvObject)
t1 = time.time()
dt = t1-t0
nBytesSaved = os.stat(filePath)[stat.ST_SIZE]
self.logger.debug('Saved %s bytes (frame %s) to file %s in %.4f seconds', nBytesSaved, frameId, filePath, dt)
self.nFilesSaved += 1
self.nBytesSaved += nBytesSaved
self.fileProcessingTime += dt
return pvObject
def resetStats(self):
'''
Method invoked at user initiated application statistics reset.
It resets total processing time, as well as counters for the
number of files and for the total number of bytes saved.
'''
self.nFilesSaved = 0
self.nBytesSaved = 0
self.fileProcessingTime = 0
def getStats(self):
'''
Method invoked periodically for generating processor statistics (number
of files and bytes saved and corresponding processing/storage rates).
:Returns: Dictionary containing processor statistics parameters
'''
fileProcessingRate = 0
dataStorageRateMBps = 0
if self.fileProcessingTime > 0:
fileProcessingRate = self.nFilesSaved/self.fileProcessingTime
dataStorageRateMBps = self.nBytesSaved/self.fileProcessingTime/self.BYTES_IN_MEGABYTE
return {
'nFilesSaved' : self.nFilesSaved,
'nBytesSaved' : IntWithUnits(self.nBytesSaved, 'B'),
'fileProcessingTime' : FloatWithUnits(self.fileProcessingTime, 's'),
'fileProcessingRate' : FloatWithUnits(fileProcessingRate, 'fps'),
'dataStorageRateMBps' : FloatWithUnits(dataStorageRateMBps, 'MBps')
}
def getStatsPvaTypes(self):
'''
Method invoked at processing startup. It defines processor part
of the status PvObject published on the status PVA channel.
:Returns: Dictionary containing PVA types for the processor statistics parameters
'''
return {
'nFilesSaved' : pva.UINT,
'nBytesSaved' : pva.ULONG,
'fileProcessingTime' : pva.DOUBLE,
'fileProcessingRate' : pva.DOUBLE,
'dataStorageRateMBps' : pva.DOUBLE
}