diff --git a/logging/elastic.go b/logging/elastic.go index 5889f748..c548cab1 100644 --- a/logging/elastic.go +++ b/logging/elastic.go @@ -2,6 +2,7 @@ package logging import ( "context" + "encoding/json" "fmt" "strings" "time" @@ -9,16 +10,18 @@ import ( "github.com/elastic/go-elasticsearch/esapi" "github.com/elastic/go-elasticsearch/v8" "github.com/jmpsec/osctrl/settings" + "github.com/jmpsec/osctrl/types" "github.com/rs/zerolog/log" "github.com/spf13/viper" ) // ElasticConfiguration to hold all elastic configuration values type ElasticConfiguration struct { - Host string `json:"host"` - Port string `json:"port"` - IndexPrefix string `json:"indexPrefix"` - IndexString string `json:"indexString"` // Expected is %s-%s for prefix-YYYY-MM-DD + Host string `json:"host"` + Port string `json:"port"` + IndexPrefix string `json:"indexPrefix"` + DateSeparator string `json:"dateSeparator"` // Expected is . for YYYY.MM.DD + IndexSeparator string `json:"indexSeparator"` // Expected is - for prefix-YYYY.MM.DD } // LoggerElastic will be used to log data using Elastic @@ -80,7 +83,8 @@ func LoadElastic(file string) (ElasticConfiguration, error) { // IndexName - Function to return the index name func (logE *LoggerElastic) IndexName() string { now := time.Now().UTC() - return fmt.Sprintf(logE.Configuration.IndexString, logE.Configuration.IndexPrefix, now.Format("2006-01-02")) + fNow := strings.ReplaceAll(now.Format("2006-01-02"), "-", logE.Configuration.DateSeparator) + return fmt.Sprintf("%s%s%s", logE.Configuration.IndexPrefix, logE.Configuration.IndexSeparator, fNow) } // Settings - Function to prepare settings for the logger @@ -96,20 +100,41 @@ func (logE *LoggerElastic) Send(logType string, data []byte, environment, uuid s if debug { log.Debug().Msgf("DebugService: Sending %d bytes to Elastic for %s - %s", len(data), environment, uuid) } - req := esapi.IndexRequest{ - Index: logE.IndexName(), - Body: strings.NewReader(string(data)), - Refresh: "true", - } - res, err := req.Do(context.Background(), logE.Client) - if err != nil { - log.Err(err).Msg("Error indexing document") + var logs []interface{} + if logType == types.QueryLog { + // For on-demand queries, just a JSON blob with results and statuses + var result interface{} + if err := json.Unmarshal(data, &result); err != nil { + log.Err(err).Msgf("error parsing data %s", string(data)) + } + logs = append(logs, result) + } else { + // For scheduled queries, convert the array in an array of multiple events + if err := json.Unmarshal(data, &logs); err != nil { + log.Err(err).Msgf("error parsing log %s", string(data)) + } } - defer res.Body.Close() - if res.IsError() { - log.Error().Msgf("Error response from Elasticsearch: %s", res.String()) + for _, l := range logs { + jsonEvent, err := json.Marshal(l) + if err != nil { + log.Err(err).Msg("Error parsing data") + continue + } + req := esapi.IndexRequest{ + Index: logE.IndexName(), + Body: strings.NewReader(string(jsonEvent)), + Refresh: "true", + } + res, err := req.Do(context.Background(), logE.Client) + if err != nil { + log.Err(err).Msg("Error indexing document") + } + defer res.Body.Close() + if res.IsError() { + log.Error().Msgf("Error response from Elasticsearch: %s", res.String()) + } } if debug { - log.Debug().Msgf("DebugService: Sent %s to Elastic from %s:%s", logType, uuid, environment) + log.Debug().Msgf("DebugService: Sent %d bytes of %s to Elastic from %s:%s", len(data), logType, uuid, environment) } }