-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathQLearning.py
executable file
·103 lines (82 loc) · 3.16 KB
/
QLearning.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# -*- coding: utf-8 -*-
"""Classe QLearning
Classe permettant d'utiliser l'algorithme QLearning.
Cette classe est un thread qui tourne en arrière plan pour résoudre un problème donné
"""
from random import shuffle, randint
import labytoet
from threading import Thread
from time import sleep
from math import exp
class QLearning(Thread):
def __init__(self, problem, iterations=3000, gamma=1, exploration=0.6):
"""
Prend un problème en paramètre.
"""
Thread.__init__(self)
self.iterations = iterations
self.exploration = exploration
self.problem = problem
self.gamma = gamma
self.terminated = False
# Initialisation du dictionnaire Q
self.Q = self.problem.etats.fromkeys(self.problem.etats.keys(), dict())
for i in self.Q:
self.Q[i] = dict(zip(range(len(self.problem.etats[i])), [0] * len(self.problem.etats)))
self.countAction = dict()
self.countState = self.problem.etats.fromkeys(self.problem.etats.keys(), 0)
#Thread(target=self.run).start()
#self.problem.afficher(self.countState)
def run(self):
"""
Fonction pour lancer le thread
"""
#for i in range(self.iterations):
i = self.iterations
while(i!=0 and not self.terminated):
s=self.problem.setInitialState()
self.countState[s] += 1
while not self.problem.isGoal():
if self.terminated:
break
sleep(0.0001)
a = self.getAction(s)
self.countAction[(s, a)] = self.countAction.get((s, a), 0) + 1
r = self.problem.action(a)
new_s=self.problem.getCurrentPosition()
self.Q[s][a] = r + self.gamma * max(self.Q[new_s].values())
s = new_s
self.countState[s] += 1
i -= 1
QLearning.__init__(self, self.problem)
def getAction(self, s):
"""
Choisit une action parmi ceux qui sont possibles suivant une startégie.
Par défaut, la fonction opère plus d'exploration que d'exploitation avec une loi exponentielle :
au départ, beaucoup d'exploration pour converger par la suite que sur de l'exploitation
"""
somme = 0
for i in self.problem.fin:
somme += self.countState[i]
return max(self.Q[s], key=lambda i:\
(1-self.exploration)*self.Q[s][i] \
- self.exploration*self.countAction.get((s, i), 0)*exp(-somme/(len(self.problem.fin)*10)))
def stop(self):
"""
Permet d'arrêter le thread en cours d'exécution
"""
self.terminated = True
def getIterationsValue(self):
return self.iterations
def setIterationsValue(self, iterations):
self.iterations = iterations
def getGammaValue(self):
return self.gamma
def setGammaValue(self, gamma):
self.gamma = gamma
def getExplorationValue(self):
return self.exploration
def setExplorationValue(self, exploration):
self.exploration = exploration
def getCountState(self):
return self.countState