-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcul.py
152 lines (122 loc) · 3.84 KB
/
cul.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import sys
import serial
import serial.threaded
import io
import time
import json
import paho.mqtt.client as mqtt
#### CONFIG HERE ####

# --- Serial device ---
DEVICE = "/dev/ttyUSB0"
BAUDRATE = 38400
PARITY = "N"
RTSCTS = False
XONXOFF = False

# --- Serial operations ---
TIMEOUT = 1  # serial timeout in seconds
INIT_SLEEP_TIME = 3  # seconds to wait until the serial device accepts commands
LOOP_SLEEP_TIME = 1  # poll the serial port every x seconds to prevent high CPU usage

# --- MQTT ---
MQTT_SERVER = "127.0.0.1"
MQTT_PORT = 1883
# Topic prefix for everything this script publishes. Modify this for other
# uses; currently only binary sensors are supported.
MQTT_CONTEXT = "homeassistant/binary_sensor/"

# Maps device IDs to common names.
# A device ID that is not listed here is ignored!
DEVICE_NAMES = {
    "1111": "doorbell",
}

# Maps device IDs to the device_class announced in the config message.
DEVICE_CLASS = {
    "1111": "motion",
}

### END OF CONFIG ###

######## Don't modify below here unless you know what you're doing ###########

# Maps the four-digit state field of a converted packet to the published state.
STATE_CODES = {
    "1111": "OFF",
    "1212": "ON",
}
def hexToElv(string):
    """Convert a string of hex digits into ELV notation.

    Every hex digit is split into two base-4 digits (high, then low), and
    each of those is shifted from the range 0..3 to 1..4. One input
    character therefore yields two output characters.

    Args:
        string: Text consisting only of hexadecimal digits (either case).

    Returns:
        The ELV representation; an empty string for empty input.

    Raises:
        ValueError: If a character is not a hexadecimal digit.
    """
    pairs = []
    for char in string:
        # Integer division keeps everything in ints (no float round-trip).
        high, low = divmod(int(char, 16), 4)
        # 1 to 4 instead of 0 to 3
        pairs.append(str(high % 4 + 1) + str(low + 1))
    return "".join(pairs)
# The last two hex characters of a packet are the raw RSSI byte.
def getRSSI(string):
    """Convert the raw RSSI byte of a packet into a dBm value.

    The byte is a signed (two's complement) value in units of 0.5 dB with an
    offset of 74 dB, so the conversion is:

        raw >= 128:  (raw - 256) / 2 - 74
        otherwise:    raw / 2 - 74

    The previous implementation always subtracted 256 and never halved the
    value, which produced impossible readings (e.g. -314) for raw bytes
    below 128.

    Args:
        string: The RSSI byte as a hex string, e.g. "F0".

    Returns:
        The signal strength in dBm as a float (half-dB resolution).

    Raises:
        ValueError: If ``string`` is not valid hexadecimal.
    """
    raw = int(string, 16)
    if raw >= 128:
        raw -= 256  # reinterpret the byte as a signed value
    return raw / 2 - 74
# The house code occupies the first eight characters of the converted string.
def getHauscode(string):
    """Return the house code: the leading eight characters of ``string``.

    A shorter input is returned unchanged (plain slice semantics).
    """
    return string[:8]
# The device address follows the house code: characters 8 to 11.
def getDevicecode(string):
    """Return the device address: characters 8..11 of ``string``.

    Inputs shorter than twelve characters yield a shorter (possibly empty)
    result, as with any slice.
    """
    start, stop = 8, 12
    return string[start:stop]
def getCommonName(string):
    """Return the common name configured for a device ID.

    Looks the ID up in DEVICE_NAMES and returns None when it is not listed.
    """
    return DEVICE_NAMES.get(string)
# The state field follows the device address: characters 12 to 15.
def getState(string):
    """Translate the state field of a converted packet via STATE_CODES.

    Raises:
        KeyError: If the four-character state field is not in STATE_CODES.
    """
    state_field = string[12:16]
    return STATE_CODES[state_field]
# Build the fully configured port object first, without opening it, so that
# a failure to open can be reported cleanly below.
ser = serial.serial_for_url(
    DEVICE,
    do_not_open=True,
    timeout=TIMEOUT,
    baudrate=BAUDRATE,
    parity=PARITY,
    rtscts=RTSCTS,
    xonxoff=XONXOFF,
)

try:
    ser.open()
except serial.SerialException as open_error:
    sys.stderr.write('Could not open serial port {}: {}\n'.format(ser.name, open_error))
    sys.exit(1)

# Give the device time to initialise before sending it a command.
time.sleep(INIT_SLEEP_TIME)

# Set mode to receive known good packets with RSSI attached.
ser.write(b"X21\r\n")
# Initialize the MQTT connection
def mqtt_onconnect(client, userdata, flags, rc):
    """on_connect callback: subscribe and announce every configured device.

    On a successful connection (``rc == 0``) this subscribes to everything
    below MQTT_CONTEXT and publishes one config message per entry in
    DEVICE_NAMES. On any other result code it only reports the failure.

    Args:
        client: The MQTT client the callback was invoked for.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means success.
    """
    if rc != 0:
        print("There was an error connecting to the MQTT server")
        return

    print("Connected to MQTT")
    client.subscribe(MQTT_CONTEXT + "#")

    for device_id, name in DEVICE_NAMES.items():
        base_topic = MQTT_CONTEXT + name
        config = {
            "name": name,
            # Name the topic the main loop publishes states to explicitly
            # instead of relying on the receiver to derive it.
            "state_topic": base_topic + "/state",
        }
        # A device may be named without having a class configured; leave the
        # key out instead of failing with KeyError inside the callback.
        device_class = DEVICE_CLASS.get(device_id)
        if device_class is not None:
            config["device_class"] = device_class

        # Clear old (retained) config messages to prevent duplicate devices.
        client.publish(base_topic + "/config", None, 1, True)
        client.publish(base_topic + "/config", json.dumps(config))
def mqtt_onmessage(client, userdata, msg):
    """on_message callback: print the topic and payload of a received message."""
    payload_text = str(msg.payload)
    print(" ".join((msg.topic, payload_text)))
# paho-mqtt 2.x requires an explicit callback API version, while 1.x does not
# know the argument at all. The callbacks in this file use the version 1
# signatures, so request VERSION1 whenever the library offers the choice.
_callback_api_version = getattr(mqtt, "CallbackAPIVersion", None)
if _callback_api_version is None:
    client = mqtt.Client()
else:
    client = mqtt.Client(_callback_api_version.VERSION1)

client.on_connect = mqtt_onconnect
client.on_message = mqtt_onmessage

# Report an unreachable broker the same way as an unavailable serial port
# instead of dying with a traceback.
try:
    client.connect(MQTT_SERVER, MQTT_PORT, 60)
except OSError as e:
    sys.stderr.write('Could not connect to MQTT server {}:{}: {}\n'.format(MQTT_SERVER, MQTT_PORT, e))
    sys.exit(1)

# Handle network traffic in a background thread; the main loop only publishes.
client.loop_start()
def _handle_packet(line):
    """Parse one received line and publish the state change it describes.

    The line is expected to consist of one leading type character, the hex
    payload, and two trailing hex characters holding the RSSI byte. Lines
    that cannot be parsed, that belong to a device without a configured
    name, or that carry an unknown state code are skipped, so a single bad
    packet cannot stop the receiver.
    """
    if not line:
        return

    try:
        parseString = hexToElv(line[1:-2])
        rssi = getRSSI(line[-2:])
    except ValueError:
        sys.stderr.write('Ignoring malformed packet: {!r}\n'.format(line))
        return

    housecode = getHauscode(parseString)
    devicecode = getDevicecode(parseString)
    commonName = getCommonName(devicecode)
    if commonName is None:
        return  # device is not configured, ignore it

    try:
        state = getState(parseString)
    except KeyError:
        sys.stderr.write('Ignoring unknown state for <{}>: {!r}\n'.format(commonName, line))
        return

    content = {
        "housecode": housecode,
        "deviceaddress": devicecode,
        "commonname": commonName,
        "state": state,
        "rssi": str(rssi)
    }
    print(json.dumps(content))
    print("Sending state change for <"+commonName+"> to ["+state+"]")
    client.publish(MQTT_CONTEXT+commonName+"/state", state)


# Bytes received so far that do not yet form a complete line. A single read
# may return several packets or only part of one.
rx_buffer = bytearray()

while True:
    time.sleep(LOOP_SLEEP_TIME)  # Sleep every so often to diminish CPU usage
    pending = ser.inWaiting()
    if pending <= 0:  # Only iterate on data, if there is any
        continue
    rx_buffer.extend(ser.read(pending))

    # Handle every complete line; an unfinished tail stays in the buffer
    # until the rest of it arrives.
    while b"\n" in rx_buffer:
        raw_line, _, remainder = rx_buffer.partition(b"\n")
        rx_buffer = bytearray(remainder)
        try:
            text = raw_line.decode('ascii').strip()
        except UnicodeDecodeError:
            sys.stderr.write('Ignoring non-ASCII data: {!r}\n'.format(bytes(raw_line)))
            continue
        _handle_packet(text)

# ser.close() is intentionally omitted: the loop above never terminates.