Skip to content

Commit

Permalink
Re-subscribe to all topics upon reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
haimgel committed Oct 5, 2022
1 parent 6e098cd commit 423f836
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions internal/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ type Client struct {

func Init(appName string, config *config.MqttConfig, controls []controls.Switch, logger *zap.SugaredLogger) (*Client, error) {
client, err := Connect(appName, config, controls, logger)
if err != nil {
return nil, err
}
err = client.Subscribe()
if err != nil {
client.handle.Disconnect(0)
return nil, err
Expand All @@ -45,33 +41,41 @@ func Init(appName string, config *config.MqttConfig, controls []controls.Switch,
}

func Connect(appName string, config *config.MqttConfig, controls []controls.Switch, logger *zap.SugaredLogger) (*Client, error) {
switches := make([]*Switch, len(controls))
for i, control := range controls {
switches[i] = &Switch{control: control}
}
client := &Client{appName: appName, switches: switches, logger: logger}

opts := MQTT.NewClientOptions()
opts.AddBroker(config.Broker)
opts.SetOrderMatters(false)
opts.SetClientID(generateClientId(appName))
opts.SetWill(appAvailabilityTopic(appName), unavailablePayload, 1, true)
opts.SetClientID(client.generateClientId())
opts.SetWill(client.appAvailabilityTopic(), unavailablePayload, 1, true)
if (config.User != nil) && (config.Password != nil) {
opts.SetUsername(*config.User)
opts.SetPassword(*config.Password)
}
opts.SetOnConnectHandler(func(client MQTT.Client) {

opts.SetOnConnectHandler(func(handle MQTT.Client) {
// This has to be in the on-connection handler, to make sure we mark ourselves as "available" upon reconnect
setAppAvailable(client, appName, logger)
client.setAppAvailable()
err := client.Subscribe()
if err != nil {
client.logger.Errorw("Cannot subscribe", "error", err)
}
})
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {

client.handle = MQTT.NewClient(opts)
if token := client.handle.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
logger.Infow("Connected to MQTT", "broker", config.Broker)
switches := make([]*Switch, len(controls))
for i, control := range controls {
switches[i] = &Switch{control: control}
}
return &Client{appName: appName, handle: client, switches: switches, logger: logger}, nil
return client, nil
}

func generateClientId(appName string) string {
return fmt.Sprintf("%s-%016x", appName, rand.Uint64())
func (client *Client) generateClientId() string {
return fmt.Sprintf("%s-%016x", client.appName, rand.Uint64())
}

func (client *Client) Subscribe() error {
Expand Down Expand Up @@ -168,10 +172,10 @@ func (client *Client) setAvailable(sw *Switch, available bool) {
logger.Debugw("Published availability to MQTT")
}

func setAppAvailable(client MQTT.Client, appName string, appLogger *zap.SugaredLogger) {
topic := appAvailabilityTopic(appName)
logger := appLogger.With(zap.String("topic", topic))
token := client.Publish(topic, 1, true, generateAvailablePayload(true))
func (client *Client) setAppAvailable() {
topic := client.appAvailabilityTopic()
logger := client.logger.With(zap.String("topic", topic))
token := client.handle.Publish(topic, 1, true, generateAvailablePayload(true))
token.Wait()
if token.Error() != nil {
logger.Error("Error publishing application availability to MQTT", "error", token.Error())
Expand Down Expand Up @@ -217,8 +221,8 @@ func (client *Client) availabilityTopic(sw *Switch) string {
return fmt.Sprintf("%s/switches/%s/available", client.appName, sw.control.Name)
}

func appAvailabilityTopic(appName string) string {
return fmt.Sprintf("%s/available", appName)
func (client *Client) appAvailabilityTopic() string {
return fmt.Sprintf("%s/available", client.appName)
}

func (client *Client) syncLog() {
Expand Down

0 comments on commit 423f836

Please # to comment.