Skip to content

Commit

Permalink
fix: adjusts to use retry_on_conflict on bulk
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Barbosa <nicolas.barbosa@fretebras.com.br>
  • Loading branch information
nicolascb authored and ncbfretebras committed Jan 18, 2023
1 parent 65e452b commit a0c0438
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
2 changes: 2 additions & 0 deletions opensearchutil/bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type bulkActionMetadata struct {
WaitForActiveShards interface{} `json:"wait_for_active_shards,omitempty"`
Refresh *string `json:"refresh,omitempty"`
RequireAlias *bool `json:"require_alias,omitempty"`
RetryOnConflict *int `json:"retry_on_conflict,omitempty"`
}

// BulkIndexerResponse represents the OpenSearch response.
Expand Down Expand Up @@ -447,6 +448,7 @@ func (w *worker) writeMeta(item BulkIndexerItem) error {
WaitForActiveShards: item.WaitForActiveShards,
Refresh: item.Refresh,
RequireAlias: item.RequireAlias,
RetryOnConflict: item.RetryOnConflict,
}
// Can not specify version or seq num if no document ID is passed
if meta.DocumentID == "" {
Expand Down
16 changes: 16 additions & 0 deletions opensearchutil/bulk_indexer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,18 @@ func TestBulkIndexer(t *testing.T) {
}},
`{"index":{"_index":"test","_id":"42","version":25,"version_type":"external","wait_for_active_shards":"all"}}` + "\n",
},
{
"with retry_on_conflict",
args{BulkIndexerItem{
Action: "index",
DocumentID: "42",
Index: "test",
Version: int64Pointer(25),
VersionType: strPointer("external"),
RetryOnConflict: intPointer(5),
}},
`{"index":{"_index":"test","_id":"42","version":25,"version_type":"external","retry_on_conflict":5}}` + "\n",
},
}
for _, tt := range tests {
tt := tt
Expand Down Expand Up @@ -777,3 +789,7 @@ func strPointer(s string) *string {
func int64Pointer(i int64) *int64 {
return &i
}

func intPointer(i int) *int {
return &i
}

0 comments on commit a0c0438

Please # to comment.