From ca3855c3db438169ce258774ded0fcd1db403f79 Mon Sep 17 00:00:00 2001 From: Javier Marcos <1271349+javuto@users.noreply.github.com> Date: Wed, 6 Nov 2024 22:36:48 +0100 Subject: [PATCH 1/5] Fix for data serialization when sending to elastic --- logging/elastic.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logging/elastic.go b/logging/elastic.go index 5889f748..68ed5e4d 100644 --- a/logging/elastic.go +++ b/logging/elastic.go @@ -1,9 +1,9 @@ package logging import ( + "bytes" "context" "fmt" - "strings" "time" "github.com/elastic/go-elasticsearch/esapi" @@ -98,7 +98,7 @@ func (logE *LoggerElastic) Send(logType string, data []byte, environment, uuid s } req := esapi.IndexRequest{ Index: logE.IndexName(), - Body: strings.NewReader(string(data)), + Body: bytes.NewReader(data), Refresh: "true", } res, err := req.Do(context.Background(), logE.Client) From 21abdb54c6a1e4ddb16d46a31fb1ca7727dbe35d Mon Sep 17 00:00:00 2001 From: Javier Marcos <1271349+javuto@users.noreply.github.com> Date: Wed, 6 Nov 2024 22:54:07 +0100 Subject: [PATCH 2/5] Skip sending JSON string and sending object instead --- logging/elastic.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/logging/elastic.go b/logging/elastic.go index 68ed5e4d..20286e7d 100644 --- a/logging/elastic.go +++ b/logging/elastic.go @@ -3,6 +3,7 @@ package logging import ( "bytes" "context" + "encoding/json" "fmt" "time" @@ -96,6 +97,10 @@ func (logE *LoggerElastic) Send(logType string, data []byte, environment, uuid s if debug { log.Debug().Msgf("DebugService: Sending %d bytes to Elastic for %s - %s", len(data), environment, uuid) } + var logs []interface{} + if err := json.Unmarshal(data, &logs); err != nil { + log.Err(err).Msg("Error unmarshalling data") + } req := esapi.IndexRequest{ Index: logE.IndexName(), Body: bytes.NewReader(data), From 4054dca8d1bf3b479722a22e3f78385c0480805f Mon Sep 17 00:00:00 2001 From: Javier Marcos <1271349+javuto@users.noreply.github.com> Date: Wed, 6 Nov 2024 23:08:08 +0100 Subject: [PATCH 3/5] Sending single events to elastic --- logging/elastic.go | 50 +++++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/logging/elastic.go b/logging/elastic.go index 20286e7d..3a66d2b6 100644 --- a/logging/elastic.go +++ b/logging/elastic.go @@ -1,15 +1,16 @@ package logging import ( - "bytes" "context" "encoding/json" "fmt" + "strings" "time" "github.com/elastic/go-elasticsearch/esapi" "github.com/elastic/go-elasticsearch/v8" "github.com/jmpsec/osctrl/settings" + "github.com/jmpsec/osctrl/types" "github.com/rs/zerolog/log" "github.com/spf13/viper" ) @@ -98,23 +99,40 @@ func (logE *LoggerElastic) Send(logType string, data []byte, environment, uuid s log.Debug().Msgf("DebugService: Sending %d bytes to Elastic for %s - %s", len(data), environment, uuid) } var logs []interface{} - if err := json.Unmarshal(data, &logs); err != nil { - log.Err(err).Msg("Error unmarshalling data") - } - req := esapi.IndexRequest{ - Index: logE.IndexName(), - Body: bytes.NewReader(data), - Refresh: "true", - } - res, err := req.Do(context.Background(), logE.Client) - if err != nil { - log.Err(err).Msg("Error indexing document") + if logType == types.QueryLog { + // For on-demand queries, just a JSON blob with results and statuses + var result interface{} + if err := json.Unmarshal(data, &result); err != nil { + log.Err(err).Msgf("error parsing data %s", string(data)) + } + logs = append(logs, result) + } else { + // For scheduled queries, convert the array in an array of multiple events + if err := json.Unmarshal(data, &logs); err != nil { + log.Err(err).Msgf("error parsing log %s", string(data)) + } } - defer res.Body.Close() - if res.IsError() { - log.Error().Msgf("Error response from Elasticsearch: %s", res.String()) + for _, l := range logs { + jsonEvent, err := json.Marshal(l) + if err != nil { + log.Err(err).Msg("Error parsing data") + continue + } + req := esapi.IndexRequest{ + Index: logE.IndexName(), + Body: strings.NewReader(string(jsonEvent)), + Refresh: "true", + } + res, err := req.Do(context.Background(), logE.Client) + if err != nil { + log.Err(err).Msg("Error indexing document") + } + defer res.Body.Close() + if res.IsError() { + log.Error().Msgf("Error response from Elasticsearch: %s", res.String()) + } } if debug { - log.Debug().Msgf("DebugService: Sent %s to Elastic from %s:%s", logType, uuid, environment) + log.Debug().Msgf("DebugService: Sent %d bytes of %s to Elastic from %s:%s", len(data), logType, uuid, environment) } } From 71e35ae98447d1e1054281a1a5cc7eba78029e13 Mon Sep 17 00:00:00 2001 From: Javier Marcos <1271349+javuto@users.noreply.github.com> Date: Wed, 6 Nov 2024 23:45:35 +0100 Subject: [PATCH 4/5] Format index name with separator --- logging/elastic.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/logging/elastic.go b/logging/elastic.go index 3a66d2b6..5bafa95a 100644 --- a/logging/elastic.go +++ b/logging/elastic.go @@ -17,10 +17,10 @@ import ( // ElasticConfiguration to hold all elastic configuration values type ElasticConfiguration struct { - Host string `json:"host"` - Port string `json:"port"` - IndexPrefix string `json:"indexPrefix"` - IndexString string `json:"indexString"` // Expected is %s-%s for prefix-YYYY-MM-DD + Host string `json:"host"` + Port string `json:"port"` + IndexPrefix string `json:"indexPrefix"` + IndexSeparator string `json:"indexSeparator"` // Expected is . for prefix.YYYY.MM.DD } // LoggerElastic will be used to log data using Elastic @@ -82,7 +82,8 @@ func LoadElastic(file string) (ElasticConfiguration, error) { // IndexName - Function to return the index name func (logE *LoggerElastic) IndexName() string { now := time.Now().UTC() - return fmt.Sprintf(logE.Configuration.IndexString, logE.Configuration.IndexPrefix, now.Format("2006-01-02")) + fNow := strings.ReplaceAll(now.Format("2006-01-02"), "-", logE.Configuration.IndexSeparator) + return fmt.Sprintf("%s%s%s", logE.Configuration.IndexPrefix, logE.Configuration.IndexSeparator, fNow) } // Settings - Function to prepare settings for the logger From 33d8ee81ecf810ffb3d8b9900508d7aaee6f8d3f Mon Sep 17 00:00:00 2001 From: Javier Marcos <1271349+javuto@users.noreply.github.com> Date: Wed, 6 Nov 2024 23:57:55 +0100 Subject: [PATCH 5/5] Having date separator and index separator --- logging/elastic.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/logging/elastic.go b/logging/elastic.go index 5bafa95a..c548cab1 100644 --- a/logging/elastic.go +++ b/logging/elastic.go @@ -20,7 +20,8 @@ type ElasticConfiguration struct { Host string `json:"host"` Port string `json:"port"` IndexPrefix string `json:"indexPrefix"` - IndexSeparator string `json:"indexSeparator"` // Expected is . for prefix.YYYY.MM.DD + DateSeparator string `json:"dateSeparator"` // Expected is . for YYYY.MM.DD + IndexSeparator string `json:"indexSeparator"` // Expected is - for prefix-YYYY.MM.DD } // LoggerElastic will be used to log data using Elastic @@ -82,7 +83,7 @@ func LoadElastic(file string) (ElasticConfiguration, error) { // IndexName - Function to return the index name func (logE *LoggerElastic) IndexName() string { now := time.Now().UTC() - fNow := strings.ReplaceAll(now.Format("2006-01-02"), "-", logE.Configuration.IndexSeparator) + fNow := strings.ReplaceAll(now.Format("2006-01-02"), "-", logE.Configuration.DateSeparator) return fmt.Sprintf("%s%s%s", logE.Configuration.IndexPrefix, logE.Configuration.IndexSeparator, fNow) }