-
Notifications
You must be signed in to change notification settings - Fork 2
/
i2ct.py
148 lines (130 loc) · 4.13 KB
/
i2ct.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# class to handle i2c
# import i2ct as i2ct
# k=i2ct()
# k.menu()
import machine
from machine import I2C
import gc
import time
class i2ct():
def __init__(self):
self.pinSDA=machine.Pin(4)
self.pinSCL=machine.Pin(5)
self.con=I2C(scl=self.pinSCL, sda=self.pinSDA)
self.target=0
# debug only:
self.inp=0
self.inpAkt=False
self.lasthash="2cbac32719086e89b856e17bd2a34f21032a51d2"
self.newhash ="0a536a14db3b230247d4d90c3b33abff25b64382"
self.difficulty=10
def setSpeed(self,khz):
self.con.init(self.pinSCL,self.pinSDA, freq=khz*1000)
def request(self, anz):
try:
rec=self.con.readfrom(self.target,anz)
return rec
except Exception as inst:
print (self.target,"i2c req Exc: "+str(inst))
return bytearray("X00")
def send(self,txt):
try:
self.con.writeto(self.target,bytearray(txt) )
except Exception as inst:
print (self.target,"i2c send Exc: "+str(inst))
def queryStatus(self):
self.send("S")
rec=self.request(3)
try:
return rec.decode("utf-8")
except Exception as inst:
print (self.target,"i2c queryStatus Exc: "+str(inst))
return bytearray("X00")
def check(self):
# have to give slave time to prepare answer
# do not use if another query is running
cnt=10
while (cnt > 0):
cnt-=1
rec=self.queryStatus()
if rec[0]=='B':
print ('B')
gc.collect() # or sleep
else:
return True
return False
def queryResult(self):
self.send("R")
rec=self.request(5) # must be 5 as decode fails on xff
try:
return int(rec.decode("utf-8"))
except Exception as inst:
print ("i2c queryResult Exc: "+str(inst))
return 0
def queryElapsed(self):
self.send("E")
rec=self.request(5) # must be 5 as decode fails on xff
try:
return int(rec.decode("utf-8"))
except Exception as inst:
print ("i2c queryEla Exc: "+str(inst))
return 0
def queryId(self):
self.send("I")
time.sleep_ms(10) #it takes time
rec=self.request(22)[:22] # must be 22
try:
return rec.decode("utf-8")
except: #not supported by slave
return 'DUCOID8159497002237243'
def setDifficulty(self):
if self.difficulty>99:
self.difficulty=99
tx="D{:02d}".format(self.difficulty)
self.send(tx)
def setTwiAdr(self,adr):
tx="V{:02d}".format(adr)
self.send(tx)
def sendHash(self,was):
#print("Sending hash "+was)
self.check()
if was=='L':
tx=was+self.lasthash[0:20]
elif was=='M':
tx=was+self.lasthash[20:40]
elif was=='N':
tx=was+self.newhash[0:20]
elif was=='O':
tx=was+self.newhash[20:40]
else:
print ("Invalid SendHash",was)
return
self.send(tx)
def sendHash85(self,was):
#print("Sending hash "+was)
self.check()
if was=='L':
tx=was+self.lasthash[0:14]
elif was=='M':
tx=was+self.lasthash[14:28]
elif was=='N':
tx=was+self.lasthash[28:40]
elif was=='O':
tx=was+self.newhash[0:14]
elif was=='P':
tx=was+self.newhash[14:28]
elif was=='Q':
tx=was+self.newhash[28:40]
else:
print ("Invalid SendHash85",was)
return
self.send(tx)
def sendHashes(self):
#start=time.ticks_ms()
if self.target <50:
for was in ['L','M','N','O']:
self.sendHash(was);
else: #tiny
for was in ['L','M','N','O','P','Q']:
self.sendHash85(was);
#print ("Hashes sent ",time.ticks_diff(time.ticks_ms(),start))