diff --git a/common/elasticsearch/backoff.go b/common/elasticsearch/bulk/backoff.go similarity index 99% rename from common/elasticsearch/backoff.go rename to common/elasticsearch/bulk/backoff.go index a72c763b9b6..a173542ab12 100644 --- a/common/elasticsearch/backoff.go +++ b/common/elasticsearch/bulk/backoff.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package elasticsearch +package bulk import ( "math" diff --git a/common/elasticsearch/bulk/bulk.go b/common/elasticsearch/bulk/bulk.go new file mode 100644 index 00000000000..64d8663bca1 --- /dev/null +++ b/common/elasticsearch/bulk/bulk.go @@ -0,0 +1,123 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package bulk + +import ( + "context" + "fmt" + "time" +) + +const UnknownStatusCode = -1 + +type GenericBulkableRequestType int + +const ( + BulkableIndexRequest GenericBulkableRequestType = iota + BulkableDeleteRequest + BulkableCreateRequest +) + +type ( + // GenericBulkProcessor is a bulk processor + GenericBulkProcessor interface { + Start(ctx context.Context) error + Stop() error + Close() error + Add(request *GenericBulkableAddRequest) + Flush() error + } + + // BulkProcessorParameters holds all required and optional parameters for executing bulk service + BulkProcessorParameters struct { + Name string + NumOfWorkers int + BulkActions int + BulkSize int + FlushInterval time.Duration + Backoff GenericBackoff + BeforeFunc GenericBulkBeforeFunc + AfterFunc GenericBulkAfterFunc + } + + // GenericBackoff allows callers to implement their own Backoff strategy. + GenericBackoff interface { + // Next implements a BackoffFunc. + Next(retry int) (time.Duration, bool) + } + + // GenericBulkBeforeFunc defines the signature of callbacks that are executed + // before a commit to Elasticsearch. + GenericBulkBeforeFunc func(executionId int64, requests []GenericBulkableRequest) + + // GenericBulkAfterFunc defines the signature of callbacks that are executed + // after a commit to Elasticsearch. The err parameter signals an error. + GenericBulkAfterFunc func(executionId int64, requests []GenericBulkableRequest, response *GenericBulkResponse, err *GenericError) + + // GenericBulkableRequest is a generic interface to bulkable requests. 
+ GenericBulkableRequest interface { + fmt.Stringer + Source() ([]string, error) + } + + // GenericBulkableAddRequest is a struct to hold a bulk request + GenericBulkableAddRequest struct { + Index string + Type string + ID string + VersionType string + Version int64 + // request types can be index, delete or create + RequestType GenericBulkableRequestType + // should be nil if RequestType is BulkableDeleteRequest + Doc interface{} + } + + // GenericBulkResponse is a generic struct for a bulk response + GenericBulkResponse struct { + Took int `json:"took,omitempty"` + Errors bool `json:"errors,omitempty"` + Items []map[string]*GenericBulkResponseItem `json:"items,omitempty"` + } + + // GenericError encapsulates error status and details returned from Elasticsearch. + GenericError struct { + Status int `json:"status"` + Details error `json:"error,omitempty"` + } + + // GenericBulkResponseItem is the result of a single bulk request. + GenericBulkResponseItem struct { + Index string `json:"_index,omitempty"` + Type string `json:"_type,omitempty"` + ID string `json:"_id,omitempty"` + Version int64 `json:"_version,omitempty"` + Result string `json:"result,omitempty"` + SeqNo int64 `json:"_seq_no,omitempty"` + PrimaryTerm int64 `json:"_primary_term,omitempty"` + Status int `json:"status,omitempty"` + ForcedRefresh bool `json:"forced_refresh,omitempty"` + // the error details + Error interface{} + } +) diff --git a/common/elasticsearch/mocks/GenericBulkProcessor.go b/common/elasticsearch/bulk/mocks/GenericBulkProcessor.go similarity index 94% rename from common/elasticsearch/mocks/GenericBulkProcessor.go rename to common/elasticsearch/bulk/mocks/GenericBulkProcessor.go index 2f860ef8143..53947df3900 100644 --- a/common/elasticsearch/mocks/GenericBulkProcessor.go +++ b/common/elasticsearch/bulk/mocks/GenericBulkProcessor.go @@ -29,7 +29,7 @@ import ( mock "github.com/stretchr/testify/mock" - elasticsearch "github.com/uber/cadence/common/elasticsearch" + bulk 
"github.com/uber/cadence/common/elasticsearch/bulk" ) // GenericBulkProcessor is an autogenerated mock type for the GenericBulkProcessor type @@ -38,7 +38,7 @@ type GenericBulkProcessor struct { } // Add provides a mock function with given fields: request -func (_m *GenericBulkProcessor) Add(request *elasticsearch.GenericBulkableAddRequest) { +func (_m *GenericBulkProcessor) Add(request *bulk.GenericBulkableAddRequest) { _m.Called(request) } diff --git a/common/elasticsearch/mocks/GenericBulkableRequest.go b/common/elasticsearch/bulk/mocks/GenericBulkableRequest.go similarity index 100% rename from common/elasticsearch/mocks/GenericBulkableRequest.go rename to common/elasticsearch/bulk/mocks/GenericBulkableRequest.go diff --git a/common/elasticsearch/client/client.go b/common/elasticsearch/client/client.go index d670c586e9d..efdc19ed0fb 100644 --- a/common/elasticsearch/client/client.go +++ b/common/elasticsearch/client/client.go @@ -26,7 +26,7 @@ import ( "context" "encoding/json" - "github.com/uber/cadence/common/elasticsearch" + "github.com/uber/cadence/common/elasticsearch/bulk" ) // Client is a generic ES client implementation. @@ -45,7 +45,7 @@ type Client interface { PutMapping(ctx context.Context, index, body string) error // RunBulkProcessor starts bulk indexing processor // @TODO consider to extract Bulk Processor as a separate entity - RunBulkProcessor(ctx context.Context, p *elasticsearch.BulkProcessorParameters) (elasticsearch.GenericBulkProcessor, error) + RunBulkProcessor(ctx context.Context, p *bulk.BulkProcessorParameters) (bulk.GenericBulkProcessor, error) // Scroll retrieves the next batch of results for a scrolling search. 
Scroll(ctx context.Context, index, body, scrollID string) (*Response, error) // Search returns Elasticsearch hit bytes and additional metadata @@ -56,8 +56,21 @@ type Client interface { type Response struct { TookInMillis int64 TotalHits int64 - Hits [][]byte // response from ES server as bytes, used to unmarshal to internal structs + Hits *SearchHits // hits returned by the ES server; each hit's Source is used to unmarshal to internal structs Aggregations map[string]json.RawMessage Sort []interface{} ScrollID string } + +// SearchHits specifies the list of search hits. +type SearchHits struct { + Hits []*SearchHit `json:"hits,omitempty"` // the actual hits returned +} + +// SearchHit is a single hit. +type SearchHit struct { + Index string `json:"_index,omitempty"` // index name + ID string `json:"_id,omitempty"` // external or internal + Sort []interface{} `json:"sort,omitempty"` // sort information + Source json.RawMessage `json:"_source,omitempty"` // stored document source +} diff --git a/common/elasticsearch/client/v6/client.go b/common/elasticsearch/client/v6/client.go new file mode 100644 index 00000000000..23df006564f --- /dev/null +++ b/common/elasticsearch/client/v6/client.go @@ -0,0 +1,194 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package v6 + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/olivere/elastic" + + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/elasticsearch/client" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/types" +) + +type ( + // ElasticV6 implements Client + ElasticV6 struct { + client *elastic.Client + logger log.Logger + } +) + +func (c *ElasticV6) IsNotFoundError(err error) bool { + return elastic.IsNotFound(err) +} + +// NewV6Client returns a new implementation of GenericClient +func NewV6Client( + connectConfig *config.ElasticSearchConfig, + logger log.Logger, + tlsClient *http.Client, + awsSigningClient *http.Client, +) (*ElasticV6, error) { + clientOptFuncs := []elastic.ClientOptionFunc{ + elastic.SetURL(connectConfig.URL.String()), + elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewExponentialBackoff(128*time.Millisecond, 513*time.Millisecond))), + elastic.SetDecoder(&elastic.NumberDecoder{}), // critical to ensure decode of int64 won't lose precision + } + if connectConfig.DisableSniff { + clientOptFuncs = append(clientOptFuncs, elastic.SetSniff(false)) + } + if connectConfig.DisableHealthCheck { + clientOptFuncs = append(clientOptFuncs, elastic.SetHealthcheck(false)) + } + + if awsSigningClient != nil { + clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(awsSigningClient)) + } + + if tlsClient != nil { + clientOptFuncs = append(clientOptFuncs, 
elastic.SetHttpClient(tlsClient)) + } + + client, err := elastic.NewClient(clientOptFuncs...) + if err != nil { + return nil, err + } + + return &ElasticV6{ + client: client, + logger: logger, + }, nil +} + +func (c *ElasticV6) PutMapping(ctx context.Context, index, body string) error { + _, err := c.client.PutMapping().Index(index).Type("_doc").BodyString(body).Do(ctx) + return err +} + +func (c *ElasticV6) CreateIndex(ctx context.Context, index string) error { + _, err := c.client.CreateIndex(index).Do(ctx) + return err +} + +func (c *ElasticV6) Count(ctx context.Context, index, query string) (int64, error) { + return c.client.Count(index).BodyString(query).Do(ctx) +} + +func (c *ElasticV6) ClearScroll(ctx context.Context, scrollID string) error { + return elastic.NewScrollService(c.client).ScrollId(scrollID).Clear(ctx) +} +func (c *ElasticV6) Scroll(ctx context.Context, index, body, scrollID string) (*client.Response, error) { + scrollService := elastic.NewScrollService(c.client) + var esResult *elastic.SearchResult + var err error + + // we are not returning error immediately here, as result + error combination is possible + if len(scrollID) == 0 { + esResult, err = scrollService.Index(index).Body(body).Do(ctx) + } else { + esResult, err = scrollService.ScrollId(scrollID).Do(ctx) + } + + if esResult == nil { + return nil, err + } + + var hits []*client.SearchHit + if esResult.Hits != nil { + for _, h := range esResult.Hits.Hits { + if h.Source != nil { + hits = append(hits, &client.SearchHit{Source: *h.Source}) + } + } + } + + result := &client.Response{ + TookInMillis: esResult.TookInMillis, + TotalHits: esResult.TotalHits(), + Hits: &client.SearchHits{Hits: hits}, + ScrollID: esResult.ScrollId, + } + + if len(esResult.Aggregations) > 0 { + result.Aggregations = make(map[string]json.RawMessage, len(esResult.Aggregations)) + for key, agg := range esResult.Aggregations { + if agg != nil { + result.Aggregations[key] = *agg + } + } + } + + return result, err +} + 
+func (c *ElasticV6) Search(ctx context.Context, index, body string) (*client.Response, error) { + esResult, err := c.client.Search(index).Source(body).Do(ctx) + if err != nil { + return nil, err + } + + if esResult.Error != nil { + return nil, types.InternalServiceError{ + Message: fmt.Sprintf("ElasticSearch Error: %#v", esResult.Error), + } + } else if esResult.TimedOut { + return nil, types.InternalServiceError{ + Message: fmt.Sprintf("ElasticSearch Error: Request timed out: %v ms", esResult.TookInMillis), + } + } + + var sort []interface{} + var hits []*client.SearchHit + + if esResult != nil && esResult.Hits != nil { + for _, h := range esResult.Hits.Hits { + if h.Source != nil { + hits = append(hits, &client.SearchHit{Source: *h.Source}) + } + sort = h.Sort + } + } + + result := &client.Response{ + TookInMillis: esResult.TookInMillis, + TotalHits: esResult.TotalHits(), + Hits: &client.SearchHits{Hits: hits}, + Sort: sort, + } + + if len(esResult.Aggregations) > 0 { + result.Aggregations = make(map[string]json.RawMessage, len(esResult.Aggregations)) + for key, agg := range esResult.Aggregations { + if agg != nil { + result.Aggregations[key] = *agg + } + } + } + + return result, nil +} diff --git a/common/elasticsearch/client/v6/client_bulk.go b/common/elasticsearch/client/v6/client_bulk.go new file mode 100644 index 00000000000..5196a912ffd --- /dev/null +++ b/common/elasticsearch/client/v6/client_bulk.go @@ -0,0 +1,210 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. 
+ +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package v6 + +import ( + "context" + "time" + + "github.com/olivere/elastic" + + "github.com/uber/cadence/common/elasticsearch/bulk" + "github.com/uber/cadence/common/log" +) + +// bulkProcessorParametersV6 holds all required and optional parameters for executing bulk service +type bulkProcessorParametersV6 struct { + Name string + NumOfWorkers int + BulkActions int + BulkSize int + FlushInterval time.Duration + Backoff elastic.Backoff + BeforeFunc elastic.BulkBeforeFunc + AfterFunc elastic.BulkAfterFunc +} + +type v6BulkProcessor struct { + processor *elastic.BulkProcessor + logger log.Logger +} + +func (v *v6BulkProcessor) Start(ctx context.Context) error { + return v.processor.Start(ctx) +} + +func (v *v6BulkProcessor) Stop() error { + return v.processor.Stop() +} + +func (v *v6BulkProcessor) Close() error { + return v.processor.Close() +} + +func (v *v6BulkProcessor) Add(request *bulk.GenericBulkableAddRequest) { + var req elastic.BulkableRequest + switch request.RequestType { + case bulk.BulkableDeleteRequest: + req = elastic.NewBulkDeleteRequest(). + Index(request.Index). + Type(request.Type). + Id(request.ID). + VersionType(request.VersionType). + Version(request.Version) + case bulk.BulkableIndexRequest: + req = elastic.NewBulkIndexRequest(). + Index(request.Index). + Type(request.Type). + Id(request.ID). + VersionType(request.VersionType). + Version(request.Version). + Doc(request.Doc) + case bulk.BulkableCreateRequest: + //for bulk create request still calls the bulk index method + //with providing operation type + req = elastic.NewBulkIndexRequest(). + OpType("create"). + Index(request.Index). + Type(request.Type). + Id(request.ID). + VersionType("internal"). + Doc(request.Doc) + } + v.processor.Add(req) +} + +func (v *v6BulkProcessor) Flush() error { + return v.processor.Flush() +} + +func (c *ElasticV6) runBulkProcessor(ctx context.Context, p *bulkProcessorParametersV6) (*v6BulkProcessor, error) { + processor, err := c.client.BulkProcessor(). 
+ Name(p.Name). + Workers(p.NumOfWorkers). + BulkActions(p.BulkActions). + BulkSize(p.BulkSize). + FlushInterval(p.FlushInterval). + Backoff(p.Backoff). + Before(p.BeforeFunc). + After(p.AfterFunc). + Do(ctx) + if err != nil { + return nil, err + } + return &v6BulkProcessor{ + processor: processor, + }, nil +} + +func (c *ElasticV6) RunBulkProcessor(ctx context.Context, parameters *bulk.BulkProcessorParameters) (bulk.GenericBulkProcessor, error) { + beforeFunc := func(executionId int64, requests []elastic.BulkableRequest) { + parameters.BeforeFunc(executionId, fromV6ToGenericBulkableRequests(requests)) + } + + afterFunc := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { + gerr := convertV6ErrorToGenericError(err) + parameters.AfterFunc( + executionId, + fromV6ToGenericBulkableRequests(requests), + fromV6toGenericBulkResponse(response), + gerr) + } + + return c.runBulkProcessor(ctx, &bulkProcessorParametersV6{ + Name: parameters.Name, + NumOfWorkers: parameters.NumOfWorkers, + BulkActions: parameters.BulkActions, + BulkSize: parameters.BulkSize, + FlushInterval: parameters.FlushInterval, + Backoff: parameters.Backoff, + BeforeFunc: beforeFunc, + AfterFunc: afterFunc, + }) +} + +func convertV6ErrorToGenericError(err error) *bulk.GenericError { + if err == nil { + return nil + } + status := bulk.UnknownStatusCode + switch e := err.(type) { + case *elastic.Error: + status = e.Status + } + return &bulk.GenericError{ + Status: status, + Details: err, + } +} + +func fromV6toGenericBulkResponse(response *elastic.BulkResponse) *bulk.GenericBulkResponse { + if response == nil { + return &bulk.GenericBulkResponse{} + } + return &bulk.GenericBulkResponse{ + Took: response.Took, + Errors: response.Errors, + Items: fromV6ToGenericBulkResponseItemMaps(response.Items), + } +} + +func fromV6ToGenericBulkResponseItemMaps(items []map[string]*elastic.BulkResponseItem) []map[string]*bulk.GenericBulkResponseItem { + var gitems 
[]map[string]*bulk.GenericBulkResponseItem + for _, it := range items { + gitems = append(gitems, fromV6ToGenericBulkResponseItemMap(it)) + } + return gitems +} + +func fromV6ToGenericBulkResponseItemMap(m map[string]*elastic.BulkResponseItem) map[string]*bulk.GenericBulkResponseItem { + if m == nil { + return nil + } + gm := make(map[string]*bulk.GenericBulkResponseItem, len(m)) + for k, v := range m { + gm[k] = fromV6ToGenericBulkResponseItem(v) + } + return gm +} + +func fromV6ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *bulk.GenericBulkResponseItem { + return &bulk.GenericBulkResponseItem{ + Index: v.Index, + Type: v.Type, + ID: v.Id, + Version: v.Version, + Result: v.Result, + SeqNo: v.SeqNo, + PrimaryTerm: v.PrimaryTerm, + Status: v.Status, + ForcedRefresh: v.ForcedRefresh, + } +} + +func fromV6ToGenericBulkableRequests(requests []elastic.BulkableRequest) []bulk.GenericBulkableRequest { + var v6Reqs []bulk.GenericBulkableRequest + for _, req := range requests { + v6Reqs = append(v6Reqs, req) + } + return v6Reqs +} diff --git a/common/elasticsearch/client/v7/client.go b/common/elasticsearch/client/v7/client.go new file mode 100644 index 00000000000..d3a24c51a64 --- /dev/null +++ b/common/elasticsearch/client/v7/client.go @@ -0,0 +1,169 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package v7 + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/olivere/elastic/v7" + + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/elasticsearch/client" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/types" +) + +type ( + // ElasticV7 implements ES7 + ElasticV7 struct { + client *elastic.Client + logger log.Logger + } +) + +// NewV7Client returns a new implementation of GenericClient +func NewV7Client( + connectConfig *config.ElasticSearchConfig, + logger log.Logger, + tlsClient *http.Client, + awsSigningClient *http.Client, +) (*ElasticV7, error) { + clientOptFuncs := []elastic.ClientOptionFunc{ + elastic.SetURL(connectConfig.URL.String()), + elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewExponentialBackoff(128*time.Millisecond, 513*time.Millisecond))), + elastic.SetDecoder(&elastic.NumberDecoder{}), // critical to ensure decode of int64 won't lose precision + } + if connectConfig.DisableSniff { + clientOptFuncs = append(clientOptFuncs, elastic.SetSniff(false)) + } + if connectConfig.DisableHealthCheck { + clientOptFuncs = append(clientOptFuncs, elastic.SetHealthcheck(false)) + } + + if awsSigningClient != nil { + clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(awsSigningClient)) + } + + if tlsClient != nil { + clientOptFuncs = append(clientOptFuncs, elastic.SetHttpClient(tlsClient)) + } + + client, err := elastic.NewClient(clientOptFuncs...) 
+ if err != nil { + return nil, err + } + + return &ElasticV7{ + client: client, + logger: logger, + }, nil +} + +func (c *ElasticV7) IsNotFoundError(err error) bool { + return elastic.IsNotFound(err) +} + +func (c *ElasticV7) PutMapping(ctx context.Context, index, body string) error { + _, err := c.client.PutMapping().Index(index).BodyString(body).Do(ctx) + return err +} +func (c *ElasticV7) CreateIndex(ctx context.Context, index string) error { + _, err := c.client.CreateIndex(index).Do(ctx) + return err +} +func (c *ElasticV7) Count(ctx context.Context, index, query string) (int64, error) { + return c.client.Count(index).BodyString(query).Do(ctx) +} + +func (c *ElasticV7) ClearScroll(ctx context.Context, scrollID string) error { + return elastic.NewScrollService(c.client).ScrollId(scrollID).Clear(ctx) +} + +func (c *ElasticV7) Search(ctx context.Context, index string, body string) (*client.Response, error) { + esResult, err := c.client.Search(index).Source(body).Do(ctx) + if err != nil { + return nil, err + } + + if esResult.Error != nil { + return nil, types.InternalServiceError{ + Message: fmt.Sprintf("ElasticSearch Error: %#v", esResult.Error), + } + } else if esResult.TimedOut { + return nil, types.InternalServiceError{ + Message: fmt.Sprintf("ElasticSearch Error: Request timed out: %v ms", esResult.TookInMillis), + } + } + + var sort []interface{} + var hits []*client.SearchHit + + if esResult != nil && esResult.Hits != nil { + for _, h := range esResult.Hits.Hits { + hits = append(hits, &client.SearchHit{Source: h.Source}) + sort = h.Sort + } + } + + return &client.Response{ + TookInMillis: esResult.TookInMillis, + TotalHits: esResult.TotalHits(), + Hits: &client.SearchHits{Hits: hits}, + Aggregations: esResult.Aggregations, + Sort: sort, + }, nil + +} + +func (c *ElasticV7) Scroll(ctx context.Context, index, body, scrollID string) (*client.Response, error) { + scrollService := elastic.NewScrollService(c.client) + var esResult *elastic.SearchResult + var 
err error + + // we are not returning error immediately here, as result + error combination is possible + if len(scrollID) == 0 { + esResult, err = scrollService.Index(index).Body(body).Do(ctx) + } else { + esResult, err = scrollService.ScrollId(scrollID).Do(ctx) + } + + if esResult == nil { + return nil, err + } + + var hits []*client.SearchHit + if esResult.Hits != nil { + for _, h := range esResult.Hits.Hits { + hits = append(hits, &client.SearchHit{Source: h.Source}) + } + } + + return &client.Response{ + TookInMillis: esResult.TookInMillis, + TotalHits: esResult.TotalHits(), + Hits: &client.SearchHits{Hits: hits}, + Aggregations: esResult.Aggregations, + ScrollID: esResult.ScrollId, + }, err +} diff --git a/common/elasticsearch/client/v7/client_bulk.go b/common/elasticsearch/client/v7/client_bulk.go new file mode 100644 index 00000000000..a947dab2c4e --- /dev/null +++ b/common/elasticsearch/client/v7/client_bulk.go @@ -0,0 +1,207 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package v7 + +import ( + "context" + "time" + + "github.com/olivere/elastic/v7" + + "github.com/uber/cadence/common/elasticsearch/bulk" +) + +type // bulkProcessorParametersV7 holds all required and optional parameters for executing bulk service +bulkProcessorParametersV7 struct { + Name string + NumOfWorkers int + BulkActions int + BulkSize int + FlushInterval time.Duration + Backoff elastic.Backoff + BeforeFunc elastic.BulkBeforeFunc + AfterFunc elastic.BulkAfterFunc +} + +type v7BulkProcessor struct { + processor *elastic.BulkProcessor +} + +func (v *v7BulkProcessor) Start(ctx context.Context) error { + return v.processor.Start(ctx) +} + +func (v *v7BulkProcessor) Stop() error { + return v.processor.Stop() +} + +func (v *v7BulkProcessor) Close() error { + return v.processor.Close() +} + +func (v *v7BulkProcessor) Add(request *bulk.GenericBulkableAddRequest) { + var req elastic.BulkableRequest + switch request.RequestType { + case bulk.BulkableDeleteRequest: + req = elastic.NewBulkDeleteRequest(). + Index(request.Index). + Type(request.Type). + Id(request.ID). + VersionType(request.VersionType). + Version(request.Version) + case bulk.BulkableIndexRequest: + req = elastic.NewBulkIndexRequest(). + Index(request.Index). + Type(request.Type). + Id(request.ID). + VersionType(request.VersionType). + Version(request.Version). + Doc(request.Doc) + case bulk.BulkableCreateRequest: + //for bulk create request still calls the bulk index method + //with providing operation type + req = elastic.NewBulkIndexRequest(). + OpType("create"). + Index(request.Index). + Type(request.Type). + Id(request.ID). + VersionType("internal"). 
+ Doc(request.Doc) + } + v.processor.Add(req) +} + +func (v *v7BulkProcessor) Flush() error { + return v.processor.Flush() +} + +func (c *ElasticV7) runBulkProcessor(ctx context.Context, p *bulkProcessorParametersV7) (*v7BulkProcessor, error) { + processor, err := c.client.BulkProcessor(). + Name(p.Name). + Workers(p.NumOfWorkers). + BulkActions(p.BulkActions). + BulkSize(p.BulkSize). + FlushInterval(p.FlushInterval). + Backoff(p.Backoff). + Before(p.BeforeFunc). + After(p.AfterFunc). + Do(ctx) + if err != nil { + return nil, err + } + return &v7BulkProcessor{ + processor: processor, + }, nil +} + +func fromV7toGenericBulkResponse(response *elastic.BulkResponse) *bulk.GenericBulkResponse { + if response == nil { + return &bulk.GenericBulkResponse{} + } + return &bulk.GenericBulkResponse{ + Took: response.Took, + Errors: response.Errors, + Items: fromV7ToGenericBulkResponseItemMaps(response.Items), + } +} + +func fromV7ToGenericBulkResponseItemMaps(items []map[string]*elastic.BulkResponseItem) []map[string]*bulk.GenericBulkResponseItem { + var gitems []map[string]*bulk.GenericBulkResponseItem + for _, it := range items { + gitems = append(gitems, fromV7ToGenericBulkResponseItemMap(it)) + } + return gitems +} + +func fromV7ToGenericBulkResponseItemMap(m map[string]*elastic.BulkResponseItem) map[string]*bulk.GenericBulkResponseItem { + if m == nil { + return nil + } + gm := make(map[string]*bulk.GenericBulkResponseItem, len(m)) + for k, v := range m { + gm[k] = fromV7ToGenericBulkResponseItem(v) + } + return gm +} + +func fromV7ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *bulk.GenericBulkResponseItem { + return &bulk.GenericBulkResponseItem{ + Index: v.Index, + Type: v.Type, + ID: v.Id, + Version: v.Version, + Result: v.Result, + SeqNo: v.SeqNo, + PrimaryTerm: v.PrimaryTerm, + Status: v.Status, + ForcedRefresh: v.ForcedRefresh, + } +} + +func fromV7ToGenericBulkableRequests(requests []elastic.BulkableRequest) []bulk.GenericBulkableRequest { + var v7Reqs 
[]bulk.GenericBulkableRequest + for _, req := range requests { + v7Reqs = append(v7Reqs, req) + } + return v7Reqs +} + +func (c *ElasticV7) RunBulkProcessor(ctx context.Context, parameters *bulk.BulkProcessorParameters) (bulk.GenericBulkProcessor, error) { + beforeFunc := func(executionId int64, requests []elastic.BulkableRequest) { + parameters.BeforeFunc(executionId, fromV7ToGenericBulkableRequests(requests)) + } + + afterFunc := func(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { + gerr := errorToGenericError(err) + parameters.AfterFunc( + executionId, + fromV7ToGenericBulkableRequests(requests), + fromV7toGenericBulkResponse(response), + gerr) + } + + return c.runBulkProcessor(ctx, &bulkProcessorParametersV7{ + Name: parameters.Name, + NumOfWorkers: parameters.NumOfWorkers, + BulkActions: parameters.BulkActions, + BulkSize: parameters.BulkSize, + FlushInterval: parameters.FlushInterval, + Backoff: parameters.Backoff, + BeforeFunc: beforeFunc, + AfterFunc: afterFunc, + }) +} +func errorToGenericError(err error) *bulk.GenericError { + if err == nil { + return nil + } + status := bulk.UnknownStatusCode + switch e := err.(type) { + case *elastic.Error: + status = e.Status + } + return &bulk.GenericError{ + Status: status, + Details: err, + } +} diff --git a/common/elasticsearch/client_v6_bulk.go b/common/elasticsearch/client_v6_bulk.go index dcdeab3dd34..3ced58a7038 100644 --- a/common/elasticsearch/client_v6_bulk.go +++ b/common/elasticsearch/client_v6_bulk.go @@ -25,16 +25,18 @@ package elasticsearch import ( "context" + "github.com/uber/cadence/common/elasticsearch/bulk" + "github.com/olivere/elastic" ) -var _ GenericBulkProcessor = (*v6BulkProcessor)(nil) +var _ bulk.GenericBulkProcessor = (*v6BulkProcessor)(nil) type v6BulkProcessor struct { processor *elastic.BulkProcessor } -func (c *elasticV6) RunBulkProcessor(ctx context.Context, parameters *BulkProcessorParameters) (GenericBulkProcessor, error) { +func 
(c *elasticV6) RunBulkProcessor(ctx context.Context, parameters *bulk.BulkProcessorParameters) (bulk.GenericBulkProcessor, error) { beforeFunc := func(executionId int64, requests []elastic.BulkableRequest) { parameters.BeforeFunc(executionId, fromV6ToGenericBulkableRequests(requests)) } @@ -79,17 +81,17 @@ func (v *v6BulkProcessor) Close() error { return v.processor.Close() } -func (v *v6BulkProcessor) Add(request *GenericBulkableAddRequest) { +func (v *v6BulkProcessor) Add(request *bulk.GenericBulkableAddRequest) { var req elastic.BulkableRequest switch request.RequestType { - case BulkableDeleteRequest: + case bulk.BulkableDeleteRequest: req = elastic.NewBulkDeleteRequest(). Index(request.Index). Type(request.Type). Id(request.ID). VersionType(request.VersionType). Version(request.Version) - case BulkableIndexRequest: + case bulk.BulkableIndexRequest: req = elastic.NewBulkIndexRequest(). Index(request.Index). Type(request.Type). @@ -97,7 +99,7 @@ func (v *v6BulkProcessor) Add(request *GenericBulkableAddRequest) { VersionType(request.VersionType). Version(request.Version). Doc(request.Doc) - case BulkableCreateRequest: + case bulk.BulkableCreateRequest: //for bulk create request still calls the bulk index method //with providing operation type req = elastic.NewBulkIndexRequest(). 
@@ -115,53 +117,53 @@ func (v *v6BulkProcessor) Flush() error { return v.processor.Flush() } -func convertV6ErrorToGenericError(err error) *GenericError { +func convertV6ErrorToGenericError(err error) *bulk.GenericError { if err == nil { return nil } - status := unknownStatusCode + status := bulk.UnknownStatusCode switch e := err.(type) { case *elastic.Error: status = e.Status } - return &GenericError{ + return &bulk.GenericError{ Status: status, Details: err, } } -func fromV6toGenericBulkResponse(response *elastic.BulkResponse) *GenericBulkResponse { +func fromV6toGenericBulkResponse(response *elastic.BulkResponse) *bulk.GenericBulkResponse { if response == nil { - return &GenericBulkResponse{} + return &bulk.GenericBulkResponse{} } - return &GenericBulkResponse{ + return &bulk.GenericBulkResponse{ Took: response.Took, Errors: response.Errors, Items: fromV6ToGenericBulkResponseItemMaps(response.Items), } } -func fromV6ToGenericBulkResponseItemMaps(items []map[string]*elastic.BulkResponseItem) []map[string]*GenericBulkResponseItem { - var gitems []map[string]*GenericBulkResponseItem +func fromV6ToGenericBulkResponseItemMaps(items []map[string]*elastic.BulkResponseItem) []map[string]*bulk.GenericBulkResponseItem { + var gitems []map[string]*bulk.GenericBulkResponseItem for _, it := range items { gitems = append(gitems, fromV6ToGenericBulkResponseItemMap(it)) } return gitems } -func fromV6ToGenericBulkResponseItemMap(m map[string]*elastic.BulkResponseItem) map[string]*GenericBulkResponseItem { +func fromV6ToGenericBulkResponseItemMap(m map[string]*elastic.BulkResponseItem) map[string]*bulk.GenericBulkResponseItem { if m == nil { return nil } - gm := make(map[string]*GenericBulkResponseItem, len(m)) + gm := make(map[string]*bulk.GenericBulkResponseItem, len(m)) for k, v := range m { gm[k] = fromV6ToGenericBulkResponseItem(v) } return gm } -func fromV6ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *GenericBulkResponseItem { - return &GenericBulkResponseItem{ 
+func fromV6ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *bulk.GenericBulkResponseItem { + return &bulk.GenericBulkResponseItem{ Index: v.Index, Type: v.Type, ID: v.Id, @@ -174,8 +176,8 @@ func fromV6ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *GenericBulkRe } } -func fromV6ToGenericBulkableRequests(requests []elastic.BulkableRequest) []GenericBulkableRequest { - var v6Reqs []GenericBulkableRequest +func fromV6ToGenericBulkableRequests(requests []elastic.BulkableRequest) []bulk.GenericBulkableRequest { + var v6Reqs []bulk.GenericBulkableRequest for _, req := range requests { v6Reqs = append(v6Reqs, req) } diff --git a/common/elasticsearch/client_v7_bulk.go b/common/elasticsearch/client_v7_bulk.go index 298aa005473..30490fda3d6 100644 --- a/common/elasticsearch/client_v7_bulk.go +++ b/common/elasticsearch/client_v7_bulk.go @@ -25,16 +25,18 @@ package elasticsearch import ( "context" + "github.com/uber/cadence/common/elasticsearch/bulk" + "github.com/olivere/elastic/v7" ) -var _ GenericBulkProcessor = (*v7BulkProcessor)(nil) +var _ bulk.GenericBulkProcessor = (*v7BulkProcessor)(nil) type v7BulkProcessor struct { processor *elastic.BulkProcessor } -func (c *elasticV7) RunBulkProcessor(ctx context.Context, parameters *BulkProcessorParameters) (GenericBulkProcessor, error) { +func (c *elasticV7) RunBulkProcessor(ctx context.Context, parameters *bulk.BulkProcessorParameters) (bulk.GenericBulkProcessor, error) { beforeFunc := func(executionId int64, requests []elastic.BulkableRequest) { parameters.BeforeFunc(executionId, fromV7ToGenericBulkableRequests(requests)) } @@ -83,23 +85,23 @@ func (v *v7BulkProcessor) Close() error { return v.processor.Close() } -func (v *v7BulkProcessor) Add(request *GenericBulkableAddRequest) { +func (v *v7BulkProcessor) Add(request *bulk.GenericBulkableAddRequest) { var req elastic.BulkableRequest switch request.RequestType { - case BulkableDeleteRequest: + case bulk.BulkableDeleteRequest: req = 
elastic.NewBulkDeleteRequest(). Index(request.Index). Id(request.ID). VersionType(request.VersionType). Version(request.Version) - case BulkableIndexRequest: + case bulk.BulkableIndexRequest: req = elastic.NewBulkIndexRequest(). Index(request.Index). Id(request.ID). VersionType(request.VersionType). Version(request.Version). Doc(request.Doc) - case BulkableCreateRequest: + case bulk.BulkableCreateRequest: //for bulk create request still calls the bulk index method //with providing operation type req = elastic.NewBulkIndexRequest(). @@ -112,53 +114,53 @@ func (v *v7BulkProcessor) Add(request *GenericBulkableAddRequest) { v.processor.Add(req) } -func convertV7ErrorToGenericError(err error) *GenericError { +func convertV7ErrorToGenericError(err error) *bulk.GenericError { if err == nil { return nil } - status := unknownStatusCode + status := bulk.UnknownStatusCode switch e := err.(type) { case *elastic.Error: status = e.Status } - return &GenericError{ + return &bulk.GenericError{ Status: status, Details: err, } } -func fromV7toGenericBulkResponse(response *elastic.BulkResponse) *GenericBulkResponse { +func fromV7toGenericBulkResponse(response *elastic.BulkResponse) *bulk.GenericBulkResponse { if response == nil { - return &GenericBulkResponse{} + return &bulk.GenericBulkResponse{} } - return &GenericBulkResponse{ + return &bulk.GenericBulkResponse{ Took: response.Took, Errors: response.Errors, Items: fromV7ToGenericBulkResponseItemMaps(response.Items), } } -func fromV7ToGenericBulkResponseItemMaps(items []map[string]*elastic.BulkResponseItem) []map[string]*GenericBulkResponseItem { - var gitems []map[string]*GenericBulkResponseItem +func fromV7ToGenericBulkResponseItemMaps(items []map[string]*elastic.BulkResponseItem) []map[string]*bulk.GenericBulkResponseItem { + var gitems []map[string]*bulk.GenericBulkResponseItem for _, it := range items { gitems = append(gitems, fromV7ToGenericBulkResponseItemMap(it)) } return gitems } -func fromV7ToGenericBulkResponseItemMap(m 
map[string]*elastic.BulkResponseItem) map[string]*GenericBulkResponseItem { +func fromV7ToGenericBulkResponseItemMap(m map[string]*elastic.BulkResponseItem) map[string]*bulk.GenericBulkResponseItem { if m == nil { return nil } - gm := make(map[string]*GenericBulkResponseItem, len(m)) + gm := make(map[string]*bulk.GenericBulkResponseItem, len(m)) for k, v := range m { gm[k] = fromV7ToGenericBulkResponseItem(v) } return gm } -func fromV7ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *GenericBulkResponseItem { - return &GenericBulkResponseItem{ +func fromV7ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *bulk.GenericBulkResponseItem { + return &bulk.GenericBulkResponseItem{ Index: v.Index, Type: v.Type, ID: v.Id, @@ -171,8 +173,8 @@ func fromV7ToGenericBulkResponseItem(v *elastic.BulkResponseItem) *GenericBulkRe } } -func fromV7ToGenericBulkableRequests(requests []elastic.BulkableRequest) []GenericBulkableRequest { - var v7Reqs []GenericBulkableRequest +func fromV7ToGenericBulkableRequests(requests []elastic.BulkableRequest) []bulk.GenericBulkableRequest { + var v7Reqs []bulk.GenericBulkableRequest for _, req := range requests { v7Reqs = append(v7Reqs, req) } diff --git a/common/elasticsearch/common.go b/common/elasticsearch/common.go index 6d8e366f082..7bbb29c76fa 100644 --- a/common/elasticsearch/common.go +++ b/common/elasticsearch/common.go @@ -33,8 +33,6 @@ import ( ) const ( - unknownStatusCode = -1 - // TODO https://github.com/uber/cadence/issues/3686 oneMicroSecondInNano = int64(time.Microsecond / time.Nanosecond) diff --git a/common/elasticsearch/interfaces.go b/common/elasticsearch/interfaces.go index 5c0fa48f5da..034da2b7ba6 100644 --- a/common/elasticsearch/interfaces.go +++ b/common/elasticsearch/interfaces.go @@ -25,10 +25,10 @@ import ( "encoding/json" "fmt" "net/http" - "time" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/elasticsearch/bulk" 
"github.com/uber/cadence/common/log" p "github.com/uber/cadence/common/persistence" ) @@ -70,14 +70,6 @@ func NewGenericClient( } } -type GenericBulkableRequestType int - -const ( - BulkableIndexRequest GenericBulkableRequestType = iota - BulkableDeleteRequest - BulkableCreateRequest -) - type ( // GenericClient is a generic interface for all versions of ElasticSearch clients GenericClient interface { @@ -97,7 +89,7 @@ type ( CountByQuery(ctx context.Context, index, query string) (int64, error) // RunBulkProcessor returns a processor for adding/removing docs into ElasticSearch index - RunBulkProcessor(ctx context.Context, p *BulkProcessorParameters) (GenericBulkProcessor, error) + RunBulkProcessor(ctx context.Context, p *bulk.BulkProcessorParameters) (bulk.GenericBulkProcessor, error) // PutMapping adds new field type to the index PutMapping(ctx context.Context, index, root, key, valueType string) error @@ -150,91 +142,6 @@ type ( // SearchForOneClosedExecutionResponse is response for SearchForOneClosedExecution SearchForOneClosedExecutionResponse = p.InternalGetClosedWorkflowExecutionResponse - // GenericBulkProcessor is a bulk processor - GenericBulkProcessor interface { - Start(ctx context.Context) error - Stop() error - Close() error - Add(request *GenericBulkableAddRequest) - Flush() error - } - - // BulkProcessorParameters holds all required and optional parameters for executing bulk service - BulkProcessorParameters struct { - Name string - NumOfWorkers int - BulkActions int - BulkSize int - FlushInterval time.Duration - Backoff GenericBackoff - BeforeFunc GenericBulkBeforeFunc - AfterFunc GenericBulkAfterFunc - } - - // GenericBackoff allows callers to implement their own Backoff strategy. - GenericBackoff interface { - // Next implements a BackoffFunc. - Next(retry int) (time.Duration, bool) - } - - // GenericBulkBeforeFunc defines the signature of callbacks that are executed - // before a commit to Elasticsearch. 
- GenericBulkBeforeFunc func(executionId int64, requests []GenericBulkableRequest) - - // GenericBulkAfterFunc defines the signature of callbacks that are executed - // after a commit to Elasticsearch. The err parameter signals an error. - GenericBulkAfterFunc func(executionId int64, requests []GenericBulkableRequest, response *GenericBulkResponse, err *GenericError) - - // IsRecordValidFilter is a function to filter visibility records - IsRecordValidFilter func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool - - // GenericBulkableRequest is a generic interface to bulkable requests. - GenericBulkableRequest interface { - fmt.Stringer - Source() ([]string, error) - } - - // GenericBulkableAddRequest a struct to hold a bulk request - GenericBulkableAddRequest struct { - Index string - Type string - ID string - VersionType string - Version int64 - // request types can be index, delete or create - RequestType GenericBulkableRequestType - // should be nil if IsDelete is true - Doc interface{} - } - - // GenericBulkResponse is generic struct of bulk response - GenericBulkResponse struct { - Took int `json:"took,omitempty"` - Errors bool `json:"errors,omitempty"` - Items []map[string]*GenericBulkResponseItem `json:"items,omitempty"` - } - - // GenericError encapsulates error status and details returned from Elasticsearch. - GenericError struct { - Status int `json:"status"` - Details error `json:"error,omitempty"` - } - - // GenericBulkResponseItem is the result of a single bulk request. 
- GenericBulkResponseItem struct { - Index string `json:"_index,omitempty"` - Type string `json:"_type,omitempty"` - ID string `json:"_id,omitempty"` - Version int64 `json:"_version,omitempty"` - Result string `json:"result,omitempty"` - SeqNo int64 `json:"_seq_no,omitempty"` - PrimaryTerm int64 `json:"_primary_term,omitempty"` - Status int `json:"status,omitempty"` - ForcedRefresh bool `json:"forced_refresh,omitempty"` - // the error details - Error interface{} - } - // VisibilityRecord is a struct of doc for deserialization VisibilityRecord struct { WorkflowID string @@ -265,4 +172,7 @@ type ( Hits SearchHits Aggregations map[string]json.RawMessage } + + // IsRecordValidFilter is a function to filter visibility records + IsRecordValidFilter func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool ) diff --git a/common/elasticsearch/mocks/GenericClient.go b/common/elasticsearch/mocks/GenericClient.go index 51458af2dee..86827ae310e 100644 --- a/common/elasticsearch/mocks/GenericClient.go +++ b/common/elasticsearch/mocks/GenericClient.go @@ -28,6 +28,7 @@ import ( mock "github.com/stretchr/testify/mock" elasticsearch "github.com/uber/cadence/common/elasticsearch" + bulk "github.com/uber/cadence/common/elasticsearch/bulk" persistence "github.com/uber/cadence/common/persistence" ) @@ -101,20 +102,20 @@ func (_m *GenericClient) PutMapping(ctx context.Context, index string, root stri } // RunBulkProcessor provides a mock function with given fields: ctx, p -func (_m *GenericClient) RunBulkProcessor(ctx context.Context, p *elasticsearch.BulkProcessorParameters) (elasticsearch.GenericBulkProcessor, error) { +func (_m *GenericClient) RunBulkProcessor(ctx context.Context, p *bulk.BulkProcessorParameters) (bulk.GenericBulkProcessor, error) { ret := _m.Called(ctx, p) - var r0 elasticsearch.GenericBulkProcessor - if rf, ok := ret.Get(0).(func(context.Context, *elasticsearch.BulkProcessorParameters) elasticsearch.GenericBulkProcessor); ok { + var r0 bulk.GenericBulkProcessor + 
if rf, ok := ret.Get(0).(func(context.Context, *bulk.BulkProcessorParameters) bulk.GenericBulkProcessor); ok { r0 = rf(ctx, p) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(elasticsearch.GenericBulkProcessor) + r0 = ret.Get(0).(bulk.GenericBulkProcessor) } } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, *elasticsearch.BulkProcessorParameters) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, *bulk.BulkProcessorParameters) error); ok { r1 = rf(ctx, p) } else { r1 = ret.Error(1) diff --git a/service/worker/indexer/esProcessor.go b/service/worker/indexer/esProcessor.go index 3f7089f3dcd..bdb2801b66b 100644 --- a/service/worker/indexer/esProcessor.go +++ b/service/worker/indexer/esProcessor.go @@ -26,6 +26,8 @@ import ( "fmt" "time" + "github.com/uber/cadence/common/elasticsearch/bulk" + "github.com/uber/cadence/.gen/go/indexer" "github.com/uber/cadence/common" "github.com/uber/cadence/common/codec" @@ -47,7 +49,7 @@ const ( type ( // ESProcessorImpl implements ESProcessor, it's an agent of GenericBulkProcessor ESProcessorImpl struct { - bulkProcessor es.GenericBulkProcessor + bulkProcessor bulk.GenericBulkProcessor mapToKafkaMsg collection.ConcurrentTxMap // used to map ES request to kafka message config *Config logger log.Logger @@ -76,13 +78,13 @@ func newESProcessor( msgEncoder: defaultEncoder, } - params := &es.BulkProcessorParameters{ + params := &bulk.BulkProcessorParameters{ Name: name, NumOfWorkers: config.ESProcessorNumOfWorkers(), BulkActions: config.ESProcessorBulkActions(), BulkSize: config.ESProcessorBulkSize(), FlushInterval: config.ESProcessorFlushInterval(), - Backoff: es.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval), + Backoff: bulk.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval), BeforeFunc: p.bulkBeforeAction, AfterFunc: p.bulkAfterAction, } @@ -106,7 +108,7 @@ func (p *ESProcessorImpl) Stop() { } // Add an ES request, and an map item for kafka 
message -func (p *ESProcessorImpl) Add(request *es.GenericBulkableAddRequest, key string, kafkaMsg messaging.Message) { +func (p *ESProcessorImpl) Add(request *bulk.GenericBulkableAddRequest, key string, kafkaMsg messaging.Message) { actionWhenFoundDuplicates := func(key interface{}, value interface{}) error { return kafkaMsg.Ack() } @@ -120,12 +122,12 @@ func (p *ESProcessorImpl) Add(request *es.GenericBulkableAddRequest, key string, } // bulkBeforeAction is triggered before bulk bulkProcessor commit -func (p *ESProcessorImpl) bulkBeforeAction(executionID int64, requests []es.GenericBulkableRequest) { +func (p *ESProcessorImpl) bulkBeforeAction(executionID int64, requests []bulk.GenericBulkableRequest) { p.scope.AddCounter(metrics.ESProcessorRequests, int64(len(requests))) } // bulkAfterAction is triggered after bulk bulkProcessor commit -func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []es.GenericBulkableRequest, response *es.GenericBulkResponse, err *es.GenericError) { +func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulkableRequest, response *bulk.GenericBulkResponse, err *bulk.GenericError) { if err != nil { // This happens after configured retry, which means something bad happens on cluster or index // When cluster back to live, bulkProcessor will re-commit those failure requests @@ -214,7 +216,7 @@ func (p *ESProcessorImpl) getKafkaMsg(key string) (kafkaMsg *kafkaMessageWithMet return kafkaMsg, ok } -func (p *ESProcessorImpl) retrieveKafkaKey(request es.GenericBulkableRequest) string { +func (p *ESProcessorImpl) retrieveKafkaKey(request bulk.GenericBulkableRequest) string { req, err := request.Source() if err != nil { p.logger.Error("Get request source err.", tag.Error(err), tag.ESRequest(request.String())) @@ -309,7 +311,7 @@ func isResponseRetriable(status int) bool { return ok } -func getErrorMsgFromESResp(resp *es.GenericBulkResponseItem) string { +func getErrorMsgFromESResp(resp *bulk.GenericBulkResponseItem) 
string { var errMsg string if resp.Error != nil { errMsg = fmt.Sprintf("%v", resp.Error) diff --git a/service/worker/indexer/esProcessor_test.go b/service/worker/indexer/esProcessor_test.go index 9ab7eaebc76..14e34d0d681 100644 --- a/service/worker/indexer/esProcessor_test.go +++ b/service/worker/indexer/esProcessor_test.go @@ -26,6 +26,9 @@ import ( "testing" "time" + "github.com/uber/cadence/common/elasticsearch/bulk" + mocks2 "github.com/uber/cadence/common/elasticsearch/bulk/mocks" + "go.uber.org/zap/zaptest" "github.com/stretchr/testify/mock" @@ -36,7 +39,6 @@ import ( "github.com/uber/cadence/common/collection" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/elasticsearch" - es "github.com/uber/cadence/common/elasticsearch" esMocks "github.com/uber/cadence/common/elasticsearch/mocks" "github.com/uber/cadence/common/log/loggerimpl" msgMocks "github.com/uber/cadence/common/messaging/mocks" @@ -47,7 +49,7 @@ import ( type esProcessorSuite struct { suite.Suite esProcessor *ESProcessorImpl - mockBulkProcessor *esMocks.GenericBulkProcessor + mockBulkProcessor *mocks2.GenericBulkProcessor mockESClient *esMocks.GenericClient mockScope *mocks.Scope } @@ -77,7 +79,7 @@ func (s *esProcessorSuite) SetupTest() { ESProcessorBulkSize: dynamicconfig.GetIntPropertyFn(2 << 20), ESProcessorFlushInterval: dynamicconfig.GetDurationPropertyFn(1 * time.Minute), } - s.mockBulkProcessor = &esMocks.GenericBulkProcessor{} + s.mockBulkProcessor = &mocks2.GenericBulkProcessor{} zapLogger := zaptest.NewLogger(s.T()) s.mockScope = &mocks.Scope{} @@ -109,7 +111,7 @@ func (s *esProcessorSuite) TestNewESProcessorAndStart() { } processorName := "test-bulkProcessor" - s.mockESClient.On("RunBulkProcessor", mock.Anything, mock.MatchedBy(func(input *es.BulkProcessorParameters) bool { + s.mockESClient.On("RunBulkProcessor", mock.Anything, mock.MatchedBy(func(input *bulk.BulkProcessorParameters) bool { s.Equal(processorName, input.Name) 
s.Equal(config.ESProcessorNumOfWorkers(), input.NumOfWorkers) s.Equal(config.ESProcessorBulkActions(), input.BulkActions) @@ -118,7 +120,7 @@ func (s *esProcessorSuite) TestNewESProcessorAndStart() { s.NotNil(input.Backoff) s.NotNil(input.AfterFunc) return true - })).Return(&esMocks.GenericBulkProcessor{}, nil).Once() + })).Return(&mocks2.GenericBulkProcessor{}, nil).Once() processor, err := newESProcessor(processorName, config, s.mockESClient, s.esProcessor.logger, metrics.NewNoopMetricsClient()) s.NoError(err) @@ -132,7 +134,7 @@ func (s *esProcessorSuite) TestStop() { } func (s *esProcessorSuite) TestAdd() { - request := &es.GenericBulkableAddRequest{RequestType: es.BulkableIndexRequest} + request := &bulk.GenericBulkableAddRequest{RequestType: bulk.BulkableIndexRequest} mockKafkaMsg := &msgMocks.Message{} key := "test-key" s.Equal(0, s.esProcessor.mapToKafkaMsg.Len()) @@ -152,7 +154,7 @@ func (s *esProcessorSuite) TestAdd() { } func (s *esProcessorSuite) TestAdd_ConcurrentAdd() { - request := &es.GenericBulkableAddRequest{RequestType: es.BulkableIndexRequest} + request := &bulk.GenericBulkableAddRequest{RequestType: bulk.BulkableIndexRequest} mockKafkaMsg := &msgMocks.Message{} key := "test-key" @@ -176,13 +178,13 @@ func (s *esProcessorSuite) TestAdd_ConcurrentAdd() { func (s *esProcessorSuite) TestBulkAfterActionX() { version := int64(3) testKey := "testKey" - request := &esMocks.GenericBulkableRequest{} + request := &mocks2.GenericBulkableRequest{} request.On("String").Return("") request.On("Source").Return([]string{string(`{"delete":{"_id":"testKey"}}`)}, nil) - requests := []es.GenericBulkableRequest{request} + requests := []bulk.GenericBulkableRequest{request} - mSuccess := map[string]*es.GenericBulkResponseItem{ + mSuccess := map[string]*bulk.GenericBulkResponseItem{ "index": { Index: testIndex, Type: testType, @@ -191,10 +193,10 @@ func (s *esProcessorSuite) TestBulkAfterActionX() { Status: 200, }, } - response := &es.GenericBulkResponse{ + response := 
&bulk.GenericBulkResponse{ Took: 3, Errors: false, - Items: []map[string]*es.GenericBulkResponseItem{mSuccess}, + Items: []map[string]*bulk.GenericBulkResponseItem{mSuccess}, } mockKafkaMsg := &msgMocks.Message{} @@ -208,12 +210,12 @@ func (s *esProcessorSuite) TestBulkAfterActionX() { func (s *esProcessorSuite) TestBulkAfterAction_Nack() { version := int64(3) testKey := "testKey" - request := &esMocks.GenericBulkableRequest{} + request := &mocks2.GenericBulkableRequest{} request.On("String").Return("") request.On("Source").Return([]string{string(`{"delete":{"_id":"testKey"}}`)}, nil) - requests := []es.GenericBulkableRequest{request} + requests := []bulk.GenericBulkableRequest{request} - mFailed := map[string]*es.GenericBulkResponseItem{ + mFailed := map[string]*bulk.GenericBulkResponseItem{ "index": { Index: testIndex, Type: testType, @@ -222,10 +224,10 @@ func (s *esProcessorSuite) TestBulkAfterAction_Nack() { Status: 400, }, } - response := &es.GenericBulkResponse{ + response := &bulk.GenericBulkResponse{ Took: 3, Errors: false, - Items: []map[string]*es.GenericBulkResponseItem{mFailed}, + Items: []map[string]*bulk.GenericBulkResponseItem{mFailed}, } wid := "test-workflowID" @@ -246,12 +248,12 @@ func (s *esProcessorSuite) TestBulkAfterAction_Nack() { func (s *esProcessorSuite) TestBulkAfterAction_Error() { version := int64(3) testKey := "testKey" - request := &esMocks.GenericBulkableRequest{} + request := &mocks2.GenericBulkableRequest{} request.On("String").Return("") request.On("Source").Return([]string{string(`{"delete":{"_id":"testKey"}}`)}, nil) - requests := []es.GenericBulkableRequest{request} + requests := []bulk.GenericBulkableRequest{request} - mFailed := map[string]*es.GenericBulkResponseItem{ + mFailed := map[string]*bulk.GenericBulkResponseItem{ "index": { Index: testIndex, Type: testType, @@ -260,10 +262,10 @@ func (s *esProcessorSuite) TestBulkAfterAction_Error() { Status: 400, }, } - response := &es.GenericBulkResponse{ + response := 
&bulk.GenericBulkResponse{ Took: 3, Errors: true, - Items: []map[string]*es.GenericBulkResponseItem{mFailed}, + Items: []map[string]*bulk.GenericBulkResponseItem{mFailed}, } wid := "test-workflowID" @@ -277,7 +279,7 @@ func (s *esProcessorSuite) TestBulkAfterAction_Error() { mockKafkaMsg.On("Nack").Return(nil).Once() mockKafkaMsg.On("Value").Return(payload).Once() s.mockScope.On("IncCounter", metrics.ESProcessorFailures).Once() - s.esProcessor.bulkAfterAction(0, requests, response, &es.GenericError{Details: fmt.Errorf("some error")}) + s.esProcessor.bulkAfterAction(0, requests, response, &bulk.GenericError{Details: fmt.Errorf("some error")}) } func (s *esProcessorSuite) TestAckKafkaMsg() { @@ -285,7 +287,7 @@ func (s *esProcessorSuite) TestAckKafkaMsg() { // no msg in map, nothing called s.esProcessor.ackKafkaMsg(key) - request := &es.GenericBulkableAddRequest{} + request := &bulk.GenericBulkableAddRequest{} mockKafkaMsg := &msgMocks.Message{} s.mockScope.On("StartTimer", testMetric).Return(testStopWatch).Once() s.mockBulkProcessor.On("Add", request).Return().Once() @@ -303,7 +305,7 @@ func (s *esProcessorSuite) TestNackKafkaMsg() { // no msg in map, nothing called s.esProcessor.nackKafkaMsg(key) - request := &es.GenericBulkableAddRequest{} + request := &bulk.GenericBulkableAddRequest{} mockKafkaMsg := &msgMocks.Message{} s.mockBulkProcessor.On("Add", request).Return().Once() s.mockScope.On("StartTimer", testMetric).Return(testStopWatch).Once() @@ -384,19 +386,19 @@ func (s *esProcessorSuite) TestIsResponseRetriable() { func (s *esProcessorSuite) TestIsErrorRetriable() { tests := []struct { - input *es.GenericError + input *bulk.GenericError expected bool }{ { - input: &es.GenericError{Status: 400}, + input: &bulk.GenericError{Status: 400}, expected: false, }, { - input: &es.GenericError{Status: 408}, + input: &bulk.GenericError{Status: 408}, expected: true, }, { - input: &es.GenericError{}, + input: &bulk.GenericError{}, expected: false, }, } diff --git 
a/service/worker/indexer/indexer.go b/service/worker/indexer/indexer.go index b03944ed59e..ae6bf14c63c 100644 --- a/service/worker/indexer/indexer.go +++ b/service/worker/indexer/indexer.go @@ -33,6 +33,7 @@ import ( "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/dynamicconfig" es "github.com/uber/cadence/common/elasticsearch" + "github.com/uber/cadence/common/elasticsearch/bulk" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/messaging" @@ -53,7 +54,7 @@ var ( type ( ESProcessor interface { common.Daemon - Add(request *es.GenericBulkableAddRequest, key string, kafkaMsg messaging.Message) + Add(request *bulk.GenericBulkableAddRequest, key string, kafkaMsg messaging.Message) } // Indexer used to consumer data from kafka then send to ElasticSearch Indexer struct { @@ -223,7 +224,7 @@ func (i *Indexer) addMessageToES(indexMsg *indexer.Message, kafkaMsg messaging.M } var keyToKafkaMsg string - req := &es.GenericBulkableAddRequest{ + req := &bulk.GenericBulkableAddRequest{ Index: i.esIndexName, Type: es.GetESDocType(), ID: docID, @@ -235,15 +236,15 @@ func (i *Indexer) addMessageToES(indexMsg *indexer.Message, kafkaMsg messaging.M keyToKafkaMsg = fmt.Sprintf("%v-%v", kafkaMsg.Partition(), kafkaMsg.Offset()) doc := i.generateESDoc(indexMsg, keyToKafkaMsg) req.Doc = doc - req.RequestType = es.BulkableIndexRequest + req.RequestType = bulk.BulkableIndexRequest case indexer.MessageTypeDelete: keyToKafkaMsg = docID - req.RequestType = es.BulkableDeleteRequest + req.RequestType = bulk.BulkableDeleteRequest case indexer.MessageTypeCreate: keyToKafkaMsg = fmt.Sprintf("%v-%v", kafkaMsg.Partition(), kafkaMsg.Offset()) doc := i.generateESDoc(indexMsg, keyToKafkaMsg) req.Doc = doc - req.RequestType = es.BulkableCreateRequest + req.RequestType = bulk.BulkableCreateRequest default: logger.Error("Unknown message type") i.scope.IncCounter(metrics.IndexProcessorCorruptedData)