-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsolis_control.py
129 lines (106 loc) · 4.24 KB
/
solis_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import hashlib
import hmac
import base64
import json
import re
from http import HTTPStatus
from datetime import datetime, timezone
# HTTP method used for every request; it is also the first line of the
# string that prepare_header() signs.
VERB = "POST"
# Resource paths. Each one is appended to the SolisCloud base URL when the
# request is sent and is also passed to prepare_header() as the
# canonicalized resource that gets signed.
# NOTE(review): the login path ends in '#', which looks truncated or
# redacted - confirm the real endpoint before relying on login().
LOGIN_URL = '/v2/api/#'
CONTROL_URL= '/v2/api/control'
INVERTER_URL= '/v1/api/inverterList'
# HTTP client session shared by all requests in this module.
# NOTE(review): `hass` is not defined or imported in this file; presumably
# the hosting runtime injects it as a global - confirm.
session = async_get_clientsession(hass)
def digest(body: str) -> str:
    """Return the Base64 text of the MD5 digest of ``body``.

    ``body`` is encoded as UTF-8 before hashing, and the Base64 result is
    returned as a ``str`` rather than ``bytes``.
    """
    raw_md5 = hashlib.md5(body.encode('utf-8')).digest()
    encoded = base64.b64encode(raw_md5)
    return encoded.decode('utf-8')
def passwordEncode(password: str) -> str:
    """Return the lowercase hexadecimal MD5 digest of ``password``.

    The password is encoded as UTF-8 before hashing.
    """
    return hashlib.md5(password.encode('utf-8')).hexdigest()
def prepare_header(config: dict[str,str], body: str, canonicalized_resource: str) -> dict[str, str]:
    """Build the signed HTTP headers for one API request.

    The signature is an HMAC-SHA1, keyed with ``config["secret"]``, over the
    newline-joined sequence: HTTP verb, Base64 MD5 of ``body``, content type,
    current UTC date string, and ``canonicalized_resource``.

    Args:
        config: Must provide the keys ``"secret"`` and ``"key_id"``.
        body: Exact request body that will be sent; its MD5 is signed.
        canonicalized_resource: Resource path of the request being signed.

    Returns:
        A dict with the ``Content-MD5``, ``Content-Type``, ``Date`` and
        ``Authorization`` headers.
    """
    body_md5 = digest(body)
    mime_type = "application/json"
    # The same timestamp string must be both signed and sent in "Date".
    timestamp = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")
    string_to_sign = "\n".join(
        (VERB, body_md5, mime_type, timestamp, canonicalized_resource)
    )
    mac = hmac.new(
        config["secret"].encode('utf-8'),
        msg=string_to_sign.encode('utf-8'),
        digestmod=hashlib.sha1,
    )
    signature = base64.b64encode(mac.digest()).decode('utf-8')
    return {
        "Content-MD5": body_md5,
        "Content-Type": mime_type,
        "Date": timestamp,
        "Authorization": "API " + config["key_id"] + ":" + signature,
    }
def control_body(inverterId, chargeSettings) -> str:
    """Build the JSON request body for control command ``cid`` 103.

    Each entry of ``chargeSettings`` contributes six comma-separated values,
    in this order: ``chargeCurrent``, ``dischargeCurrent``,
    ``chargeStartTime``, ``chargeEndTime``, ``dischargeStartTime`` and
    ``dischargeEndTime``. All entries are joined with commas into the
    ``value`` field.

    The separator no longer depends on a hard-coded slot index, so the value
    list is well formed (no trailing or missing comma) for any number of
    entries. For exactly three entries the output is unchanged.

    Args:
        inverterId: Inverter identifier as a string.
        chargeSettings: Iterable of mappings that each provide the six keys
            listed above.

    Returns:
        The request body as a JSON-formatted string.

    Raises:
        KeyError: If an entry lacks one of the six keys.
    """
    slot_fields = (
        'chargeCurrent',
        'dischargeCurrent',
        'chargeStartTime',
        'chargeEndTime',
        'dischargeStartTime',
        'dischargeEndTime',
    )
    values = []
    for slot in chargeSettings:
        values.extend([str(slot[field]) for field in slot_fields])
    return (
        '{"inverterId":"' + inverterId + '", "cid":"103","value":"'
        + ",".join(values)
        + '"}'
    )
def control_time_body(inverterId: str, currentTime: datetime) -> str:
    """Build the JSON request body for control command ``cid`` 56.

    The ``value`` field carries ``currentTime`` formatted as
    ``YYYY-MM-DD HH:MM:SS``.

    The original expression had a stray second ``+`` after the line
    continuation, which applied unary plus to a ``str`` and raises
    ``TypeError`` under standard Python. It is removed here.

    Args:
        inverterId: Inverter identifier as a string.
        currentTime: Timestamp to place in the body.

    Returns:
        The request body as a JSON-formatted string.
    """
    return (
        '{"inverterId":"' + inverterId + '", "cid":"56", "value":"'
        + currentTime.strftime('%Y-%m-%d %H:%M:%S')
        + '"}'
    )
async def set_control_times(token, inverterId: str, config, times):
    """POST the charge/discharge settings in ``times`` to the control endpoint.

    The body comes from ``control_body`` and the headers from
    ``prepare_header``, with ``token`` added under the ``token`` header.
    The response text is logged at warning level; nothing is returned.
    """
    payload = control_body(inverterId, times)
    request_headers = prepare_header(config, payload, CONTROL_URL)
    request_headers['token'] = token
    endpoint = "https://www.soliscloud.com:13333" + CONTROL_URL
    response = await session.post(endpoint, data=payload, headers=request_headers)
    # NOTE(review): response.text() is not awaited here; presumably the
    # hosting runtime (pyscript?) awaits coroutine functions implicitly -
    # confirm before running this under plain asyncio.
    log.warning("solis response:" + response.text())
async def set_updated_time(token, inverterId: str, config, currentTime: datetime):
    """POST ``currentTime`` to the control endpoint as command ``cid`` 56.

    The body comes from ``control_time_body`` and the headers from
    ``prepare_header``, with ``token`` added under the ``token`` header.
    The response text is logged at warning level; nothing is returned.
    """
    time_payload = control_time_body(inverterId, currentTime)
    signed = prepare_header(config, time_payload, CONTROL_URL)
    signed['token'] = token
    target = "https://www.soliscloud.com:13333" + CONTROL_URL
    reply = await session.post(target, data=time_payload, headers=signed)
    # NOTE(review): reply.text() is not awaited here; presumably the hosting
    # runtime (pyscript?) awaits coroutine functions implicitly - confirm
    # before running this under plain asyncio.
    log.warning("solis response:" + reply.text())
async def login(config):
    """Log in and return the ``csrfToken`` value from the response.

    The request body carries ``config['username']`` and the MD5 hex digest
    of ``config['password']``; headers are signed by ``prepare_header``.

    The HTTP status is now checked before the body is parsed. Previously a
    non-200 reply left a plain string in ``result``, so the final
    ``result["csrfToken"]`` lookup always failed with a misleading
    ``TypeError``, and a non-JSON error page failed in ``json.loads`` before
    the status was even logged.

    Args:
        config: Mapping providing ``username`` and ``password``, plus the
            keys ``prepare_header`` reads.

    Returns:
        The value stored under ``"csrfToken"`` in the parsed response.

    Raises:
        RuntimeError: If the server replies with a status other than 200.
        KeyError: If a 200 reply contains no ``"csrfToken"`` entry.
    """
    body = '{"userInfo":"'+config['username']+'","password":"'+ passwordEncode(config['password'])+'"}'
    header = prepare_header(config, body, LOGIN_URL)
    response = await session.post("https://www.soliscloud.com:13333"+LOGIN_URL, data = body, headers = header)
    status = response.status
    # NOTE(review): response.text() is not awaited, as in the original;
    # presumably the hosting runtime (pyscript?) awaits coroutine functions
    # implicitly - confirm before running this under plain asyncio.
    text = response.text()
    if status != HTTPStatus.OK:
        log.warning(status)
        raise RuntimeError(
            "solis login failed with HTTP status " + str(status) + ": " + str(text)
        )
    # The substitution keeps quoted strings untouched (group 1) and drops any
    # comma that directly precedes a closing ']' or '}', so a reply with
    # trailing commas still parses as JSON.
    cleaned = re.sub(r'("(?:\\?.)*?")|,\s*([]}])', r'\1\2', text)
    return json.loads(cleaned)["csrfToken"]
async def getInverterList(config):
    """Fetch the inverter list for ``config['plantId']`` and return one id.

    Every record under ``data.page.records`` of the response is visited and
    the ``id`` of the last one is returned. An empty string is returned when
    there are no records.
    """
    payload = '{"stationId":"' + config['plantId'] + '"}'
    signed_headers = prepare_header(config, payload, INVERTER_URL)
    endpoint = "https://www.soliscloud.com:13333" + INVERTER_URL
    response = await session.post(endpoint, data=payload, headers=signed_headers)
    # NOTE(review): response.json() is not awaited; presumably the hosting
    # runtime (pyscript?) awaits coroutine functions implicitly - confirm.
    listing = response.json()
    last_id = ""
    for entry in listing['data']['page']['records']:
        # Later records overwrite earlier ones, so the last id wins.
        last_id = entry.get('id')
    return last_id
@service
async def solis_control(config=None,days=None):
    # Service entry point: look up the inverter id, log in for a token, send
    # the charge/discharge settings in `days`, then send the current local
    # time to the inverter.
    # NOTE(review): none of the async helpers below is awaited; presumably
    # the hosting runtime (pyscript?) awaits them implicitly - confirm before
    # running this under plain asyncio.
    inverter = getInverterList(config)
    csrf_token = login(config)
    set_control_times(csrf_token, inverter, config, days)
    # The timestamp is taken only after the control request has been issued.
    set_updated_time(csrf_token, inverter, config, datetime.now())