diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index bd36c87..b6e3d4b 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -1,6 +1,8 @@ package mqtt import ( + "encoding/json" + "errors" "fmt" "math/rand" "path" @@ -15,6 +17,7 @@ type Client interface { GetTopic(string) (*Topic, bool) IsConnected() bool Subscribe(string) *Topic + Publish(string, map[string]any, string) (json.RawMessage, error) Unsubscribe(string) Dispose() } @@ -71,12 +74,13 @@ func (c *client) IsConnected() bool { return c.client.IsConnectionOpen() } -func (c *client) HandleMessage(_ paho.Client, msg paho.Message) { +func (c *client) HandleMessage(topic string, payload []byte) { message := Message{ Timestamp: time.Now(), - Value: msg.Payload(), + Value: payload, } - c.topics.AddMessage(msg.Topic(), message) + + c.topics.AddMessage(topic, message) } func (c *client) GetTopic(reqPath string) (*Topic, bool) { @@ -104,9 +108,16 @@ func (c *client) Subscribe(reqPath string) *Topic { return t } - log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", t.Path) - if token := c.client.Subscribe(t.Path, 0, c.HandleMessage); token.Wait() && token.Error() != nil { - log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", t.Path, "error", token.Error()) + log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", topicPath) + + topic := resolveTopic(t.Path) + + if token := c.client.Subscribe(topic, 0, func(_ paho.Client, m paho.Message) { + // by wrapping HandleMessage we can directly get the correct topicPath for the incoming topic + // and don't need to regex it against + and #. + c.HandleMessage(topicPath, []byte(m.Payload())) + }); token.Wait() && token.Error() != nil { + log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topicPath, "error", token.Error()) } c.topics.Store(t) return t @@ -126,11 +137,61 @@ func (c *client) Unsubscribe(reqPath string) { } log.DefaultLogger.Debug("Unsubscribing from MQTT topic", "topic", t.Path) - if token := c.client.Unsubscribe(t.Path); token.Wait() && token.Error() != nil { + + topic := resolveTopic(t.Path) + if token := c.client.Unsubscribe(topic); token.Wait() && token.Error() != nil { log.DefaultLogger.Error("Error unsubscribing from MQTT topic", "topic", t.Path, "error", token.Error()) } } +func (c *client) Publish(topic string, payload map[string]any, responseTopic string) (json.RawMessage, error) { + var response json.RawMessage + var err error + done := make(chan struct{}, 1) + + if responseTopic != "" { + tokenSub := c.client.Subscribe(responseTopic, 2, func(c paho.Client, m paho.Message) { + response = m.Payload() + done <- struct{}{} + }) + + if !tokenSub.WaitTimeout(time.Second) && tokenSub.Error() != nil { + err = errors.Join(err, tokenSub.Error()) + return response, err + } + + defer c.client.Unsubscribe(responseTopic) + } else { + done <- struct{}{} + } + + data, errMarshal := json.Marshal(&payload) + if errMarshal != nil { + err = errors.Join(err, errMarshal) + return response, err + } + + token := c.client.Publish(topic, 2, false, data) + + if token.Error() != nil { + err = errors.Join(err, token.Error()) + return response, err + } + + if !token.WaitTimeout(time.Second) { + err = errors.Join(err, errors.New("publish timeout")) + return response, err + } + + select { + case <-done: + case <-time.After(time.Second): + err = errors.Join(err, errors.New("subscribe timeout")) + } + + return response, err +} + func (c *client) Dispose() { log.DefaultLogger.Info("MQTT Disconnecting") c.client.Disconnect(250) diff --git a/pkg/mqtt/topic.go b/pkg/mqtt/topic.go index a254454..091b344 100644 --- a/pkg/mqtt/topic.go +++ b/pkg/mqtt/topic.go @@ -2,6 +2,7 @@ package mqtt import ( "path" + "strings" "sync" "time" @@ -15,10 +16,12 @@ type Message struct { // Topic represents a MQTT topic. type Topic struct { - Path string `json:"topic"` - Interval time.Duration - Messages []Message - framer *framer + Path string `json:"topic"` + Payload map[string]any `json:"payload,omitempty"` + ResponsePath string `json:"response,omitempty"` + Interval time.Duration + Messages []Message + framer *framer } // Key returns the key for the topic. @@ -97,3 +100,10 @@ func (tm *TopicMap) Store(t *Topic) { func (tm *TopicMap) Delete(key string) { tm.Map.Delete(key) } + +// replace all __PLUS__ with + and one __HASH__ with # +// Question: Why does grafana not allow + and # in query? +func resolveTopic(topic string) string { + resolvedTopic := strings.ReplaceAll(topic, "__PLUS__", "+") + return strings.Replace(resolvedTopic, "__HASH__", "#", -1) +} diff --git a/pkg/plugin/datasource_test.go b/pkg/plugin/datasource_test.go index 96f8b65..93587e8 100644 --- a/pkg/plugin/datasource_test.go +++ b/pkg/plugin/datasource_test.go @@ -2,6 +2,7 @@ package plugin_test import ( "context" + "encoding/json" "testing" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -60,5 +61,8 @@ func (c *fakeMQTTClient) IsSubscribed(_ string) bool { } func (c *fakeMQTTClient) Subscribe(_ string) *mqtt.Topic { return nil } -func (c *fakeMQTTClient) Unsubscribe(_ string) {} -func (c *fakeMQTTClient) Dispose() {} +func (c *fakeMQTTClient) Publish(string, map[string]any, string) (json.RawMessage, error) { + return json.RawMessage{}, nil +} +func (c *fakeMQTTClient) Unsubscribe(_ string) {} +func (c *fakeMQTTClient) Dispose() {} diff --git a/pkg/plugin/query.go b/pkg/plugin/query.go index af8afc7..2e8b439 100644 --- a/pkg/plugin/query.go +++ b/pkg/plugin/query.go @@ -38,13 +38,24 @@ func (ds *MQTTDatasource) query(query backend.DataQuery) backend.DataResponse { return response } - t.Interval = query.Interval + // Subscribe + if len(t.Payload) == 0 { + t.Interval = query.Interval - frame := data.NewFrame("") - frame.SetMeta(&data.FrameMeta{ - Channel: path.Join(ds.channelPrefix, t.Key()), - }) + frame := data.NewFrame("") + frame.SetMeta(&data.FrameMeta{ + Channel: path.Join(ds.channelPrefix, t.Key()), + }) - response.Frames = append(response.Frames, frame) + response.Frames = append(response.Frames, frame) + return response + } + + // Publish + resp, err := ds.Client.Publish(t.Path, t.Payload, t.ResponsePath) + + field := data.NewField("Body", data.Labels{}, []json.RawMessage{resp}) + response.Frames = append(response.Frames, data.NewFrame("Response", field)) + response.Error = err return response } diff --git a/src/datasource.ts b/src/datasource.ts index 072fa78..b5d0b65 100644 --- a/src/datasource.ts +++ b/src/datasource.ts @@ -1,9 +1,21 @@ -import { DataSourceInstanceSettings } from '@grafana/data'; -import { DataSourceWithBackend } from '@grafana/runtime'; +import { DataSourceInstanceSettings, ScopedVars } from '@grafana/data'; +import { DataSourceWithBackend, getTemplateSrv } from '@grafana/runtime'; import { MqttDataSourceOptions, MqttQuery } from './types'; export class DataSource extends DataSourceWithBackend { constructor(instanceSettings: DataSourceInstanceSettings) { super(instanceSettings); } + + applyTemplateVariables(query: MqttQuery, scopedVars: ScopedVars): Record { + let resolvedTopic = getTemplateSrv().replace(query.topic, scopedVars); + resolvedTopic = resolvedTopic.replace(/\+/gi, '__PLUS__'); + resolvedTopic = resolvedTopic.replace(/\#/gi, '__HASH__'); + const resolvedQuery: MqttQuery = { + ...query, + topic: resolvedTopic, + }; + + return resolvedQuery; + } }