From e52ecffbf69a572593504ea9acc4ff65854a3e9a Mon Sep 17 00:00:00 2001
From: Pavol Loffay
Date: Tue, 23 Jan 2018 10:05:20 +0100
Subject: [PATCH] Use elasticsearch bulk API (#656)
* Use elasticsearch bulk API
Signed-off-by: Pavol Loffay
* Fix review comments
Signed-off-by: Pavol Loffay
---
Makefile | 10 ++
cmd/collector/main.go | 8 +
pkg/es/client.go | 6 +-
pkg/es/config/config.go | 61 +++++--
pkg/es/mocks/Client.go | 16 +-
pkg/es/mocks/IndexService.go | 32 +---
pkg/es/mocks/IndicesCreateService.go | 2 +-
pkg/es/mocks/IndicesExistsService.go | 2 +-
pkg/es/mocks/MultiSearchService.go | 2 +-
pkg/es/mocks/SearchService.go | 2 +-
pkg/es/wrapper.go | 36 ++--
plugin/storage/cassandra/spanstore/writer.go | 6 +
.../cassandra/spanstore/writer_test.go | 8 +
plugin/storage/es/dependencystore/storage.go | 18 +-
.../es/dependencystore/storage_test.go | 6 +-
plugin/storage/es/factory.go | 2 +-
plugin/storage/es/factory_test.go | 2 +-
plugin/storage/es/options.go | 54 ++++--
.../storage/es/spanstore/service_operation.go | 23 +--
.../es/spanstore/service_operation_test.go | 32 +---
plugin/storage/es/spanstore/writer.go | 31 ++--
plugin/storage/es/spanstore/writer_test.go | 156 +++---------------
.../integration/es_integration_test.go | 13 +-
23 files changed, 241 insertions(+), 287 deletions(-)
diff --git a/Makefile b/Makefile
index f44b888b60f..26965fb296f 100644
--- a/Makefile
+++ b/Makefile
@@ -45,6 +45,8 @@ COLORIZE=$(SED) ''/PASS/s//$(PASS)/'' | $(SED) ''/FAIL/s//$(FAIL)/''
DOCKER_NAMESPACE?=jaegertracing
DOCKER_TAG?=latest
+MOCKERY=mockery
+
.DEFAULT_GOAL := test-and-lint
.PHONY: test-and-lint
@@ -236,3 +238,11 @@ thrift-image:
generate-zipkin-swagger: idl-submodule
$(SWAGGER) generate server -f ./idl/swagger/zipkin2-api.yaml -t $(SWAGGER_GEN_DIR) -O PostSpans --exclude-main
rm $(SWAGGER_GEN_DIR)/restapi/operations/post_spans_urlbuilder.go $(SWAGGER_GEN_DIR)/restapi/server.go $(SWAGGER_GEN_DIR)/restapi/configure_zipkin.go $(SWAGGER_GEN_DIR)/models/trace.go $(SWAGGER_GEN_DIR)/models/list_of_traces.go $(SWAGGER_GEN_DIR)/models/dependency_link.go
+
+.PHONY: install-mockery
+install-mockery:
+ go get github.com/vektra/mockery
+
+.PHONY: generate-mocks
+generate-mocks: install-mockery
+ $(MOCKERY) -all -dir ./pkg/es/ -output ./pkg/es/mocks && rm pkg/es/mocks/ClientBuilder.go
diff --git a/cmd/collector/main.go b/cmd/collector/main.go
index bc3d1d696ca..27a5d00be74 100644
--- a/cmd/collector/main.go
+++ b/cmd/collector/main.go
@@ -16,6 +16,7 @@ package main
import (
"fmt"
+ "io"
"log"
"net"
"net/http"
@@ -148,6 +149,13 @@ func main() {
hc.Ready()
select {
case <-signalsChannel:
+ if closer, ok := spanWriter.(io.Closer); ok {
+ err := closer.Close()
+ if err != nil {
+ logger.Error("Failed to close span writer", zap.Error(err))
+ }
+ }
+
logger.Info("Jaeger Collector is finishing")
}
return nil
diff --git a/pkg/es/client.go b/pkg/es/client.go
index 4472eb8d30d..30e03aa9cbb 100644
--- a/pkg/es/client.go
+++ b/pkg/es/client.go
@@ -16,6 +16,7 @@ package es
import (
"context"
+ "io"
"gopkg.in/olivere/elastic.v5"
)
@@ -27,6 +28,7 @@ type Client interface {
Index() IndexService
Search(indices ...string) SearchService
MultiSearch() MultiSearchService
+ io.Closer
}
// IndicesExistsService is an abstraction for elastic.IndicesExistsService
@@ -40,13 +42,13 @@ type IndicesCreateService interface {
Do(ctx context.Context) (*elastic.IndicesCreateResult, error)
}
-// IndexService is an abstraction for elastic.IndexService
+// IndexService is an abstraction for elastic BulkService
type IndexService interface {
Index(index string) IndexService
Type(typ string) IndexService
Id(id string) IndexService
BodyJson(body interface{}) IndexService
- Do(ctx context.Context) (*elastic.IndexResponse, error)
+ Add()
}
// SearchService is an abstraction for elastic.SearchService
diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go
index 28cddb57e15..d37ec99ab7b 100644
--- a/pkg/es/config/config.go
+++ b/pkg/es/config/config.go
@@ -15,9 +15,12 @@
package config
import (
+ "bytes"
+ "context"
"time"
"github.com/pkg/errors"
+ "go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"
"github.com/jaegertracing/jaeger/pkg/es"
@@ -25,25 +28,29 @@ import (
// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
- Servers []string
- Username string
- Password string
- Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
- MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
- NumShards int64 `yaml:"shards"`
- NumReplicas int64 `yaml:"replicas"`
+ Servers []string
+ Username string
+ Password string
+ Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
+ MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
+ NumShards int64 `yaml:"shards"`
+ NumReplicas int64 `yaml:"replicas"`
+ BulkSize int
+ BulkWorkers int
+ BulkActions int
+ BulkFlushInterval time.Duration
}
// ClientBuilder creates new es.Client
type ClientBuilder interface {
- NewClient() (es.Client, error)
+ NewClient(logger *zap.Logger) (es.Client, error)
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
}
// NewClient creates a new ElasticSearch client
-func (c *Configuration) NewClient() (es.Client, error) {
+func (c *Configuration) NewClient(logger *zap.Logger) (es.Client, error) {
if len(c.Servers) < 1 {
return nil, errors.New("No servers specified")
}
@@ -51,7 +58,29 @@ func (c *Configuration) NewClient() (es.Client, error) {
if err != nil {
return nil, err
}
- return es.WrapESClient(rawClient), nil
+ service, err := rawClient.BulkProcessor().
+ After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
+ if err != nil {
+ var buffer bytes.Buffer
+ for i, r := range requests {
+ buffer.WriteString(r.String())
+ if i+1 < len(requests) {
+ buffer.WriteByte('\n')
+ }
+ }
+ logger.Error("Elasticsearch could not process bulk request", zap.Error(err),
+ zap.Any("response", response), zap.String("requests", buffer.String()))
+ }
+ }).
+ BulkSize(c.BulkSize).
+ Workers(c.BulkWorkers).
+ BulkActions(c.BulkActions).
+ FlushInterval(c.BulkFlushInterval).
+ Do(context.Background())
+ if err != nil {
+ return nil, err
+ }
+ return es.WrapESClient(rawClient, service), nil
}
// ApplyDefaults copies settings from source unless its own value is non-zero.
@@ -74,6 +103,18 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.NumReplicas == 0 {
c.NumReplicas = source.NumReplicas
}
+ if c.BulkSize == 0 {
+ c.BulkSize = source.BulkSize
+ }
+ if c.BulkWorkers == 0 {
+ c.BulkWorkers = source.BulkWorkers
+ }
+ if c.BulkActions == 0 {
+ c.BulkActions = source.BulkActions
+ }
+ if c.BulkFlushInterval == 0 {
+ c.BulkFlushInterval = source.BulkFlushInterval
+ }
}
// GetNumShards returns number of shards from Configuration
diff --git a/pkg/es/mocks/Client.go b/pkg/es/mocks/Client.go
index 90783af50c5..fd4d82be937 100644
--- a/pkg/es/mocks/Client.go
+++ b/pkg/es/mocks/Client.go
@@ -1,6 +1,6 @@
// Code generated by mockery v1.0.0
-// Copyright (c) 2017 The Jaeger Authors.
+// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -24,6 +24,20 @@ type Client struct {
mock.Mock
}
+// Close provides a mock function with given fields:
+func (_m *Client) Close() error {
+ ret := _m.Called()
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func() error); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
// CreateIndex provides a mock function with given fields: index
func (_m *Client) CreateIndex(index string) es.IndicesCreateService {
ret := _m.Called(index)
diff --git a/pkg/es/mocks/IndexService.go b/pkg/es/mocks/IndexService.go
index bf49c20d176..a17caf23533 100644
--- a/pkg/es/mocks/IndexService.go
+++ b/pkg/es/mocks/IndexService.go
@@ -1,6 +1,6 @@
// Code generated by mockery v1.0.0
-// Copyright (c) 2017 The Jaeger Authors.
+// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,8 +16,6 @@
package mocks
-import context "context"
-import elastic "gopkg.in/olivere/elastic.v5"
import es "github.com/jaegertracing/jaeger/pkg/es"
import mock "github.com/stretchr/testify/mock"
@@ -26,6 +24,11 @@ type IndexService struct {
mock.Mock
}
+// Add provides a mock function with given fields:
+func (_m *IndexService) Add() {
+ _m.Called()
+}
+
// BodyJson provides a mock function with given fields: body
func (_m *IndexService) BodyJson(body interface{}) es.IndexService {
ret := _m.Called(body)
@@ -42,29 +45,6 @@ func (_m *IndexService) BodyJson(body interface{}) es.IndexService {
return r0
}
-// Do provides a mock function with given fields: ctx
-func (_m *IndexService) Do(ctx context.Context) (*elastic.IndexResponse, error) {
- ret := _m.Called(ctx)
-
- var r0 *elastic.IndexResponse
- if rf, ok := ret.Get(0).(func(context.Context) *elastic.IndexResponse); ok {
- r0 = rf(ctx)
- } else {
- if ret.Get(0) != nil {
- r0 = ret.Get(0).(*elastic.IndexResponse)
- }
- }
-
- var r1 error
- if rf, ok := ret.Get(1).(func(context.Context) error); ok {
- r1 = rf(ctx)
- } else {
- r1 = ret.Error(1)
- }
-
- return r0, r1
-}
-
// Id provides a mock function with given fields: id
func (_m *IndexService) Id(id string) es.IndexService {
ret := _m.Called(id)
diff --git a/pkg/es/mocks/IndicesCreateService.go b/pkg/es/mocks/IndicesCreateService.go
index f9b9a1751cf..5c2fcda996f 100644
--- a/pkg/es/mocks/IndicesCreateService.go
+++ b/pkg/es/mocks/IndicesCreateService.go
@@ -1,6 +1,6 @@
// Code generated by mockery v1.0.0
-// Copyright (c) 2017 The Jaeger Authors.
+// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/pkg/es/mocks/IndicesExistsService.go b/pkg/es/mocks/IndicesExistsService.go
index 415a526e6a5..43234846716 100644
--- a/pkg/es/mocks/IndicesExistsService.go
+++ b/pkg/es/mocks/IndicesExistsService.go
@@ -1,6 +1,6 @@
// Code generated by mockery v1.0.0
-// Copyright (c) 2017 The Jaeger Authors.
+// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/pkg/es/mocks/MultiSearchService.go b/pkg/es/mocks/MultiSearchService.go
index a565e0a4a47..fbb8db3d7cb 100644
--- a/pkg/es/mocks/MultiSearchService.go
+++ b/pkg/es/mocks/MultiSearchService.go
@@ -1,6 +1,6 @@
// Code generated by mockery v1.0.0
-// Copyright (c) 2017 The Jaeger Authors.
+// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/pkg/es/mocks/SearchService.go b/pkg/es/mocks/SearchService.go
index 6bb72f59333..46d6b84c101 100644
--- a/pkg/es/mocks/SearchService.go
+++ b/pkg/es/mocks/SearchService.go
@@ -1,6 +1,6 @@
// Code generated by mockery v1.0.0
-// Copyright (c) 2017 The Jaeger Authors.
+// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/pkg/es/wrapper.go b/pkg/es/wrapper.go
index c29aa22dc37..cba08f67115 100644
--- a/pkg/es/wrapper.go
+++ b/pkg/es/wrapper.go
@@ -24,12 +24,13 @@ import (
// ESClient is a wrapper around elastic.Client
type ESClient struct {
- client *elastic.Client
+ client *elastic.Client
+ bulkService *elastic.BulkProcessor
}
// WrapESClient creates a ESClient out of *elastic.Client.
-func WrapESClient(client *elastic.Client) ESClient {
- return ESClient{client: client}
+func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor) ESClient {
+ return ESClient{client: client, bulkService: s}
}
// IndexExists calls this function to internal client.
@@ -44,7 +45,8 @@ func (c ESClient) CreateIndex(index string) IndicesCreateService {
// Index calls this function to internal client.
func (c ESClient) Index() IndexService {
- return WrapESIndexService(c.client.Index())
+ r := elastic.NewBulkIndexRequest()
+ return WrapESIndexService(r, c.bulkService)
}
// Search calls this function to internal client.
@@ -57,6 +59,11 @@ func (c ESClient) MultiSearch() MultiSearchService {
return WrapESMultiSearchService(c.client.MultiSearch())
}
+// Close closes ESClient and flushes all data to the storage.
+func (c ESClient) Close() error {
+ return c.bulkService.Close()
+}
+
// ---
// ESIndicesExistsService is a wrapper around elastic.IndicesExistsService
@@ -100,37 +107,38 @@ func (c ESIndicesCreateService) Do(ctx context.Context) (*elastic.IndicesCreateR
// ESIndexService is a wrapper around elastic.ESIndexService
type ESIndexService struct {
- indexService *elastic.IndexService
+ bulkIndexReq *elastic.BulkIndexRequest
+ bulkService *elastic.BulkProcessor
}
// WrapESIndexService creates an ESIndexService out of *elastic.ESIndexService.
-func WrapESIndexService(indexService *elastic.IndexService) ESIndexService {
- return ESIndexService{indexService: indexService}
+func WrapESIndexService(indexService *elastic.BulkIndexRequest, bulkService *elastic.BulkProcessor) ESIndexService {
+ return ESIndexService{bulkIndexReq: indexService, bulkService: bulkService}
}
// Index calls this function to internal service.
func (i ESIndexService) Index(index string) IndexService {
- return WrapESIndexService(i.indexService.Index(index))
+ return WrapESIndexService(i.bulkIndexReq.Index(index), i.bulkService)
}
// Type calls this function to internal service.
func (i ESIndexService) Type(typ string) IndexService {
- return WrapESIndexService(i.indexService.Type(typ))
+ return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService)
}
// Id calls this function to internal service.
func (i ESIndexService) Id(id string) IndexService {
- return WrapESIndexService(i.indexService.Id(id))
+ return WrapESIndexService(i.bulkIndexReq.Id(id), i.bulkService)
}
// BodyJson calls this function to internal service.
func (i ESIndexService) BodyJson(body interface{}) IndexService {
- return WrapESIndexService(i.indexService.BodyJson(body))
+ return WrapESIndexService(i.bulkIndexReq.Doc(body), i.bulkService)
}
-// Do calls this function to internal service.
-func (i ESIndexService) Do(ctx context.Context) (*elastic.IndexResponse, error) {
- return i.indexService.Do(ctx)
+// Add adds the request to bulk service
+func (i ESIndexService) Add() {
+ i.bulkService.Add(i.bulkIndexReq)
}
// ---
diff --git a/plugin/storage/cassandra/spanstore/writer.go b/plugin/storage/cassandra/spanstore/writer.go
index 555e6c41adf..5cf3ab962ae 100644
--- a/plugin/storage/cassandra/spanstore/writer.go
+++ b/plugin/storage/cassandra/spanstore/writer.go
@@ -116,6 +116,12 @@ func NewSpanWriter(
}
}
+// Close closes SpanWriter
+func (s *SpanWriter) Close() error {
+ s.session.Close()
+ return nil
+}
+
// WriteSpan saves the span into Cassandra
func (s *SpanWriter) WriteSpan(span *model.Span) error {
ds := dbmodel.FromDomain(span)
diff --git a/plugin/storage/cassandra/spanstore/writer_test.go b/plugin/storage/cassandra/spanstore/writer_test.go
index 005c0126aca..67a1233a2d0 100644
--- a/plugin/storage/cassandra/spanstore/writer_test.go
+++ b/plugin/storage/cassandra/spanstore/writer_test.go
@@ -53,6 +53,14 @@ func withSpanWriter(writeCacheTTL time.Duration, fn func(w *spanWriterTest)) {
var _ spanstore.Writer = &SpanWriter{} // check API conformance
+func TestClientClose(t *testing.T) {
+ withSpanWriter(0, func(w *spanWriterTest) {
+ w.session.On("Close").Return(nil)
+ w.writer.Close()
+ w.session.AssertNumberOfCalls(t, "Close", 1)
+ })
+}
+
func TestSpanWriter(t *testing.T) {
testCases := []struct {
caption string
diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go
index 34f1dfecc40..741f254a262 100644
--- a/plugin/storage/es/dependencystore/storage.go
+++ b/plugin/storage/es/dependencystore/storage.go
@@ -59,7 +59,8 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
if err := s.createIndex(indexName); err != nil {
return err
}
- return s.writeDependencies(indexName, ts, dependencies)
+ s.writeDependencies(indexName, ts, dependencies)
+ return nil
}
func (s *DependencyStore) createIndex(indexName string) error {
@@ -70,18 +71,11 @@ func (s *DependencyStore) createIndex(indexName string) error {
return nil
}
-func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) error {
- _, err := s.client.Index().Index(indexName).
- Type(dependencyType).
- BodyJson(&timeToDependencies{
- Timestamp: ts,
+func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) {
+ s.client.Index().Index(indexName).Type(dependencyType).
+ BodyJson(&timeToDependencies{Timestamp: ts,
Dependencies: dependencies,
- }).
- Do(s.ctx)
- if err != nil {
- return errors.Wrap(err, "Failed to write dependencies")
- }
- return nil
+ }).Add()
}
// GetDependencies returns all interservice dependencies
diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go
index 48dbf431788..5be0afed25f 100644
--- a/plugin/storage/es/dependencystore/storage_test.go
+++ b/plugin/storage/es/dependencystore/storage_test.go
@@ -64,10 +64,6 @@ func TestWriteDependencies(t *testing.T) {
createIndexError: errors.New("index not created"),
expectedError: "Failed to create index: index not created",
},
- {
- writeError: errors.New("write failed"),
- expectedError: "Failed to write dependencies: write failed",
- },
{},
}
for _, testCase := range testCases {
@@ -86,7 +82,7 @@ func TestWriteDependencies(t *testing.T) {
writeService.On("Index", stringMatcher(indexName)).Return(writeService)
writeService.On("Type", stringMatcher(dependencyType)).Return(writeService)
writeService.On("BodyJson", mock.Anything).Return(writeService)
- writeService.On("Do", mock.Anything).Return(nil, testCase.writeError)
+ writeService.On("Add", mock.Anything).Return(nil, testCase.writeError)
err := r.storage.WriteDependencies(fixedTime, []model.DependencyLink{})
if testCase.expectedError != "" {
diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go
index 5aca99e88fd..4527ffb394f 100644
--- a/plugin/storage/es/factory.go
+++ b/plugin/storage/es/factory.go
@@ -62,7 +62,7 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger
- primaryClient, err := f.primaryConfig.NewClient()
+ primaryClient, err := f.primaryConfig.NewClient(logger)
if err != nil {
return err
}
diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go
index cc9edffae21..f8f3fb0a03f 100644
--- a/plugin/storage/es/factory_test.go
+++ b/plugin/storage/es/factory_test.go
@@ -36,7 +36,7 @@ type mockClientBuilder struct {
err error
}
-func (m *mockClientBuilder) NewClient() (es.Client, error) {
+func (m *mockClientBuilder) NewClient(logger *zap.Logger) (es.Client, error) {
if m.err == nil {
return &mocks.Client{}, nil
}
diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go
index a46e46143ae..66a50cc95a3 100644
--- a/plugin/storage/es/options.go
+++ b/plugin/storage/es/options.go
@@ -25,13 +25,17 @@ import (
)
const (
- suffixUsername = ".username"
- suffixPassword = ".password"
- suffixSniffer = ".sniffer"
- suffixServerURLs = ".server-urls"
- suffixMaxSpanAge = ".max-span-age"
- suffixNumShards = ".num-shards"
- suffixNumReplicas = ".num-replicas"
+ suffixUsername = ".username"
+ suffixPassword = ".password"
+ suffixSniffer = ".sniffer"
+ suffixServerURLs = ".server-urls"
+ suffixMaxSpanAge = ".max-span-age"
+ suffixNumShards = ".num-shards"
+ suffixNumReplicas = ".num-replicas"
+ suffixBulkSize = ".bulk.size"
+ suffixBulkWorkers = ".bulk.workers"
+ suffixBulkActions = ".bulk.actions"
+ suffixBulkFlushInterval = ".bulk.flush-interval"
)
// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
@@ -60,12 +64,16 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
options := &Options{
primary: &namespaceConfig{
Configuration: config.Configuration{
- Username: "",
- Password: "",
- Sniffer: false,
- MaxSpanAge: 72 * time.Hour,
- NumShards: 5,
- NumReplicas: 1,
+ Username: "",
+ Password: "",
+ Sniffer: false,
+ MaxSpanAge: 72 * time.Hour,
+ NumShards: 5,
+ NumReplicas: 1,
+ BulkSize: 5 * 1000 * 1000,
+ BulkWorkers: 1,
+ BulkActions: 1000,
+ BulkFlushInterval: time.Millisecond * 200,
},
servers: "http://127.0.0.1:9200",
namespace: primaryNamespace,
@@ -117,6 +125,22 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixNumReplicas,
nsConfig.NumReplicas,
"The number of replicas per index in ElasticSearch")
+ flagSet.Int(
+ nsConfig.namespace+suffixBulkSize,
+ nsConfig.BulkSize,
+ "The number of bytes that the bulk requests can take up before the bulk processor decides to commit")
+ flagSet.Int(
+ nsConfig.namespace+suffixBulkWorkers,
+ nsConfig.BulkWorkers,
+ "The number of workers that are able to receive bulk requests and eventually commit them to Elasticsearch")
+ flagSet.Int(
+ nsConfig.namespace+suffixBulkActions,
+ nsConfig.BulkActions,
+ "The number of requests that can be enqueued before the bulk processor decides to commit")
+ flagSet.Duration(
+ nsConfig.namespace+suffixBulkFlushInterval,
+ nsConfig.BulkFlushInterval,
+ "A time.Duration after which bulk requests are committed, regardless of other tresholds. Set to zero to disable. By default, this is disabled.")
}
// InitFromViper initializes Options with properties from viper
@@ -135,6 +159,10 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge)
cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards)
cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas)
+ cfg.BulkSize = v.GetInt(cfg.namespace + suffixBulkSize)
+ cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers)
+ cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
+ cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
}
// GetPrimary returns primary configuration.
diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go
index 8e28dec1ecb..033b6d875f4 100644
--- a/plugin/storage/es/spanstore/service_operation.go
+++ b/plugin/storage/es/spanstore/service_operation.go
@@ -55,10 +55,9 @@ func NewServiceOperationStorage(
cacheTTL time.Duration,
) *ServiceOperationStorage {
return &ServiceOperationStorage{
- ctx: ctx,
- client: client,
- metrics: storageMetrics.NewWriteMetrics(metricsFactory, "ServiceOperation"),
- logger: logger,
+ ctx: ctx,
+ client: client,
+ logger: logger,
serviceCache: cache.NewLRUWithOptions(
100000,
&cache.Options{
@@ -69,7 +68,7 @@ func NewServiceOperationStorage(
}
// Write saves a service to operation pair.
-func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *jModel.Span) error {
+func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *jModel.Span) {
// Insert serviceName:operationName document
service := Service{
ServiceName: jsonSpan.Process.ServiceName,
@@ -78,15 +77,9 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *jModel.Span)
serviceID := fmt.Sprintf("%s|%s", service.ServiceName, service.OperationName)
cacheKey := fmt.Sprintf("%s:%s", indexName, serviceID)
if !keyInCache(cacheKey, s.serviceCache) {
- start := time.Now()
- _, err := s.client.Index().Index(indexName).Type(serviceType).Id(serviceID).BodyJson(service).Do(s.ctx)
- s.metrics.Emit(err, time.Since(start))
- if err != nil {
- return s.logError(jsonSpan, err, "Failed to insert service:operation", s.logger)
- }
+ s.client.Index().Index(indexName).Type(serviceType).Id(serviceID).BodyJson(service).Add()
writeCache(cacheKey, s.serviceCache)
}
- return nil
}
func (s *ServiceOperationStorage) getServices(indices []string) ([]string, error) {
@@ -150,9 +143,3 @@ func getOperationsAggregation() elastic.Query {
Field(operationNameField).
Size(defaultDocCount) // Must set to some large number. ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838
}
-
-func (s *ServiceOperationStorage) logError(span *jModel.Span, err error, msg string, logger *zap.Logger) error {
- logger.Debug("trace info:", zap.String("trace_id", string(span.TraceID)), zap.String("span_id", string(span.SpanID)))
- logger.Error(msg, zap.Error(err))
- return errors.Wrap(err, msg)
-}
diff --git a/plugin/storage/es/spanstore/service_operation_test.go b/plugin/storage/es/spanstore/service_operation_test.go
index 6b2d8249963..b2dc7d652dc 100644
--- a/plugin/storage/es/spanstore/service_operation_test.go
+++ b/plugin/storage/es/spanstore/service_operation_test.go
@@ -15,8 +15,6 @@
package spanstore
import (
- "errors"
- "strings"
"testing"
"github.com/stretchr/testify/assert"
@@ -37,7 +35,7 @@ func TestWriteService(t *testing.T) {
indexService.On("Type", stringMatcher(serviceType)).Return(indexService)
indexService.On("Id", stringMatcher("service|operation")).Return(indexService)
indexService.On("BodyJson", mock.AnythingOfType("spanstore.Service")).Return(indexService)
- indexService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(&elastic.IndexResponse{}, nil)
+ indexService.On("Add")
w.client.On("Index").Return(indexService)
@@ -50,16 +48,14 @@ func TestWriteService(t *testing.T) {
},
}
- err := w.writer.writeService(indexName, jsonSpan)
- require.NoError(t, err)
+ w.writer.writeService(indexName, jsonSpan)
- indexService.AssertNumberOfCalls(t, "Do", 1)
+ indexService.AssertNumberOfCalls(t, "Add", 1)
assert.Equal(t, "", w.logBuffer.String())
// test that cache works, will call the index service only once.
- err = w.writer.writeService(indexName, jsonSpan)
- require.NoError(t, err)
- indexService.AssertNumberOfCalls(t, "Do", 1)
+ w.writer.writeService(indexName, jsonSpan)
+ indexService.AssertNumberOfCalls(t, "Add", 1)
})
}
@@ -72,7 +68,7 @@ func TestWriteServiceError(t *testing.T) {
indexService.On("Type", stringMatcher(serviceType)).Return(indexService)
indexService.On("Id", stringMatcher("service|operation")).Return(indexService)
indexService.On("BodyJson", mock.AnythingOfType("spanstore.Service")).Return(indexService)
- indexService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(nil, errors.New("service insertion error"))
+ indexService.On("Add")
w.client.On("Index").Return(indexService)
@@ -85,21 +81,7 @@ func TestWriteServiceError(t *testing.T) {
},
}
- err := w.writer.writeService(indexName, jsonSpan)
- assert.EqualError(t, err, "Failed to insert service:operation: service insertion error")
-
- indexService.AssertNumberOfCalls(t, "Do", 1)
-
- expectedLogs := []string{
- `"msg":"Failed to insert service:operation"`,
- `"trace_id":"1"`,
- `"span_id":"0"`,
- `"error":"service insertion error"`,
- }
-
- for _, expectedLog := range expectedLogs {
- assert.True(t, strings.Contains(w.logBuffer.String(), expectedLog), "Log must contain %s, but was %s", expectedLog, w.logBuffer.String())
- }
+ w.writer.writeService(indexName, jsonSpan)
})
}
diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go
index 87985e8dc94..7fe59c377a1 100644
--- a/plugin/storage/es/spanstore/writer.go
+++ b/plugin/storage/es/spanstore/writer.go
@@ -43,10 +43,9 @@ const (
type spanWriterMetrics struct {
indexCreate *storageMetrics.WriteMetrics
- spans *storageMetrics.WriteMetrics
}
-type serviceWriter func(string, *jModel.Span) error
+type serviceWriter func(string, *jModel.Span)
// SpanWriter is a wrapper around elastic.Client
type SpanWriter struct {
@@ -98,7 +97,6 @@ func NewSpanWriter(
logger: logger,
writerMetrics: spanWriterMetrics{
indexCreate: storageMetrics.NewWriteMetrics(metricsFactory, "IndexCreate"),
- spans: storageMetrics.NewWriteMetrics(metricsFactory, "Spans"),
},
serviceWriter: serviceOperationStorage.Write,
indexCache: cache.NewLRUWithOptions(
@@ -121,13 +119,17 @@ func (s *SpanWriter) WriteSpan(span *model.Span) error {
if err := s.createIndex(serviceIndexName, serviceMapping, jsonSpan); err != nil {
return err
}
- if err := s.writeService(serviceIndexName, jsonSpan); err != nil {
- return err
- }
+ s.writeService(serviceIndexName, jsonSpan)
if err := s.createIndex(spanIndexName, spanMapping, jsonSpan); err != nil {
return err
}
- return s.writeSpan(spanIndexName, jsonSpan)
+ s.writeSpan(spanIndexName, jsonSpan)
+ return nil
+}
+
+// Close closes SpanWriter
+func (s *SpanWriter) Close() error {
+ return s.client.Close()
}
func indexNames(span *model.Span) (string, string) {
@@ -170,19 +172,14 @@ func (s *SpanWriter) fixMapping(mapping string) string {
return mapping
}
-func (s *SpanWriter) writeService(indexName string, jsonSpan *jModel.Span) error {
- return s.serviceWriter(indexName, jsonSpan)
+func (s *SpanWriter) writeService(indexName string, jsonSpan *jModel.Span) {
+ s.serviceWriter(indexName, jsonSpan)
}
-func (s *SpanWriter) writeSpan(indexName string, jsonSpan *jModel.Span) error {
- start := time.Now()
+func (s *SpanWriter) writeSpan(indexName string, jsonSpan *jModel.Span) {
elasticSpan := Span{Span: jsonSpan, StartTimeMillis: jsonSpan.StartTime / 1000} // Microseconds to milliseconds
- _, err := s.client.Index().Index(indexName).Type(spanType).BodyJson(&elasticSpan).Do(s.ctx)
- s.writerMetrics.spans.Emit(err, time.Since(start))
- if err != nil {
- return s.logError(jsonSpan, err, "Failed to insert span", s.logger)
- }
- return nil
+
+ s.client.Index().Index(indexName).Type(spanType).BodyJson(&elasticSpan).Add()
}
func (s *SpanWriter) logError(span *jModel.Span, err error, msg string, logger *zap.Logger) error {
diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go
index ccefa514273..d2df42946e9 100644
--- a/plugin/storage/es/spanstore/writer_test.go
+++ b/plugin/storage/es/spanstore/writer_test.go
@@ -25,7 +25,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
- "gopkg.in/olivere/elastic.v5"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/json"
@@ -56,6 +55,14 @@ func withSpanWriter(fn func(w *spanWriterTest)) {
var _ spanstore.Writer = &SpanWriter{} // check API conformance
+func TestClientClose(t *testing.T) {
+ withSpanWriter(func(w *spanWriterTest) {
+ w.client.On("Close").Return(nil)
+ w.writer.Close()
+ w.client.AssertNumberOfCalls(t, "Close", 1)
+ })
+}
+
// This test behaves as a large test that checks WriteSpan's behavior as a whole.
// Extra tests for individual functions are below.
func TestSpanWriter_WriteSpan(t *testing.T) {
@@ -70,23 +77,6 @@ func TestSpanWriter_WriteSpan(t *testing.T) {
expectedError string
expectedLogs []string
}{
- {
- caption: "index exists query",
-
- serviceIndexExists: true,
- spanIndexExists: true,
-
- expectedError: "",
- expectedLogs: []string{},
- },
- {
- caption: "index dne/creation query",
-
- serviceIndexExists: false,
-
- expectedError: "",
- expectedLogs: []string{},
- },
{
caption: "index creation error",
@@ -101,33 +91,13 @@ func TestSpanWriter_WriteSpan(t *testing.T) {
`"error":"index creation error"`,
},
},
- {
- caption: "service insertion error",
-
- serviceIndexExists: false,
-
- servicePutError: errors.New("service insertion error"),
- expectedError: "Failed to insert service:operation: service insertion error",
- expectedLogs: []string{
- `"msg":"Failed to insert service:operation"`,
- `"trace_id":"1"`,
- `"span_id":"0"`,
- `"error":"service insertion error"`,
- },
- },
{
caption: "span insertion error",
serviceIndexExists: false,
- spanPutError: errors.New("span insertion error"),
- expectedError: "Failed to insert span: span insertion error",
- expectedLogs: []string{
- `"msg":"Failed to insert span"`,
- `"trace_id":"1"`,
- `"span_id":"0"`,
- `"error":"span insertion error"`,
- },
+ expectedError: "",
+ expectedLogs: []string{},
},
{
caption: "span index dne error",
@@ -191,11 +161,11 @@ func TestSpanWriter_WriteSpan(t *testing.T) {
indexServicePut.On("Id", stringMatcher("service|operation")).Return(indexServicePut)
indexServicePut.On("BodyJson", mock.AnythingOfType("spanstore.Service")).Return(indexServicePut)
- indexServicePut.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(nil, testCase.servicePutError)
+ indexServicePut.On("Add")
indexSpanPut.On("Id", mock.AnythingOfType("string")).Return(indexSpanPut)
indexSpanPut.On("BodyJson", mock.AnythingOfType("*spanstore.Span")).Return(indexSpanPut)
- indexSpanPut.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(nil, testCase.spanPutError)
+ indexSpanPut.On("Add")
w.client.On("IndexExists", stringMatcher(spanIndexName)).Return(spanExistsService)
w.client.On("CreateIndex", stringMatcher(spanIndexName)).Return(spanCreateService)
@@ -206,9 +176,9 @@ func TestSpanWriter_WriteSpan(t *testing.T) {
err = w.writer.WriteSpan(span)
if testCase.expectedError == "" {
- assert.NoError(t, err)
- indexServicePut.AssertNumberOfCalls(t, "Do", 1)
- indexSpanPut.AssertNumberOfCalls(t, "Do", 1)
+ require.NoError(t, err)
+ indexServicePut.AssertNumberOfCalls(t, "Add", 1)
+ indexSpanPut.AssertNumberOfCalls(t, "Add", 1)
} else {
assert.EqualError(t, err, testCase.expectedError)
}
@@ -235,75 +205,6 @@ func TestSpanIndexName(t *testing.T) {
assert.Equal(t, "jaeger-service-1995-04-21", serviceIndexName)
}
-func TestCheckAndCreateIndex(t *testing.T) {
- testCases := []struct {
- indexExists bool
- indexExistsError error
- createResult *elastic.IndicesCreateResult
- createError error
- expectedError string
- expectedLogs []string
- }{
- {
- indexExists: false,
- createResult: &elastic.IndicesCreateResult{},
- },
- {
- createError: errors.New("index creation error"),
- expectedError: "Failed to create index: index creation error",
- expectedLogs: []string{
- `"msg":"Failed to create index"`,
- `"trace_id":"1"`,
- `"span_id":"0"`,
- `"error":"index creation error"`,
- },
- },
- {
- indexExists: false,
- createError: &elastic.Error{Details: &elastic.ErrorDetails{Type: "index_already_exists_exception"}},
- indexExistsError: &elastic.Error{Details: &elastic.ErrorDetails{Type: "index_already_exists_exception"}},
- },
- }
- for _, testCase := range testCases {
- withSpanWriter(func(w *spanWriterTest) {
- existsService := &mocks.IndicesExistsService{}
- existsService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(testCase.indexExists, testCase.indexExistsError)
-
- createService := &mocks.IndicesCreateService{}
- createService.On("Body", stringMatcher(w.writer.fixMapping(spanMapping))).Return(createService)
- createService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(testCase.createResult, testCase.createError)
-
- indexName := "jaeger-1995-04-21"
- w.client.On("IndexExists", stringMatcher(indexName)).Return(existsService)
- w.client.On("CreateIndex", stringMatcher(indexName)).Return(createService)
-
- jsonSpan := &json.Span{
- TraceID: json.TraceID("1"),
- SpanID: json.SpanID("0"),
- }
-
- err := w.writer.createIndex(indexName, spanMapping, jsonSpan)
- createService.AssertNumberOfCalls(t, "Do", 1)
-
- if testCase.expectedError == "" {
- assert.NoError(t, err)
- // makes sure that the cache works
- _ = w.writer.createIndex(indexName, spanMapping, jsonSpan)
- createService.AssertNumberOfCalls(t, "Do", 1)
- } else {
- assert.EqualError(t, err, testCase.expectedError)
- }
-
- for _, expectedLog := range testCase.expectedLogs {
- assert.True(t, strings.Contains(w.logBuffer.String(), expectedLog), "Log must contain %s, but was %s", expectedLog, w.logBuffer.String())
- }
- if len(testCase.expectedLogs) == 0 {
- assert.Equal(t, "", w.logBuffer.String())
- }
- })
- }
-}
-
func TestFixMapping(t *testing.T) {
withSpanWriter(func(w *spanWriterTest) {
testMapping := `{
@@ -351,16 +252,14 @@ func TestWriteSpanInternal(t *testing.T) {
indexService.On("Index", stringMatcher(indexName)).Return(indexService)
indexService.On("Type", stringMatcher(spanType)).Return(indexService)
indexService.On("BodyJson", mock.AnythingOfType("*spanstore.Span")).Return(indexService)
- indexService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(&elastic.IndexResponse{}, nil)
+ indexService.On("Add")
w.client.On("Index").Return(indexService)
jsonSpan := &json.Span{}
- err := w.writer.writeSpan(indexName, jsonSpan)
- require.NoError(t, err)
-
- indexService.AssertNumberOfCalls(t, "Do", 1)
+ w.writer.writeSpan(indexName, jsonSpan)
+ indexService.AssertNumberOfCalls(t, "Add", 1)
assert.Equal(t, "", w.logBuffer.String())
})
}
@@ -373,7 +272,7 @@ func TestWriteSpanInternalError(t *testing.T) {
indexService.On("Index", stringMatcher(indexName)).Return(indexService)
indexService.On("Type", stringMatcher(spanType)).Return(indexService)
indexService.On("BodyJson", mock.AnythingOfType("*spanstore.Span")).Return(indexService)
- indexService.On("Do", mock.AnythingOfType("*context.emptyCtx")).Return(nil, errors.New("span insertion error"))
+ indexService.On("Add")
w.client.On("Index").Return(indexService)
@@ -382,21 +281,8 @@ func TestWriteSpanInternalError(t *testing.T) {
SpanID: json.SpanID("0"),
}
- err := w.writer.writeSpan(indexName, jsonSpan)
- assert.EqualError(t, err, "Failed to insert span: span insertion error")
-
- indexService.AssertNumberOfCalls(t, "Do", 1)
-
- expectedLogs := []string{
- `"msg":"Failed to insert span"`,
- `"trace_id":"1"`,
- `"span_id":"0"`,
- `"error":"span insertion error"`,
- }
-
- for _, expectedLog := range expectedLogs {
- assert.True(t, strings.Contains(w.logBuffer.String(), expectedLog), "Log must contain %s, but was %s", expectedLog, w.logBuffer.String())
- }
+ w.writer.writeSpan(indexName, jsonSpan)
+ indexService.AssertNumberOfCalls(t, "Add", 1)
})
}
diff --git a/plugin/storage/integration/es_integration_test.go b/plugin/storage/integration/es_integration_test.go
index 923c3d2e360..9ad162d6c79 100644
--- a/plugin/storage/integration/es_integration_test.go
+++ b/plugin/storage/integration/es_integration_test.go
@@ -44,6 +44,7 @@ const (
type ESStorageIntegration struct {
client *elastic.Client
StorageIntegration
+ bulkProcessor *elastic.BulkProcessor
}
func (s *ESStorageIntegration) initializeES() error {
@@ -59,7 +60,8 @@ func (s *ESStorageIntegration) initializeES() error {
s.client = rawClient
s.logger = logger
- client := es.WrapESClient(s.client)
+ s.bulkProcessor, _ = s.client.BulkProcessor().Do(context.Background())
+ client := es.WrapESClient(s.client, s.bulkProcessor)
dependencyStore := dependencystore.NewDependencyStore(client, logger)
s.dependencyReader = dependencyStore
s.dependencyWriter = dependencyStore
@@ -77,13 +79,18 @@ func (s *ESStorageIntegration) esCleanUp() error {
}
func (s *ESStorageIntegration) initSpanstore() {
- client := es.WrapESClient(s.client)
+ bp, _ := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background())
+ client := es.WrapESClient(s.client, bp)
s.spanWriter = spanstore.NewSpanWriter(client, s.logger, metrics.NullFactory, 0, 0)
s.spanReader = spanstore.NewSpanReader(client, s.logger, 72*time.Hour, metrics.NullFactory)
}
func (s *ESStorageIntegration) esRefresh() error {
- _, err := s.client.Refresh().Do(context.Background())
+ err := s.bulkProcessor.Flush()
+ if err != nil {
+ return err
+ }
+ _, err = s.client.Refresh().Do(context.Background())
return err
}