Use elasticsearch bulk API #656
Could you elaborate in the description on what exactly this change does? The SpanWriter API still accepts one span at a time, so what do we mean by bulk? Does the client now batch the write requests? If so, how does this affect the success/failure metrics emitted?
The API collects spans until a threshold is reached (size, number of operations, or time), then sends whatever is in the buffer. I have removed some metrics for now, e.g. span write duration, because there is no way to obtain that number. Stats can be retrieved from the bulk service, but they are not broken down per index name (e.g. spans, dependencies). I would prefer to add them in a separate PR: https://github.com/olivere/elastic/wiki/BulkProcessor#stats
Is the bulk functionality always going to be on, or configurable?
Makefile
Outdated
.PHONY: install-mockery
install-mockery:
	go get github.com/vektra/mockery
	go build github.com/vektra/mockery
go get should install it already
@@ -235,75 +197,6 @@ func TestSpanIndexName(t *testing.T) {
	assert.Equal(t, "jaeger-service-1995-04-21", serviceIndexName)
}

func TestCheckAndCreateIndex(t *testing.T) {
What's the argument for deleting this test?
It does not affect test coverage. I think some of these tests were repeating the same checks.
Awesome perf bump.
The bulk API is always enabled. There are flags to configure it, e.g. flush interval, size, or number of actions.
pkg/es/client.go
Outdated
@@ -27,6 +27,7 @@ type Client interface {
	Index() IndexService
	Search(indices ...string) SearchService
	MultiSearch() MultiSearchService
	Close() error
What about embedding io.Closer here?
I would like to close this in main on SIGTERM, but only the span writer is available there. Do we want to add Close to the span writer too?
Lately I prefer not adding Close to interfaces. The main often creates services with explicit type, i.e. *SomeStruct, in which case it already has access to Close(). In cases when we use an abstract factory pattern and get an interface back, we can still call close via conditional cast:
if closer, ok := someComponent.(io.Closer); ok {
	closer.Close()
}
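A slightly fuller, runnable version of the conditional cast above, as a sketch only. The `spanWriter` interface, the two writer types, and the `closeIfCloser` helper are hypothetical names used for illustration; they are not the types in this PR.

```go
package main

import (
	"fmt"
	"io"
)

// spanWriter is a hypothetical storage interface that deliberately has no Close.
type spanWriter interface {
	WriteSpan(span string) error
}

// bulkSpanWriter buffers spans and therefore needs to be closed on shutdown.
type bulkSpanWriter struct {
	buffered []string
	closed   bool
}

func (w *bulkSpanWriter) WriteSpan(span string) error {
	w.buffered = append(w.buffered, span)
	return nil
}

// Close makes *bulkSpanWriter satisfy io.Closer without touching spanWriter.
func (w *bulkSpanWriter) Close() error {
	w.buffered = nil
	w.closed = true
	return nil
}

// noopSpanWriter holds no resources and has no Close method.
type noopSpanWriter struct{}

func (noopSpanWriter) WriteSpan(string) error { return nil }

// closeIfCloser closes the component only if it implements io.Closer.
// It reports whether Close was called, plus any error Close returned.
func closeIfCloser(component interface{}) (bool, error) {
	if closer, ok := component.(io.Closer); ok {
		return true, closer.Close()
	}
	return false, nil
}

func main() {
	writers := []spanWriter{&bulkSpanWriter{}, noopSpanWriter{}}
	for _, w := range writers {
		called, err := closeIfCloser(w)
		fmt.Printf("%T: close called=%v err=%v\n", w, called, err)
	}
}
```

Unlike the snippet in the comment, the helper returns the error from Close, which matters when closing is what flushes the remaining buffered documents.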
I had to add Close here to be able to close the bulk service in the ES span writer.
plugin/storage/es/options.go
Outdated
BulkSize:          5 * 1000 * 1000,
BulkWorkers:       1,
BulkActions:       1000,
BulkFlushInterval: 0,
Maybe set the default to 100ms?
I think that with 0, spans might never be flushed if someone is just testing and sends fewer than 1000 spans. Or the added delay will be high at low throughput.
100ms seems too low to take advantage of the bulk API. For someone who is testing, even 1s could be fine: by the time you open a browser and refresh, the spans will be there.
Here are some test results (flush interval: test duration):
100ms: 100s
200ms: 80s
300ms: 77s
500ms: ~76s
1s: ~75s
I would go with at least 200ms
It should be ready for review.
BulkSize          int
BulkWorkers       int
BulkActions       int
BulkFlushInterval time.Duration
Do we want to add yaml tags for these? Because they are multi-word names, they will look different otherwise.
Where are these tags used?
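For context on the yaml-tag question, a small sketch of what such tags could look like and how the resulting key names differ. The struct name `bulkOptions` and the snake_case tag values are assumptions for illustration, not names from this PR. The untagged fallback mimics the behaviour of gopkg.in/yaml.v2, which lowercases the field name when no tag is present; only the standard library is used here so the example stays self-contained.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"time"
)

// bulkOptions is a hypothetical struct carrying the four bulk settings,
// with assumed yaml tag names.
type bulkOptions struct {
	BulkSize          int           `yaml:"bulk_size"`
	BulkWorkers       int           `yaml:"bulk_workers"`
	BulkActions       int           `yaml:"bulk_actions"`
	BulkFlushInterval time.Duration // no tag, to show the fallback key
}

// yamlKeys lists the key each field of a struct value would map to:
// the yaml tag if present, otherwise the lowercased field name.
func yamlKeys(v interface{}) []string {
	t := reflect.TypeOf(v)
	keys := make([]string, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		key, ok := field.Tag.Lookup("yaml")
		if !ok {
			key = strings.ToLower(field.Name)
		}
		keys = append(keys, key)
	}
	return keys
}

func main() {
	for _, key := range yamlKeys(bulkOptions{}) {
		fmt.Println(key)
	}
}
```

The untagged field ends up as one run-together lowercase word, which is the "look different" effect the comment refers to.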
Do we cover the insert behavior in the integration tests with real ES?
Do you mean storing and querying data? Then yes: https://github.com/jaegertracing/jaeger/blob/master/plugin/storage/integration/es_integration_test.go#L100
b59f73b to 47c4587
pkg/es/wrapper.go
Outdated
}

// Id calls this function to internal service.
func (i ESIndexService) Id(id string) IndexService {
	return WrapESIndexService(i.indexService.Id(id))
	a := WrapESIndexService(i.bulkIndexReq.Id(id), i.bulkService)
nit: inline
pkg/es/wrapper.go
Outdated
// Do calls this function to internal service.
func (i ESIndexService) Do(ctx context.Context) (*elastic.IndexResponse, error) {
	return i.indexService.Do(ctx)
	// Add add the request to bulk service
s/Add add/Add adds
I suggest rebasing on master; there is a fix for the flaky Travis install.
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
47c4587 to a5fca44
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
I will merge. If there are more comments, I will address them in a separate PR.
Fixes #439
Add the BulkService API for storing documents. It buffers requests until a threshold is reached (size, time, or number of operations), then sends them to ES in bulk.
Performance: store 300k spans
Without bulk API: ~15min
With bulk API: ~40s
This currently removes metrics. Once this is merged, I will open a PR to include stats: https://github.com/olivere/elastic/wiki/BulkProcessor#stats
Signed-off-by: Pavol Loffay <ploffay@redhat.com>