-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgenerate_d0_d1.py
213 lines (179 loc) · 8.04 KB
/
generate_d0_d1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""generate_d0_d1.py - Process MQTT messages to produce today (d0) & yesterday (d1).
The Python program listens to incoming MQTT messages from a
smart meter, capturing real-time consumption data. It processes
this data to compute the current consumption value for the
day (d0) and the previous day (d1). These daily consumption
metrics are then published back to a designated MQTT topic,
allowing for continuous updates on today's and yesterday's
power usage.
Usage:
generate_d0_d1.py
"""
#
# LICENSE:
#
# Copyright (C) 2024 Ixtalo, ixtalo@gmail.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import configparser
import logging
import os
from pathlib import Path
from datetime import datetime, timedelta
import paho.mqtt.client as mqtt
from smlmqttprocessor.utils.mylogging import setup_logging
MQTT_TOPIC_SMARTMETER_TOTAL = "tele/smartmeter/total/value"
MQTT_TOPIC_D0 = "tele/smartmeter/total/d0"
MQTT_TOPIC_D1 = "tele/smartmeter/total/d1"
CONFIG_FILENAME = "config.ini"
DEBUG = bool(os.getenv("DEBUG", "").lower() in ("1", "true", "yes"))
__script_dir = Path(__file__).parent
class DailyEnergyMonitor:
"""Calculate daily energy consumption for today & yesterday."""
d0_retained = None
d1_retained = None
def __init__(self, retain: bool = True):
"""Calculate daily energy consumption for today & yesterday."""
self.retain = retain
self.data = []
self.d0 = None # today
self.d1 = None # yesterday
self.current_date = datetime.now().date() # start date
def add_value(self, total_value: float):
"""Add a new value to the internal data store."""
timestamp = datetime.now()
self.data.append({'timestamp': timestamp, 'value': total_value})
logging.debug("new size of data[] is %s", len(self.data))
# calculate the difference (delta) aka consumption today so far (d_0)
delta = self.calculate_consumption_today()
if delta:
logging.debug("d0 delta since start: %.2f", delta)
self.d0 = delta
# if there has been a retained value, use it as offset from now on
self.d0 += self.d0_retained if self.d0_retained else 0
# tell/publish
logging.info("d0: %.2f", self.d0)
else:
logging.debug("d0 delta: not enough data yet")
# check if there's a new day
if self._check_is_new_day(timestamp):
# reset on new day
self.d0_retained = 0
self.current_date = timestamp.date()
# calculate the difference (delta) aka consumption of yesterday (d_-1)
delta = self.calculate_consumption_yesterday()
if not delta:
logging.debug("d1 delta: not enough data yet")
else:
logging.debug("d1 delta since start: %.2f", delta)
self.d1 = delta
# if there has been a retained value, use it as offset from now on
self.d1 += self.d1_retained if self.d1_retained else 0
self.d1_retained = self.d1
# tell/publish
logging.info("d1: %.2f", self.d1)
def _check_is_new_day(self, timestamp: datetime):
return timestamp.date() != self.current_date
def calculate_consumption_today(self):
"""Calculate the consumption of today (d_0)."""
today = datetime.now().date()
# slice data to just today's subset
today_data = [entry['value'] for entry in self.data
if entry['timestamp'].date() == today]
if len(today_data) > 1:
return today_data[-1] - today_data[0]
return None
def calculate_consumption_yesterday(self):
"""Calculate the consumption of yesterday (d_-1)."""
yesterday = datetime.now().date() - timedelta(days=1)
# slice data to yesterday
yesterday_data = [entry['value'] for entry in self.data
if entry['timestamp'].date() == yesterday]
if len(yesterday_data) > 1:
return yesterday_data[-1] - yesterday_data[0]
return None
def handle_smartmeter_message(client, userdata, msg):
"""Handle MQTT message for smartmeter total values."""
value = float(msg.payload.decode())
userdata.add_value(value)
if DEBUG:
return # do nothing, stop here
if userdata.d0:
client.publish(MQTT_TOPIC_D0, round(userdata.d0, 2), retain=userdata.retain)
if userdata.d1:
client.publish(MQTT_TOPIC_D1, round(userdata.d1, 2), retain=userdata.retain)
def handle_retained_dx_message(client, userdata, msg):
"""Handle MQTT retained messages to use as initial offsets."""
logging.debug("handle_last_dx_message: %s = %s", msg.topic, msg.payload)
value = float(msg.payload.decode())
if msg.topic == MQTT_TOPIC_D0:
# store value to be used as initial offset
userdata.d0_retained = value
logging.info("d0 (retained): %.2f", userdata.d0_retained)
# this topic is now done, no further handling is required in this session
client.unsubscribe(msg.topic)
elif msg.topic == MQTT_TOPIC_D1:
# store value to be used as initial offset
userdata.d1_retained = value
logging.info("d1 (retained): %.2f", userdata.d1_retained)
# this topic is now done, no further handling is required in this session
client.unsubscribe(msg.topic)
else:
logging.warning("Unexpected message! (%s, %s)", msg.topic, msg.payload)
def get_config(configfile: Path):
"""Read configuration from confile file."""
config = configparser.ConfigParser()
if not configfile.is_absolute():
configfile = __script_dir.joinpath(configfile)
logging.info("Config file: %s", configfile.resolve())
if not configfile.is_file():
raise FileNotFoundError(f"No configfile! ({configfile.resolve()})")
if not os.access(configfile, os.R_OK):
raise RuntimeError(f"Configfile not readable! ({configfile.resolve()})")
res = config.read(configfile)
logging.debug("config read result: %s", res)
return config
def main():
"""Start the program's main entry point."""
# set up logging framework
setup_logging(level=logging.INFO if not DEBUG else logging.DEBUG)
# configuration
config = get_config(Path(CONFIG_FILENAME))
mqtt_username = config.get('Mqtt', 'username')
mqtt_password = config.get('Mqtt', 'password')
mqtt_host = config.get('Mqtt', 'host', fallback='localhost')
mqtt_port = config.getint('Mqtt', 'port', fallback=1883)
mqtt_retain = config.getboolean('Mqtt', 'retain', fallback='true')
# MQTT initialization
client = mqtt.Client(userdata=DailyEnergyMonitor(retain=mqtt_retain))
client.username_pw_set(username=mqtt_username, password=mqtt_password)
client.enable_logger()
# MQTT message callbacks
client.message_callback_add(MQTT_TOPIC_D0, handle_retained_dx_message)
client.message_callback_add(MQTT_TOPIC_D1, handle_retained_dx_message)
client.message_callback_add(MQTT_TOPIC_SMARTMETER_TOTAL, handle_smartmeter_message)
# initialize MQTT connection
client.connect(mqtt_host, port=mqtt_port)
# NOTE subscriptions must come *after* connect() !
# subscribe to the retained messages
client.subscribe(MQTT_TOPIC_D0)
client.subscribe(MQTT_TOPIC_D1)
# subscribe to the very topic which contains the source data
client.subscribe(MQTT_TOPIC_SMARTMETER_TOTAL)
client.loop_forever()
if __name__ == "__main__":
main()