[WIP] Mandd/sax #18

Open · wants to merge 14 commits into base: devel
108 changes: 108 additions & 0 deletions src/BasicEventScheduler.py
@@ -0,0 +1,108 @@
# Copyright 2017 Battelle Energy Alliance, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Created on June 24, 2020

@author: mandd
"""

#External Modules---------------------------------------------------------------
import numpy as np
import xarray as xr
import pandas as pd
#External Modules End-----------------------------------------------------------

#Internal Modules---------------------------------------------------------------
from PluginsBaseClasses.ExternalModelPluginBase import ExternalModelPluginBase
#Internal Modules End-----------------------------------------------------------

class basicEventScheduler(ExternalModelPluginBase):
"""
    This class is designed to create a Basic Event Scheduler model
"""
def __init__(self):
"""
Constructor
@ In, None
@ Out, None
"""
ExternalModelPluginBase.__init__(self)

def initialize(self, container, runInfoDict, inputFiles):
"""
Method to initialize the Basic Event Scheduler model
@ In, container, object, self-like object where all the variables can be stored
@ In, runInfoDict, dict, dictionary containing all the RunInfo parameters (XML node <RunInfo>)
@ In, inputFiles, list, list of input files (if any)
@ Out, None
"""

def _readMoreXML(self, container, xmlNode):
"""
Method to read the portion of the XML that belongs to the Basic Event Scheduler model
@ In, container, object, self-like object where all the variables can be stored
@ In, xmlNode, xml.etree.ElementTree.Element, XML node that needs to be read
@ Out, None
"""
container.basicEvents = {}
container.timeSpamID = None

for child in xmlNode:
if child.tag == 'BE':
container.basicEvents[child.text.strip()] = [child.get('tin'),child.get('tfin')]
elif child.tag == 'timeSpamID':
container.timeSpamID = child.text.strip()
else:
raise IOError("basicEventScheduler: xml node " + str(child.tag) + " is not allowed")

def run(self, container, inputs):
"""
      This method generates a historySet from a pointSet which contains the initial and final times of the
      basic events
      @ In, container, object, self-like object where all the variables can be stored
      @ In, inputs, dict, dictionary of inputs from RAVEN
      @ Out, basicEventHistorySet, xarray.Dataset, dataset containing the time series for each basic event
"""
if len(inputs) > 2:
      raise IOError("basicEventScheduler: more than two inputs have been passed to the basic event scheduler")

dataDict = {}
dataDict['tin'] = []
dataDict['tfin'] = []
for key in container.basicEvents.keys():
dataDict['tin'].append(inputs[container.basicEvents[key][0]])
dataDict['tfin'].append(inputs[container.basicEvents[key][1]])

inputDataset = pd.DataFrame.from_dict(dataDict)
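    # Build the global time grid as the union of all initial and final times, sorted and de-duplicated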
    timeArray = np.concatenate([inputDataset['tin'],inputDataset['tfin']])
timeArraySorted = np.sort(timeArray,axis=0)
timeArrayCleaned = np.unique(timeArraySorted)

    dataVars = {}
    for key in container.basicEvents.keys():
      dataVars[key] = (['RAVEN_sample_ID', container.timeSpamID], np.zeros((1,timeArrayCleaned.shape[0])))

    basicEventHistorySet = xr.Dataset(data_vars = dataVars,
                                      coords    = {container.timeSpamID: timeArrayCleaned,
                                                   'RAVEN_sample_ID'   : np.zeros(1)})

    for index,key in enumerate(container.basicEvents.keys()):
      tin = inputDataset['tin'][index]
      tend = inputDataset['tfin'][index]
      indexes = np.where(np.logical_and(timeArrayCleaned>tin,timeArrayCleaned<=tend))
basicEventHistorySet[key][0][indexes] = 1.0

return basicEventHistorySet
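
For illustration, a minimal sketch (not part of the PR; event and variable names are placeholders) of the XML structure that _readMoreXML above accepts, parsed standalone with the same logic:

import xml.etree.ElementTree as ET

xmlText = """
<BasicEventScheduler>
  <BE tin="pump_tin" tfin="pump_tfin">pumpFailure</BE>
  <BE tin="valve_tin" tfin="valve_tfin">valveFailure</BE>
  <timeSpamID>time</timeSpamID>
</BasicEventScheduler>
"""
# Each <BE> node names a basic event; its 'tin'/'tfin' attributes point at the input
# variables holding the event's initial and final times
node = ET.fromstring(xmlText)
basicEvents = {child.text.strip(): [child.get('tin'), child.get('tfin')]
               for child in node if child.tag == 'BE'}
print(basicEvents)  # {'pumpFailure': ['pump_tin', 'pump_tfin'], 'valveFailure': ['valve_tin', 'valve_tfin']}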

105 changes: 105 additions & 0 deletions src/utils/mathUtils/aakr.py
@@ -0,0 +1,105 @@
# Copyright 2020, Battelle Energy Alliance, LLC
"""
Created on Dec 20, 2020

@author: mandd
"""
# External Imports
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.metrics.pairwise import pairwise_kernels
# Internal Imports

class AAKR():
  """
    Auto-Associative Kernel Regression (AAKR): reconstructs the expected nominal value of a
    set of signals from a training set of nominal (no-fault) observations X^{obs_NC}
  """

def __init__(self, metric):
"""
This method initializes the AAKR class
@ In, metric, string, type of metric to be employed in the distance calculation
"""
self.metric = metric


def train(self, trainData):
"""
      This method loads the training data into the AAKR class
@ In, trainData, pandas DataFrame, dataframe containing the training dataset, i.e., X^{obs_NC}
"""
if isinstance(trainData,pd.DataFrame):
self.trainingData = trainData.to_numpy()
else:
self.trainingData = trainData

# Z-Normalize data
self.scaler = StandardScaler()
self.scaler.fit(self.trainingData)
self.trainingData = self.scaler.transform(self.trainingData)


def fit(self, timeSeries, batchSize=None, **Kwargs):
"""
      This method partitions the provided timeSeries into batches before performing the regression.
      This is useful when the training dataset and the timeSeries are very large.
      @ In, timeSeries, pandas DataFrame, time series of actual recorded data
      @ In, batchSize, int, number of partitions of the timeSeries over which the regression is performed
      @ In, Kwargs, dict, parameters for the chosen kernel (e.g., the bandwidth 'bw')
      @ Out, reconstructedData, pandas DataFrame, reconstructed timeSeries
      @ Out, residualData, pandas DataFrame, residual: timeSeries - reconstructedData
"""
if batchSize is None:
return self.reconstruct(timeSeries, **Kwargs)
else:
batches = np.array_split(timeSeries, batchSize)
reconstructedDataList = [None] * batchSize
residualDataList = [None] * batchSize
counter = 0
for batch in batches:
print("serving batch: " + str(counter))
reconstructedDataBatch, residualDataBatch = self.reconstruct(batch, **Kwargs)
reconstructedDataList[counter] = reconstructedDataBatch
residualDataList[counter] = residualDataBatch
counter = counter + 1
reconstructedData = pd.concat(reconstructedDataList)
residualData = pd.concat(residualDataList)

return reconstructedData, residualData

def reconstruct(self, timeSeries, **Kwargs):
"""
This method performs the regression of the provided timeSeries for one single batch
using the training data X^{obs_NC}
@ In, timeSeries, pandas DataFrame, time series of actual recorded data
@ In, Kwargs, dict, parameters for the chosen kernel
@ Out, reconstructedData, pandas DataFrame, reconstructed timeSeries
      @ Out, residualData, pandas DataFrame, residual: timeSeries - reconstructedData
"""
recData = {}
resData = {}
keys = timeSeries.keys()

# Normalize actual data
timeSeriesNorm = self.scaler.transform(timeSeries.to_numpy())

distanceMatrix = pairwise_distances(X = self.trainingData,
Y = timeSeriesNorm,
metric = self.metric)

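    # Gaussian kernel weights between each training observation and each observed point:
    # w_ij = exp(-d_ij^2/(2*bw^2)) / sqrt(2*pi*bw^2)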
    weights = 1.0/np.sqrt(2.0*np.pi*Kwargs['bw']**2.0) * np.exp(-distanceMatrix**2.0/(2.0*Kwargs['bw']**2.0))
    weightSum = np.sum(weights,axis=0)
    # Guard against division by zero for points that receive zero total weight
    weightsClean = np.where(weightSum==0, 1, weightSum)[:, None]

recDataRaw = weights.T.dot(self.trainingData)
recDataRaw = recDataRaw/weightsClean

recDataRaw = self.scaler.inverse_transform(recDataRaw)

    for index,key in enumerate(keys):
      recData[key] = recDataRaw[:,index]
      # Residual defined as observed minus reconstructed, consistent with the docstring
      resData[key] = timeSeries.to_numpy()[:,index] - recDataRaw[:,index]

reconstructedData = pd.DataFrame(recData, index=timeSeries.index)
residualData = pd.DataFrame(resData, index=timeSeries.index)

return reconstructedData, residualData
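
A minimal usage sketch of the class above (assumes the AAKR class is importable; the data, bandwidth, and batch size are placeholders, not values from the PR):

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
# Nominal (no-fault) history used as training data, plus a new batch of observations
train = pd.DataFrame({'temperature': rng.normal(300.0, 2.0, 200),
                      'pressure'   : rng.normal(1.5, 0.1, 200)})
test  = pd.DataFrame({'temperature': rng.normal(300.0, 2.0, 50),
                      'pressure'   : rng.normal(1.5, 0.1, 50)})

aakr = AAKR(metric='euclidean')
aakr.train(train)
# 'bw' is the Gaussian kernel bandwidth read from Kwargs in reconstruct()
reconstructed, residual = aakr.fit(test, batchSize=5, bw=1.0)
print(residual.abs().mean())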
147 changes: 147 additions & 0 deletions src/utils/mathUtils/sax.py
@@ -0,0 +1,147 @@
# Copyright 2020, Battelle Energy Alliance, LLC
# ALL RIGHTS RESERVED
"""
Created on Mar 17, 2021

@author: mandd
"""
# External Imports
import numpy as np
import pandas as pd
from scipy.stats import norm
import string
# Internal Imports

class SAX():
"""
Class containing the algorithm which performs symbolic conversion of time series using the SAX algorithm

Reference: Lin, J., Keogh, E., Wei, L. and Lonardi, S. (2007).
Experiencing SAX: a Novel Symbolic Representation of Time Series.
Data Mining and Knowledge Discovery Journal.

Link: https://www.cs.ucr.edu/~eamonn/SAX.htm
"""

def __init__(self, freq, alphabetSizeDict, timeID=None):
"""
      This method initializes the SAX class
      @ In, freq, string, resampling frequency (pandas offset alias, e.g. '10min') used in the PAA step
      @ In, alphabetSizeDict, dict, discretization (alphabet) size for each dimension
      @ In, timeID, string, name of the variable identifying the time axis
"""
self.freq = freq
self.alphabetSizeDict = alphabetSizeDict
self.timeID = timeID

def fit(self, data, normalization=True):
"""
This method performs symbolic conversion of time series using the SAX algorithm
@ In, data, pandas DataFrame, time series that needs to be converted
      @ In, normalization, bool, flag that sets whether time series normalization is required (True) or not (False)
      @ Out, symbolicData, pandas DataFrame, symbolic conversion of the provided time series
@ Out, varCutPoints, dict, dictionary containing the discretization points for each dimension
"""
    # Normalize data
    if normalization:
      normalizedData, normalizationData = self.timeSeriesNormalization(data)
    else:
      normalizedData = data

    # PAA process
    paaData = self.piecewiseAggregateApproximation(normalizedData)

    symbolicData, varCutPoints = self.ndTS2String(paaData)

    # Map the cut points back to the original (un-normalized) scale
    if normalization:
      for var in varCutPoints:
        varCutPoints[var] = varCutPoints[var]*normalizationData[var][1] + normalizationData[var][0]

return symbolicData, varCutPoints

  def piecewiseAggregateApproximation(self, data):
    """
      This method performs Piecewise Aggregate Approximation (PAA) of the given time series
      @ In, data, pandas DataFrame, time series to be discretized
      @ Out, paa, pandas DataFrame, time series resampled at the frequency self.freq
    """
    paa = data.resample(self.freq, on=self.timeID).mean().reset_index()
    return paa

def piecewiseAggregateApproximationOLD(self, data):
"""
This method performs Piecewise Aggregate Approximation of the given time series
@ In, data, pandas DataFrame, time series to be discretized
@ Out, paa, pandas DataFrame, discretized time series
"""
nTimeVals, nVars = data.shape
paaData = {}
for var in self.alphabetSizeDict.keys():
res = np.zeros(self.timeWindows)
if (nTimeVals % self.timeWindows == 0):
inc = nTimeVals // self.timeWindows
for i in range(0, nTimeVals):
idx = i // inc
res[idx] = res[idx] + data[var].to_numpy()[i]
paaData[var] = res / inc
else:
for i in range(0, self.timeWindows * nTimeVals):
idx = i // nTimeVals
pos = i // self.timeWindows
res[idx] = res[idx] + data[var].to_numpy()[pos]
paaData[var] = res / nTimeVals

paa = pd.DataFrame(paaData)

return paa


def timeSeriesNormalization(self, data):
"""
This method performs the Z-normalization of a given time series
@ In, data, pandas DataFrame, time series to be normalized
      @ Out, normalizedDataDF, pandas DataFrame, normalized time series
@ Out, normalizationData, dict, dictionary containing mean and std-dev of each dimension of the time series
"""
normalizationData = {}
normalizedData = {}

for var in self.alphabetSizeDict.keys():
if var!=self.timeID:
normalizationData[var] = [np.mean(data[var].values),np.std(data[var].values)]
normalizedData[var] = (data[var].values-normalizationData[var][0])/normalizationData[var][1]

normalizedData[self.timeID] = data[self.timeID].values
normalizedDataDF = pd.DataFrame(normalizedData)
return normalizedDataDF, normalizationData


def ndTS2String(self, paaTimeSeries):
"""
This method performs the symbolic conversion of a given time series
      @ In, paaTimeSeries, pandas DataFrame, multi-variate time series to be converted into strings
      @ Out, paaTimeSeries, pandas DataFrame, symbolically converted time series
      @ Out, varCutPoints, dict, dictionary containing the cut points for each dimension
"""
varCutPoints = {}

for var in paaTimeSeries:
if var!=self.timeID:
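        # Equiprobable cut points under a standard normal distribution; the first and last
        # entries are -inf and +inf, so every value falls into exactly one bin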
varCutPoints[var] = norm.ppf(np.linspace(0.0, 1.0, num=self.alphabetSizeDict[var]+1),loc=0., scale=1.)
paaTimeSeries[var] = self.ts2String(paaTimeSeries[var], varCutPoints[var])

return paaTimeSeries, varCutPoints

def ts2String(self, series, cuts):
"""
This method performs the symbolic conversion of a single time series
      @ In, series, pandas Series, uni-variate time series to be converted into a string
      @ In, cuts, np.array, array of cut points for the considered time series
      @ Out, charArray, np.array, symbolically converted time series
"""
alphabetString = string.ascii_uppercase
alphabetList = list(alphabetString)

series = np.array(series)
charArray = np.chararray(series.shape[0],unicode=True)

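    # Map each value to the letter of the last cut point below it; cuts[0] = -inf
    # guarantees the index j-1 is always non-negative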
for i in range(series.shape[0]):
j=0
while cuts[j]<series[i]:
j=j+1
charArray[i] = alphabetList[j-1]

return charArray
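
A minimal usage sketch of the SAX class (assumes the class above is importable; the signal, frequency, and alphabet size are placeholders):

import numpy as np
import pandas as pd

# Hypothetical signal sampled every minute for two hours
time = pd.date_range('2021-03-17', periods=120, freq='1min')
data = pd.DataFrame({'time': time,
                     'flow': np.sin(np.linspace(0.0, 6.0, 120))})

sax = SAX(freq='10min', alphabetSizeDict={'flow': 4}, timeID='time')
symbolic, cuts = sax.fit(data, normalization=True)
print(symbolic['flow'].tolist())  # e.g. ['B', 'C', 'D', ...], one letter per 10-minute window
print(cuts['flow'])               # cut points mapped back to the original scale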
