forked from pklaus/avrnetio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathavrnetio.py
267 lines (227 loc) · 10.2 KB
/
avrnetio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
#!/usr/bin/env python2
# -*- encoding: UTF8 -*-
# author: Philipp Klaus, philipp.l.klaus AT web.de
# Author: 4096R/26F7CE8A Patrick Hieber <patrick.hieber AT gmx.net>
# This file is part of avrnetio.
#
# avrnetio is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# avrnetio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with avrnetio. If not, see <http://www.gnu.org/licenses/>.
# This class represents the ECMDs (Ethersex Commands) available on the Pollin AVR-NET-IO.
# It uses raw TCP communication to send the commands.
# The class aims at providing complete coverage of the functionality of the box
# but not every action is supported yet.
## for the raw TCP socket connection:
from socket import *
## for md5 checksum:
#import hashlib
## for RegularExpressions:
import re
## for debugging (set debug mark with pdb.set_trace() )
import pdb
## for math.ceil()
#import math
## for sys.exc_info()
# import sys
import time
### for date.today()
#from datetime import date
from datetime import datetime
# pySerial is optional: it is only needed for RS232 mode. When it is missing,
# ``serial`` is None and Avrnetio.set_serial_mode() refuses to switch modes.
try:
    import serial
except ImportError:
    serial = None

# Line terminator appended to every ECMD request and stripped from responses.
LINE_END = "\n"
# Maximum number of bytes read per recv()/recvfrom() call.
BUFF_SIZE = 1024  # OK as long as ECMDs do not return anything longer than this...
# Socket timeout in seconds (passed to settimeout()); the unconnected-UDP wait
# loop gives up after twice this value.
TIMEOUT = 6
class Avrnetio(object):
    """Client for hardware running the ethersex firmware.

    ECMDs (ethersex commands) are sent as text lines, either over a network
    socket (TCP, or UDP when ``tcp_mode`` is False) or over RS232 once
    set_serial_mode() has been called. The connection is opened lazily by
    the first request and re-opened after disconnect().

    Every failure detected by this class is reported as NameError; callers
    of the original API rely on that exception type.
    """

    # Class-level defaults so that disconnect()/__del__ and the lazy
    # (re)connect logic also work on partially initialised instances.
    __s = None               # network socket, None while not connected
    __serial = None          # pySerial port object, None while not opened
    __serial_mode = False    # True once set_serial_mode() was called
    __ref_ep = None          # ADC reference voltage, see set_ref_ep()

    def __init__(self, host='', username="", password="", secure_login=False, port = 2701, tcp_mode = True):
        """Store the connection parameters and resolve the endpoint.

        host: the network IP or hostname of the avrnetio to interface.
        username, password: PAM data [stored, but not handled yet]
        secure_login: whether to use secure authentication [not handled yet]
        port: port to use when sending ECMDs to host.
        tcp_mode: True for a TCP socket, False for UDP.

        Raises NameError if host/port cannot be resolved. No socket is opened
        here; that happens on the first request.
        """
        self.__host = host
        self.__username = username
        self.__password = password
        self.__secureLogin = secure_login
        self.__port = port
        self.__tcp_mode = tcp_mode
        # If you just communicate with one remote host, you can call "connect"
        # on a UDP socket and it will behave like TCP.
        self.__udp_connect = True
        self.__bufsize = BUFF_SIZE
        self.__ref_ep = None       # has to be set during operation
        self.__serial_mode = False  # has to be set during operation
        self.__serial = None
        self.__s = None

        socktype = SOCK_STREAM if tcp_mode else SOCK_DGRAM
        try:
            # getaddrinfo(host, port, family=0, socktype=0, proto=0)
            candidates = getaddrinfo(host, port, AF_UNSPEC, socktype)
        except Exception:
            # Any resolver failure (bad name, bad port, bad types) is
            # reported the same way as "nothing found".
            candidates = []
        if not candidates:
            raise NameError("There was a problem with the hostname & port you supplied: ",host,port)
        # Take the preferred connection:
        self.__connection = candidates[0]

    def __login(self):
        """Create the socket and (for TCP / connected UDP) connect it.

        Returns True on success; raises NameError if the socket cannot be
        created or connected.
        """
        family, socktype, proto = self.__connection[:3]
        try:
            self.__s = socket(family, socktype, proto)
            self.__s.settimeout(TIMEOUT)
            if self.__tcp_mode or self.__udp_connect:
                self.__s.connect(self.__connection[4])
        except EnvironmentError:  # socket errors derive from IOError/OSError
            # Do not keep a half-open socket around: the next request must
            # start with a fresh connection attempt.
            self.disconnect()
            raise NameError("No connection to endpoint " + self.__host)
        return True

    def set_serial_mode(self, device="/dev/ttyS0", baud=115200):
        """Switch communication with the avrnetio from network to RS232.

        Call this after instantiation. The port itself is opened lazily by
        the next request. Raises NameError if pySerial is not installed.
        """
        if not serial:
            raise NameError("Trying to set_serial_mode(), but python module pySerial is not installed.")
        self.__serial_mode = True
        self.__serial_device = device
        self.__serial_baud = baud

    def get_system_time(self):
        """ask for the system time of the avrnetio"""
        return self.__send_request("time")

    def get_system_date(self):
        """ask for the system date of the avrnetio"""
        return self.__send_request("date")

    def get_system_uptime(self):
        """ask for the system uptime of the avrnetio"""
        return self.__send_request("whm")

    def __hex_string_to_int(self, hex_string):
        """Convert a hexadecimal string such as '3FF' to an int."""
        return int(hex_string, 16)

    def get_adcs(self):
        """Send 'adc get' and return the reported values as a list of ints.

        The response is treated as whitespace-separated hexadecimal fields.
        """
        fields = self.__send_request("adc get").split()
        return [self.__hex_string_to_int(field) for field in fields]

    def set_port(self, port, value):
        """Send 'io set port <port> <value>' and return the device's reply."""
        return self.__send_request("io set port " + str(port) + " " + str(value))

    def set_ddr(self, port, value):
        """Send 'io set ddr <port> <value>' and return the device's reply."""
        return self.__send_request("io set ddr "+str(port)+" "+str(value))

    def rm_ok(self, output):
        """Remove 'OK' status lines from a list of response lines.

        A list is modified in place and returned. Anything that is not a
        list (single-line responses are plain strings) is returned
        unchanged, because strings have no line entries to remove.
        """
        if isinstance(output, list):
            output[:] = [line for line in output if line != 'OK']
        return output

    def get_1w(self, which):
        """Send '1w convert' and then '1w get' for sensor `which`.

        `which` is a string; it is concatenated verbatim into both commands.
        Returns the reply to '1w get'.
        """
        self.__send_request("1w convert " + which)
        return self.__send_request("1w get " + which)

    def get_1ws(self):
        """Send '1w list' and return the response lines without 'OK'."""
        onewires = self.__send_request("1w list", True)
        return self.rm_ok(onewires)

    def get_i2cSlaves(self):
        """Send 'i2c detect' and return the response lines without 'OK'."""
        i2cs = self.__send_request("i2c detect", True)
        return self.rm_ok(i2cs)

    # All i2c arguments below are strings that are concatenated verbatim
    # into the ECMD line.

    def get_i2c_rbb(self, address):
        """read byte from I2C chip"""
        return self.rm_ok(self.__send_request("i2c rbb "+address))

    def get_i2c_rbd(self, address, regaddress):
        """read byte from register address at I2C chip"""
        return self.rm_ok(self.__send_request("i2c rbd "+address+" "+regaddress))

    def get_i2c_rwd(self, address, regaddress):
        """read word from register address at I2C chip"""
        return self.rm_ok(self.__send_request("i2c rwd "+address+" "+regaddress))

    def get_i2c_wbb(self, address, hexbyte):
        """write byte to I2C chip"""
        return self.rm_ok(self.__send_request("i2c wbb "+address+" "+hexbyte))

    def get_i2c_wbd(self, address, regaddress, hexbyte):
        """write byte to register address on I2C chip"""
        return self.rm_ok(self.__send_request("i2c wbd "+address+" "+regaddress+
                                              " "+hexbyte))

    def get_i2c_wwd(self, address, regaddress, hexbyte):
        """write word to register address on I2C chip"""
        # The value to write has to be part of the command, exactly as in
        # get_i2c_wbd(); it used to be dropped silently.
        return self.rm_ok(self.__send_request("i2c wwd "+address+" "+regaddress+
                                              " "+hexbyte))

    def get_tmp175(self, offset):
        """read temperature from the tmp175 sensor

        offset = realAddress - defaultAddress(=0x48)
        the offset must be between >=0 .. <27 (range as stated by the
        original author; it is not checked here)
        """
        return self.__send_request("tmp175 "+str(offset))

    def set_ref_ep(self,reference_ep):
        """set reference electrical potential used by get_adcs_as_volts()"""
        self.__ref_ep = reference_ep

    def __10bit_int_to_volt(self, integer):
        """Scale an ADC reading so that 1023 maps to the reference potential."""
        return integer/1023.0*self.__ref_ep

    def get_adcs_as_volts(self):
        """Return get_adcs() scaled by the reference electrical potential.

        Raises NameError if set_ref_ep() has not been called yet.
        """
        if self.__ref_ep is None:
            raise NameError("Please set reference electrical potential first.")
        return [self.__10bit_int_to_volt(reading) for reading in self.get_adcs()]

    @staticmethod
    def __to_wire(text):
        """Return `text` as a byte string for socket/serial writes.

        On Python 2 the native string already is a byte string and is passed
        through untouched.
        """
        if isinstance(text, bytes):
            return text
        return text.encode('latin-1')

    @staticmethod
    def __to_text(data):
        """Return received `data` as the native string type.

        latin-1 maps every byte to exactly one character, so nothing the
        device sends can make decoding fail.
        """
        if isinstance(data, str):
            return data
        return data.decode('latin-1')

    def __send_request(self,request, responses_till_OK=False):
        """Send one ECMD and return the device's response.

        request: command text without the line terminator.
        responses_till_OK: False returns the first response line as a string;
            True collects all lines up to (not including) the terminating
            'OK' line and returns them as a list.

        Raises NameError on connection problems and when the device answers
        with "parse error".
        """
        if self.__serial_mode:
            answer = self.__request_serial(request, responses_till_OK)
        else:
            answer = self.__request_network(request, responses_till_OK)
        self.__check_parse_error(request, answer)
        return answer

    def __request_serial(self, request, responses_till_OK):
        """RS232 transport for __send_request()."""
        if self.__serial is None:
            self.__serial = serial.Serial(self.__serial_device, self.__serial_baud, timeout=2)
            print("(re)connecting serial")
        try:
            self.__serial.write(self.__to_wire(request+LINE_END))  # send the request
        except serial.serialutil.SerialException:
            raise NameError("Connection to ethersex device failed. Wrong RS232 port given? Device not powered?")
        # The first line read back is discarded.
        # NOTE(review): presumably the echo of the command - confirm.
        self.__serial.readline()
        collected = []
        while True:
            raw = self.__to_text(self.__serial.readline())  # one '\n' terminated line
            line = raw.replace(LINE_END, "")
            if not responses_till_OK:
                # Single-line mode returns the first line whatever it is,
                # including a bare 'OK' or '' after a read timeout.
                return line
            if not raw:
                # readline() returned nothing at all: the read timed out and
                # the terminating 'OK' will never arrive.
                raise NameError('Did not receive a response from ethersex in time.')
            if line == 'OK':
                return collected
            collected.append(line)

    def __request_network(self, request, responses_till_OK):
        """TCP/UDP transport for __send_request()."""
        if self.__s is None:
            self.__login()
            print("(re)connecting network")
        payload = self.__to_wire(request+LINE_END)
        if not (self.__tcp_mode or self.__udp_connect):
            return self.__request_unconnected_udp(payload)

        self.__s.sendall(payload)
        collected = []
        pending = ''  # text received after the last complete line
        while True:
            chunk = self.__to_text(self.__s.recv(self.__bufsize))
            if not chunk and self.__tcp_mode:
                # An empty read on TCP means the peer closed the connection;
                # drop the socket so that the next request reconnects.
                self.disconnect()
                raise NameError("Connection to ethersex device was closed before a complete response arrived.")
            if not responses_till_OK:
                # Single-line mode: first non-empty line of the first chunk.
                for piece in chunk.split(LINE_END):
                    if piece:
                        return piece
                return ''
            # One recv() may carry several lines or only part of one, so
            # split on the terminator and keep the unfinished tail.
            pending += chunk
            pieces = pending.split(LINE_END)
            pending = pieces.pop()
            if pending and not self.__tcp_mode:
                # A datagram is a complete message even without a terminator.
                pieces.append(pending)
                pending = ''
            for piece in pieces:
                if piece == 'OK':
                    return collected
                collected.append(piece)

    def __request_unconnected_udp(self, payload):
        """Send `payload` with sendto() and wait for a datagram from the host.

        Datagrams whose sender address differs from the configured host are
        skipped. Returns the raw response text (terminator included).
        """
        self.__s.sendto(payload, (self.__host, self.__port))
        data = ''
        sender = ('', 0)
        started = time.time()
        while sender[0] != self.__host:
            if time.time()-started > TIMEOUT*2:
                raise NameError('Did not receive a response from ethersex in time.')
            data, sender = self.__s.recvfrom(self.__bufsize)
        return self.__to_text(data)

    def __check_parse_error(self, request, answer):
        """Raise NameError if the response reports an ECMD parse error."""
        lines = answer if isinstance(answer, list) else [answer]
        if any(line.startswith("parse error") for line in lines):
            raise NameError("Error while sending request: " + request + "\nresponse from avrnetio is: " + LINE_END.join(lines))

    def disconnect(self):
        """disconnect the class from the avrnetio. That means closing the serial port and/or closing the network socket."""
        for handle in (self.__serial, self.__s):
            if handle is None:
                continue
            try:
                handle.close()
            except EnvironmentError:
                # Best effort: a handle that fails to close is abandoned.
                pass
        # Forget the handles so that the next request reconnects.
        self.__serial = None
        self.__s = None

    def __del__(self):
        try:
            self.disconnect()
        except Exception:
            # A finalizer must never raise (e.g. during interpreter shutdown).
            pass
### end of class Avrnetio ----------------
# vim:expandtab:ts=4:sw=4:ai:number