Repositório com series temporais de geração média diária de algumas usinas fotovoltaicas do balança de geração de energia disponível no site da ONS http://www.ons.org.br/Paginas/resultados-da-operacao/boletim-geracao-solar.aspx.
Cada uma das seriés, modelos híbridos e código (jupyter notebooks) estão disponíveis nas respectivas pastas:
- assu_5
- bjl_solar
- fontes_solar_1
- rio_alto
Cada uma das pastas conta com um Notebook Jupyter, dados e modelos já treinados.
A implementação do algoritmo se dá a partir da classe:
class Ag_main:
def __init__(self, data, y_sarimax, num_epochs = 10, size_pop=10, prob_mut=0.5, tr_ts_percents=[80,20]):
self._data = data
self._data_train = data[:int(tr_ts_percents[0]/100*len(data))]
self._data_test = data[int(tr_ts_percents[0]/100*len(data)):]
self._y_sarimax = y_sarimax
self._erro = data-y_sarimax
self._data_train_arima = y_sarimax[:int(tr_ts_percents[0]/100*len(y_sarimax))]
self._data_test_arima = y_sarimax[int(tr_ts_percents[0]/100*len(y_sarimax)):]
self._num_epochs = num_epochs
self._size_pop = size_pop
self._prob_mut = prob_mut
self._tr_ts_percents = tr_ts_percents
self._fitness_array = np.array([])
self._best_of_all = None
def early_stop(self):
array = self._fitness_array
to_break=False
if len(array) > 4:
array_diff1_1 = array[1:] - array[:-1]
array_diff2 = array_diff1_1[1:] - array_diff1_1[:-1]
if (array_diff2[-4:].mean() < 0) and (abs(array_diff1_1[-4:].mean()) <1e-3):
to_break = True
return to_break
def train_test_split(self, serie, num_lags, print_shapes = False):
len_serie = len(serie)
X = np.zeros((len_serie, num_lags))
y = np.zeros((len_serie,1))
for i in np.arange(0, len_serie):
if i-num_lags>0:
X[i,:] = serie[i-num_lags:i]
y[i] = serie[i]
len_train = np.floor(len_serie*self._tr_ts_percents[0]/100).astype('int')
len_test = np.ceil(len_serie*self._tr_ts_percents[1]/100).astype('int')
X_train = X[0:len_train]
y_train = y[0:len_train]
X_test = X[len_train:len_train+len_test]
y_test = y[len_train:len_train+len_test]
return X_train, y_train, X_test, y_test
def train_test_split_prev(self, serie, num_lags_pass, num_lags_fut, print_shapes = False):
len_serie = len(serie)
X = np.zeros((len_serie, (num_lags_pass+num_lags_fut)))
y = np.zeros((len_serie,1))
for i in np.arange(0, len_serie):
if (i-num_lags_pass > 0) and ((i+num_lags_fut) <= len_serie):
X[i,:] = serie[i-num_lags_pass:i+num_lags_fut]
y[i] = serie[i]
elif (i-num_lags_pass > 0) and ((i+num_lags_fut) > len_serie):
X[i,-num_lags_pass:] = serie[i-num_lags_pass:i]
y[i] = serie[i]
len_train = np.floor(len_serie*self._tr_ts_percents[0]/100).astype('int')
len_test = np.ceil(len_serie*self._tr_ts_percents[1]/100).astype('int')
X_train = X[0:len_train]
y_train = serie[0:len_train]
X_test = X[len_train:len_train+len_test]
y_test = y[len_train:len_train+len_test]
return X_train, y_train, X_test, y_test
def gen_population(self):
population = [[1,1,1,1,'objeto_erro','objeto_ass',np.inf]]*self._size_pop
for i in range(0, self._size_pop):
population[i] = [random.randint(1, 20), random.randint(1, 20), random.randint(1, 20), random.randint(1, 20),10, 'objeto_erro', 'objeto_ass']
return population
def set_fitness(self, population, start_set_fit):
print('start_set_fit:', start_set_fit)
for i in range(start_set_fit, len(population)):
#erro estimado
erro_train_entrada, erro_train_saida, erro_test_entrada, erro_test_saida = self.train_test_split(self._erro, population[i][0])
#AG_erro
Ag_mlp_erro = self.Ag_mlp(erro_train_entrada, erro_train_saida, erro_test_entrada, erro_test_saida, self._num_epochs, self._size_pop, self._prob_mut).search_best_individual()
best_erro = Ag_mlp_erro._best_of_all
erro_estimado = np.concatenate([best_erro.predict(erro_train_entrada), best_erro.predict(erro_test_entrada)])
#y estimado
X_ass_1_train_in, _, X_ass_1_test_in, _ = self.train_test_split(self._y_sarimax, population[i][1])
X_ass_2_train_in, _, X_ass_2_test_in, _ = self.train_test_split_prev(erro_estimado, population[i][2], population[i][3])
X_in_train = np.concatenate((X_ass_1_train_in, X_ass_2_train_in), axis=1)
X_in_test = np.concatenate((X_ass_1_test_in, X_ass_2_test_in), axis=1)
#AG_ASS
Ag_MLP_ass = self.Ag_mlp(X_in_train, self._data_train, X_in_test, self._data_test, self._num_epochs, self._size_pop, self._prob_mut).search_best_individual()
best_ass = Ag_MLP_ass._best_of_all
population[i][4] = best_erro
population[i][5] = best_ass
population[i][-1] = mae(best_ass.predict(X_in_test), self._data_test)
return population
def new_gen(self, population, num_gen):
def cruzamento(population):
qt_cross = len(population[0])
pop_ori = population
for p in range(1, len(pop_ori)):
if np.random.rand() > self._prob_mut:
population[p][0:int(qt_cross/2)] = pop_ori[int(p/2)][0:int(qt_cross/2)]
population[p][int(qt_cross/2):qt_cross] = pop_ori[int(p/2)][int(qt_cross/2):qt_cross]
return population
def mutation(population):
for p in range(1, len(population)):
if np.random.rand() > self._prob_mut:
population[p][0] = population[p][0] + np.random.randint(1,2)
population[p][1] = population[p][1] + np.random.randint(1,2)
population[p][2] = population[p][2] + np.random.randint(1,2)
population[p][3] = population[p][3] + np.random.randint(1,2)
return population
population = cruzamento(population)
population = mutation(population)
population = self.set_fitness(population, int(self._size_pop*num_gen/(2*self._num_epochs)))
population.sort(key = lambda x: x[:][-1])
return population
def search_best_model(self):
ng = 0
population = self.gen_population()
population = self.set_fitness(population,0)
population.sort(key = lambda x: x[:][-1])
self._fitness_array = np.append(self._fitness_array, population[0][-1])
self._best_of_all = population[0]
for ng in tqdm(range(0, self._num_epochs)):
print('generation:', ng)
population = self.new_gen(population, ng)
if population[0][-1] < min(self._fitness_array):
self._best_of_all = population[0]
if self.early_stop():
break
return self
class Ag_mlp:
def __init__(self,X_train, y_train, X_test, y_test, num_generations, size_population, prob_mut):
self._X_train = X_train
self._y_train = y_train
self._X_test = X_test
self._y_test = y_test
self._num_generations = num_generations
self._size_population = size_population
self._prob_mut = prob_mut
self._fitness_array = np.array([])
self._best_of_all = None
def gen_population(self):
sizepop=self._size_population
population = [['']]*sizepop
solver = ['lbfgs', 'adam']
activation = ['identity', 'logistic', 'tanh', 'relu']
learning_rate = ['constant', 'invscaling', 'adaptive']
for i in range(0, sizepop):
population[i] = [random.choice(solver), random.randint(1, 100), random.randint(1, 50),
random.randint(1, 10), random.choice(activation), random.choice(learning_rate), 'objeto', 10]
return population
def set_fitness(self, population, start_set_fit):
for i in range(start_set_fit, len(population)):
mlp_volatil = MLPRegressor(hidden_layer_sizes=(population[i][1], population[i][2], population[i][3]),
activation = population[i][4], solver = population[i][0],
learning_rate = population[i][5], max_iter = 500)
qt_fits=0
mlp_volatil.fit(self._X_train, self._y_train)
mae_fits= mae(self._y_test, mlp_volatil.predict(self._X_test))
population[i][-1] = mae_fits
population[i][-2] = mlp_volatil
return population
def new_gen(self, population, num_gen):
def cruzamento(population):
qt_cross = len(population[0])
pop_ori = population
for p in range(1, len(pop_ori)):
if np.random.rand() > self._prob_mut:
population[p][0:int(qt_cross/2)] = pop_ori[int(p/2)][0:int(qt_cross/2)]
population[p][int(qt_cross/2):qt_cross] = pop_ori[int(p/2)][int(qt_cross/2):qt_cross]
return population
def mutation(population):
for p in range(1, len(population)):
if np.random.rand() > self._prob_mut:
population[p][1] = population[p][1] + np.random.randint(1,10)
population[p][2] = population[p][2] + np.random.randint(1,5)
population[p][3] = population[p][3] + np.random.randint(1,2)
return population
population = cruzamento(population)
population = mutation(population)
population = self.set_fitness(population, int(self._size_population*num_gen/(2*self._num_generations)))
population.sort(key = lambda x: x[:][-1])
return population
def early_stop(self):
array = self._fitness_array
to_break=False
if len(array) > 4:
array_diff1_1 = array[1:] - array[:-1]
array_diff1_2 = array[2:] - array[:-2]
array_diff2 = array_diff1_1[1:] - array_diff1_1[:-1]
if (array_diff2[-4:].mean() < 0) and (abs(array_diff1_1[-4:].mean()) <1e-3):
to_break = True
return to_break
def search_best_individual(self):
ng = 0
population = self.gen_population()
population = self.set_fitness(population, 0)
population.sort(key = lambda x: x[:][-1])
self._fitness_array= np.append(self._fitness_array, population[0][-1])
self._best_of_all = population[0][-2]
for ng in range(0, self._num_generations):
population = self.new_gen(population, ng)
if population[0][-1] < min(self._fitness_array):
self._best_of_all = population[0][-2]
if self.early_stop():
break
return self