From 0741128558d9e01365bfaedc8cc771bfbc757b62 Mon Sep 17 00:00:00 2001 From: gransy Date: Fri, 26 Mar 2021 09:36:43 +0100 Subject: [PATCH] Added support for OpType - index, create, update --- cmd/esbulk/main.go | 2 ++ indexing.go | 16 +++++++++++----- run.go | 5 +++++ 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/cmd/esbulk/main.go b/cmd/esbulk/main.go index 7d21dbf..a6a03fe 100644 --- a/cmd/esbulk/main.go +++ b/cmd/esbulk/main.go @@ -15,6 +15,7 @@ var ( cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") memprofile = flag.String("memprofile", "", "write heap profile to file") indexName = flag.String("index", "", "index name") + opType = flag.String("optype", "index", "optype (index - will replace existing data, create - will only create a new doc, update - create new or update existing data)") docType = flag.String("type", "", "elasticsearch doc type (deprecated since ES7)") batchSize = flag.Int("size", 1000, "bulk batch size") numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers to use") @@ -65,6 +66,7 @@ func main() { Mapping: *mapping, MemProfile: *memprofile, NumWorkers: *numWorkers, + OpType: *opType, Password: password, Pipeline: *pipeline, Purge: *purge, diff --git a/indexing.go b/indexing.go index 2af2ccd..68479e5 100644 --- a/indexing.go +++ b/indexing.go @@ -22,6 +22,7 @@ var errParseCannotServerAddr = errors.New("cannot parse server address") type Options struct { Servers []string Index string + OpType string DocType string BatchSize int Verbose bool @@ -102,9 +103,9 @@ func BulkIndex(docs []string, options Options) error { } var header string if options.DocType == "" { - header = fmt.Sprintf(`{"index": {"_index": "%s"}}`, options.Index) + header = fmt.Sprintf(`{"%s": {"_index": "%s"}}`, options.OpType, options.Index) } else { - header = fmt.Sprintf(`{"index": {"_index": "%s", "_type": "%s"}}`, options.Index, options.DocType) + header = fmt.Sprintf(`{"%s": {"_index": "%s", "_type": "%s"}}`, options.OpType, options.Index, options.DocType) } // If an "-id" is given, peek into the document to extract the ID and @@ -152,10 +153,10 @@ func BulkIndex(docs []string, options Options) error { } if options.DocType == "" { - header = fmt.Sprintf(`{"index": {"_index": "%s", "_id": %q}}`, options.Index, idstr) + header = fmt.Sprintf(`{"%s": {"_index": "%s", "_id": %q}}`, options.OpType, options.Index, idstr) } else { - header = fmt.Sprintf(`{"index": {"_index": "%s", "_type": "%s", "_id": %q}}`, - options.Index, options.DocType, idstr) + header = fmt.Sprintf(`{"%s": {"_index": "%s", "_type": "%s", "_id": %q}}`, + options.OpType, options.Index, options.DocType, idstr) } // Remove the IDField if it is accidentally named '_id', since @@ -177,6 +178,11 @@ func BulkIndex(docs []string, options Options) error { doc = string(b) } } + + if options.OpType=="update" { + doc = fmt.Sprintf(`{"doc": %s, "doc_as_upsert" : true}`, doc) + } + lines = append(lines, header, doc) } diff --git a/run.go b/run.go index 8604a18..23f48e8 100644 --- a/run.go +++ b/run.go @@ -33,6 +33,7 @@ var ( type Runner struct { BatchSize int CpuProfile string + OpType string DocType string File *os.File FileGzipped bool @@ -77,6 +78,9 @@ func (r *Runner) Run() (err error) { if r.IndexName == "" { return ErrIndexNameRequired } + if r.OpType == "" { + r.OpType = "index" + } if len(r.Servers) == 0 { r.Servers = append(r.Servers, "http://localhost:9200") } @@ -86,6 +90,7 @@ func (r *Runner) Run() (err error) { options := Options{ Servers: r.Servers, Index: r.IndexName, + OpType: r.OpType, DocType: r.DocType, BatchSize: r.BatchSize, Verbose: r.Verbose,