From 423f83682f7163071dc169149981abb2e7f32c62 Mon Sep 17 00:00:00 2001 From: Haim Gelfenbeyn Date: Wed, 5 Oct 2022 10:29:57 -0400 Subject: [PATCH] Re-subscribe to all topics upon reconnect --- internal/mqtt/mqtt.go | 50 +++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/internal/mqtt/mqtt.go b/internal/mqtt/mqtt.go index 49b4e86..30791df 100644 --- a/internal/mqtt/mqtt.go +++ b/internal/mqtt/mqtt.go @@ -33,10 +33,6 @@ type Client struct { func Init(appName string, config *config.MqttConfig, controls []controls.Switch, logger *zap.SugaredLogger) (*Client, error) { client, err := Connect(appName, config, controls, logger) - if err != nil { - return nil, err - } - err = client.Subscribe() if err != nil { client.handle.Disconnect(0) return nil, err @@ -45,33 +41,41 @@ func Init(appName string, config *config.MqttConfig, controls []controls.Switch, } func Connect(appName string, config *config.MqttConfig, controls []controls.Switch, logger *zap.SugaredLogger) (*Client, error) { + switches := make([]*Switch, len(controls)) + for i, control := range controls { + switches[i] = &Switch{control: control} + } + client := &Client{appName: appName, switches: switches, logger: logger} + opts := MQTT.NewClientOptions() opts.AddBroker(config.Broker) opts.SetOrderMatters(false) - opts.SetClientID(generateClientId(appName)) - opts.SetWill(appAvailabilityTopic(appName), unavailablePayload, 1, true) + opts.SetClientID(client.generateClientId()) + opts.SetWill(client.appAvailabilityTopic(), unavailablePayload, 1, true) if (config.User != nil) && (config.Password != nil) { opts.SetUsername(*config.User) opts.SetPassword(*config.Password) } - opts.SetOnConnectHandler(func(client MQTT.Client) { + + opts.SetOnConnectHandler(func(handle MQTT.Client) { // This has to be in the on-connection handler, to make sure we mark ourselves as "available" upon reconnect - setAppAvailable(client, appName, logger) + client.setAppAvailable() + err := client.Subscribe() + if err != nil { + client.logger.Errorw("Cannot subscribe", "error", err) + } }) - client := MQTT.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { + + client.handle = MQTT.NewClient(opts) + if token := client.handle.Connect(); token.Wait() && token.Error() != nil { return nil, token.Error() } logger.Infow("Connected to MQTT", "broker", config.Broker) - switches := make([]*Switch, len(controls)) - for i, control := range controls { - switches[i] = &Switch{control: control} - } - return &Client{appName: appName, handle: client, switches: switches, logger: logger}, nil + return client, nil } -func generateClientId(appName string) string { - return fmt.Sprintf("%s-%016x", appName, rand.Uint64()) +func (client *Client) generateClientId() string { + return fmt.Sprintf("%s-%016x", client.appName, rand.Uint64()) } func (client *Client) Subscribe() error { @@ -168,10 +172,10 @@ func (client *Client) setAvailable(sw *Switch, available bool) { logger.Debugw("Published availability to MQTT") } -func setAppAvailable(client MQTT.Client, appName string, appLogger *zap.SugaredLogger) { - topic := appAvailabilityTopic(appName) - logger := appLogger.With(zap.String("topic", topic)) - token := client.Publish(topic, 1, true, generateAvailablePayload(true)) +func (client *Client) setAppAvailable() { + topic := client.appAvailabilityTopic() + logger := client.logger.With(zap.String("topic", topic)) + token := client.handle.Publish(topic, 1, true, generateAvailablePayload(true)) token.Wait() if token.Error() != nil { logger.Error("Error publishing application availability to MQTT", "error", token.Error()) @@ -217,8 +221,8 @@ func (client *Client) availabilityTopic(sw *Switch) string { return fmt.Sprintf("%s/switches/%s/available", client.appName, sw.control.Name) } -func appAvailabilityTopic(appName string) string { - return fmt.Sprintf("%s/available", appName) +func (client *Client) appAvailabilityTopic() string { + return fmt.Sprintf("%s/available", client.appName) } func (client *Client) syncLog() {