Skip to content

Re-add Elasticsearch Plugin #78

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 1 commit into from
Jul 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"@kobsio/plugin-applications": "*",
"@kobsio/plugin-core": "*",
"@kobsio/plugin-dashboards": "*",
"@kobsio/plugin-elasticsearch": "*",
"@kobsio/plugin-prometheus": "*",
"@kobsio/plugin-resources": "*",
"@kobsio/plugin-teams": "*",
Expand Down
2 changes: 2 additions & 0 deletions app/src/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import teamsPlugin from '@kobsio/plugin-teams';
import applicationsPlugin from '@kobsio/plugin-applications';
import dashboardsPlugin from '@kobsio/plugin-dashboards';
import prometheusPlugin from '@kobsio/plugin-prometheus';
import elasticsearchPlugin from '@kobsio/plugin-elasticsearch';

ReactDOM.render(
<React.StrictMode>
Expand All @@ -18,6 +19,7 @@ ReactDOM.render(
...applicationsPlugin,
...dashboardsPlugin,
...prometheusPlugin,
...elasticsearchPlugin,
}} />
</React.StrictMode>,
document.getElementById('root')
Expand Down
15 changes: 8 additions & 7 deletions deploy/docker/kobs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ plugins:
password:
token:

elasticsearch:
- name: Elasticsearch
description: Elasticsearch can be used for the logs of your application.
address: http://localhost:9200
username:
password:
token:
elasticsearch:
- name: elasticsearch
displayName: Elasticsearch
description: Elasticsearch can be used for the logs of your application.
address: http://localhost:9200
username:
password:
token:

jaeger:
- name: Jaeger
Expand Down
13 changes: 8 additions & 5 deletions pkg/api/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@ import (
// the plugins folder.
"github.com/kobsio/kobs/plugins/applications"
"github.com/kobsio/kobs/plugins/dashboards"
"github.com/kobsio/kobs/plugins/elasticsearch"
"github.com/kobsio/kobs/plugins/prometheus"
"github.com/kobsio/kobs/plugins/resources"
"github.com/kobsio/kobs/plugins/teams"
)

// Config holds the configuration for all plugins. We have to add the configuration for all the imported plugins.
type Config struct {
Applications applications.Config `yaml:"applications"`
Resources resources.Config `yaml:"resources"`
Teams teams.Config `yaml:"teams"`
Dashboards dashboards.Config `yaml:"dashboards"`
Prometheus prometheus.Config `yaml:"prometheus"`
Applications applications.Config `yaml:"applications"`
Resources resources.Config `yaml:"resources"`
Teams teams.Config `yaml:"teams"`
Dashboards dashboards.Config `yaml:"dashboards"`
Prometheus prometheus.Config `yaml:"prometheus"`
Elasticsearch elasticsearch.Config `yaml:"elasticsearch"`
}

// Router implements the router for the plugins package. This only registeres one route which is used to return all the
Expand Down Expand Up @@ -54,6 +56,7 @@ func Register(clusters *clusters.Clusters, config Config) chi.Router {
router.Mount(teams.Route, teams.Register(clusters, router.plugins, config.Teams))
router.Mount(dashboards.Route, dashboards.Register(clusters, router.plugins, config.Dashboards))
router.Mount(prometheus.Route, prometheus.Register(clusters, router.plugins, config.Prometheus))
router.Mount(elasticsearch.Route, elasticsearch.Register(clusters, router.plugins, config.Elasticsearch))

return router
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const DashboardToolbar: React.FunctionComponent<IDashboardToolbarProps> = ({

return (
<Card style={{ maxWidth: '100%' }}>
<Toolbar id="dashboard-toolbar">
<Toolbar id="dashboard-toolbar" style={{ paddingBottom: '0px', zIndex: 300 }}>
<ToolbarContent>
<ToolbarToggleGroup style={{ width: '100%' }} toggleIcon={<FilterIcon />} breakpoint="lg">
{variables.map((variable, index) =>
Expand Down
114 changes: 114 additions & 0 deletions plugins/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package elasticsearch

import (
"net/http"
"strconv"

"github.com/kobsio/kobs/pkg/api/clusters"
"github.com/kobsio/kobs/pkg/api/middleware/errresponse"
"github.com/kobsio/kobs/pkg/api/plugins/plugin"
"github.com/kobsio/kobs/plugins/elasticsearch/pkg/instance"

"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
"github.com/sirupsen/logrus"
)

// Route is the route under which the plugin should be registered in our router for the rest api.
const Route = "/elasticsearch"

var (
log = logrus.WithFields(logrus.Fields{"package": "elasticsearch"})
)

// Config is the structure of the configuration for the elasticsearch plugin.
type Config []instance.Config

// Router implements the router for the resources plugin, which can be registered in the router for our rest api.
type Router struct {
*chi.Mux
clusters *clusters.Clusters
instances []*instance.Instance
}

func (router *Router) getInstance(name string) *instance.Instance {
for _, i := range router.instances {
if i.Name == name {
return i
}
}

return nil
}

// getLogs returns the raw documents for a given query from Elasticsearch. The result also contains the distribution of
// the documents in the given time range. The name of the Elasticsearch instance must be set via the name path
// parameter, all other values like the query, scrollID, start and end time are set via query parameters. These
// parameters are then passed to the GetLogs function of the Elasticsearch instance, which returns the documents and
// buckets.
func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
name := chi.URLParam(r, "name")
query := r.URL.Query().Get("query")
scrollID := r.URL.Query().Get("scrollID")
timeStart := r.URL.Query().Get("timeStart")
timeEnd := r.URL.Query().Get("timeEnd")

log.WithFields(logrus.Fields{"name": name, "query": query, "scrollID": scrollID, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("getLogs")

i := router.getInstance(name)
if i == nil {
render.Render(w, r, errresponse.Render(nil, http.StatusBadRequest, "could not find instance name"))
return
}

parsedTimeStart, err := strconv.ParseInt(timeStart, 10, 64)
if err != nil {
render.Render(w, r, errresponse.Render(nil, http.StatusBadRequest, "could not parse start time"))
return
}

parsedTimeEnd, err := strconv.ParseInt(timeEnd, 10, 64)
if err != nil {
render.Render(w, r, errresponse.Render(nil, http.StatusBadRequest, "could not parse end time"))
return
}

data, err := i.GetLogs(r.Context(), query, scrollID, parsedTimeStart, parsedTimeEnd)
if err != nil {
render.Render(w, r, errresponse.Render(err, http.StatusInternalServerError, "could not get logs"))
return
}

render.JSON(w, r, data)
}

// Register returns a new router which can be used in the router for the kobs rest api.
func Register(clusters *clusters.Clusters, plugins *plugin.Plugins, config Config) chi.Router {
var instances []*instance.Instance

for _, cfg := range config {
instance, err := instance.New(cfg)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{"name": cfg.Name}).Fatalf("Could not create Elasticsearch instance")
}

instances = append(instances, instance)

plugins.Append(plugin.Plugin{
Name: cfg.Name,
DisplayName: cfg.DisplayName,
Description: cfg.Description,
Type: "elasticsearch",
})
}

router := Router{
chi.NewRouter(),
clusters,
instances,
}

router.Get("/logs/{name}", router.getLogs)

return router
}
25 changes: 25 additions & 0 deletions plugins/elasticsearch/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"name": "@kobsio/plugin-elasticsearch",
"version": "0.0.0",
"license": "MIT",
"private": false,
"main": "./lib/index.js",
"module": "./lib-esm/index.js",
"types": "./lib/index.d.ts",
"scripts": {
"plugin": "tsc && tsc --build tsconfig.esm.json && cp -r src/assets lib && cp -r src/assets lib-esm"
},
"devDependencies": {
"@kobsio/plugin-core": "*",
"@nivo/bar": "^0.72.0",
"@patternfly/react-core": "^4.128.2",
"@types/react": "^17.0.0",
"@types/react-dom": "^17.0.0",
"@types/react-router-dom": "^5.1.7",
"react": "^17.0.2",
"react-dom": "^17.0.2",
"react-query": "^3.17.2",
"react-router-dom": "^5.2.0",
"typescript": "^4.3.4"
}
}
21 changes: 21 additions & 0 deletions plugins/elasticsearch/pkg/instance/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package instance

import (
"time"
)

// formateTime is used to format the shown time for a bucket. This is required because the bar chart component in the
// frontend can not handle time series very well, so that we have to use strings instead of times.
func formateTime(timestamp, timeDiff int64) string {
timeDiff = timeDiff / 1000

if timeDiff < 3600 {
return time.Unix(timestamp/1000, 0).Format("15:04:05")
} else if timeDiff < 86400 {
return time.Unix(timestamp/1000, 0).Format("15:04")
} else if timeDiff < 604800 {
return time.Unix(timestamp/1000, 0).Format("01-02 15:04")
}

return time.Unix(timestamp/1000, 0).Format("01-02")
}
142 changes: 142 additions & 0 deletions plugins/elasticsearch/pkg/instance/instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package instance

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"strings"

"github.com/kobsio/kobs/pkg/api/middleware/roundtripper"

"github.com/sirupsen/logrus"
)

var (
log = logrus.WithFields(logrus.Fields{"package": "elasticsearch"})
)

// Config is the structure of the configuration for a single Elasticsearch instance.
type Config struct {
Name string `yaml:"name"`
DisplayName string `yaml:"displayName"`
Description string `yaml:"description"`
Address string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Token string `yaml:"token"`
}

// Instance represents a single Elasticsearch instance, which can be added via the configuration file.
type Instance struct {
Name string
address string
client *http.Client
}

// GetLogs returns the raw log documents and the buckets for the distribution of the logs accross the selected time
// range. We have to pass a query, start and end time to the function. The scrollID can be an empty string to start a
// new query. If a scrollID is provided it will be used for pagination.
func (i *Instance) GetLogs(ctx context.Context, query, scrollID string, timeStart, timeEnd int64) (*Data, error) {
var err error
var body []byte
var url string

if scrollID == "" {
url = fmt.Sprintf("%s/_search?scroll=15m", i.address)
body = []byte(fmt.Sprintf(`{"size":100,"sort":[{"@timestamp":{"order":"desc"}}],"query":{"bool":{"must":[{"range":{"@timestamp":{"gte":"%d","lte":"%d"}}},{"query_string":{"query":"%s"}}]}},"aggs":{"logcount":{"auto_date_histogram":{"field":"@timestamp","buckets":30}}}}`, timeStart*1000, timeEnd*1000, strings.ReplaceAll(query, "\"", "\\\"")))
} else {
url = fmt.Sprintf("%s/_search/scroll", i.address)
body = []byte(`{"scroll" : "15m", "scroll_id" : "` + scrollID + `"}`)
}

log.WithFields(logrus.Fields{"query": string(body)}).Debugf("Run Elasticsearch query")

req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(body))
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", "application/json")

resp, err := i.client.Do(req)
if err != nil {
return nil, err
}

defer resp.Body.Close()

if resp.StatusCode >= 200 && resp.StatusCode < 300 {
var res Response

err = json.NewDecoder(resp.Body).Decode(&res)
if err != nil {
return nil, err
}

var buckets []Bucket

// When the response from the Elasticsearch API contains a list of buckets, we have to transform them into the
// format needed by our UI.
if len(res.Aggregations.LogCount.Buckets) > 0 {
var timeDiff = res.Aggregations.LogCount.Buckets[len(res.Aggregations.LogCount.Buckets)-1].Key - res.Aggregations.LogCount.Buckets[0].Key
for _, bucket := range res.Aggregations.LogCount.Buckets {
buckets = append(buckets, Bucket{
Time: formateTime(bucket.Key, timeDiff),
Documents: bucket.DocCount,
})
}
}

data := &Data{
ScrollID: res.ScrollID,
Took: res.Took,
Hits: res.Hits.Total.Value,
Documents: res.Hits.Hits,
Buckets: buckets,
}

log.WithFields(logrus.Fields{"scrollID": data.ScrollID, "took": data.Took, "hits": data.Hits, "documents": len(data.Documents), "buckets": len(data.Buckets)}).Debugf("Elasticsearch query results")

return data, nil
}

var res ResponseError

err = json.NewDecoder(resp.Body).Decode(&res)
if err != nil {
return nil, err
}

log.WithFields(logrus.Fields{"type": res.Error.Type, "reason": res.Error.Reason}).Error("The query returned an error.")

return nil, fmt.Errorf("%s: %s", res.Error.Type, res.Error.Reason)
}

// New returns a new Elasticsearch instance for the given configuration.
func New(config Config) (*Instance, error) {
roundTripper := roundtripper.DefaultRoundTripper

if config.Username != "" && config.Password != "" {
roundTripper = roundtripper.BasicAuthTransport{
Transport: roundTripper,
Username: config.Username,
Password: config.Password,
}
}

if config.Token != "" {
roundTripper = roundtripper.TokenAuthTransporter{
Transport: roundTripper,
Token: config.Token,
}
}

return &Instance{
Name: config.Name,
address: config.Address,
client: &http.Client{
Transport: roundTripper,
},
}, nil
}
Loading