Skip to content

Commit

Permalink
Added support for OpType - index, create, update
Browse files Browse the repository at this point in the history
  • Loading branch information
gransy committed Mar 26, 2021
1 parent 940a6c2 commit 0741128
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
2 changes: 2 additions & 0 deletions cmd/esbulk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
memprofile = flag.String("memprofile", "", "write heap profile to file")
indexName = flag.String("index", "", "index name")
opType = flag.String("optype", "index", "optype (index - will replace existing data, create - will only create a new doc, update - create new or update existing data)")
docType = flag.String("type", "", "elasticsearch doc type (deprecated since ES7)")
batchSize = flag.Int("size", 1000, "bulk batch size")
numWorkers = flag.Int("w", runtime.NumCPU(), "number of workers to use")
Expand Down Expand Up @@ -65,6 +66,7 @@ func main() {
Mapping: *mapping,
MemProfile: *memprofile,
NumWorkers: *numWorkers,
OpType: *opType,
Password: password,
Pipeline: *pipeline,
Purge: *purge,
Expand Down
16 changes: 11 additions & 5 deletions indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var errParseCannotServerAddr = errors.New("cannot parse server address")
type Options struct {
Servers []string
Index string
OpType string
DocType string
BatchSize int
Verbose bool
Expand Down Expand Up @@ -102,9 +103,9 @@ func BulkIndex(docs []string, options Options) error {
}
var header string
if options.DocType == "" {
header = fmt.Sprintf(`{"index": {"_index": "%s"}}`, options.Index)
header = fmt.Sprintf(`{"%s": {"_index": "%s"}}`, options.OpType, options.Index)
} else {
header = fmt.Sprintf(`{"index": {"_index": "%s", "_type": "%s"}}`, options.Index, options.DocType)
header = fmt.Sprintf(`{"%s": {"_index": "%s", "_type": "%s"}}`, options.OpType, options.Index, options.DocType)
}

// If an "-id" is given, peek into the document to extract the ID and
Expand Down Expand Up @@ -152,10 +153,10 @@ func BulkIndex(docs []string, options Options) error {
}

if options.DocType == "" {
header = fmt.Sprintf(`{"index": {"_index": "%s", "_id": %q}}`, options.Index, idstr)
header = fmt.Sprintf(`{"%s": {"_index": "%s", "_id": %q}}`, options.OpType, options.Index, idstr)
} else {
header = fmt.Sprintf(`{"index": {"_index": "%s", "_type": "%s", "_id": %q}}`,
options.Index, options.DocType, idstr)
header = fmt.Sprintf(`{"%s": {"_index": "%s", "_type": "%s", "_id": %q}}`,
options.OpType, options.Index, options.DocType, idstr)
}

// Remove the IDField if it is accidentally named '_id', since
Expand All @@ -177,6 +178,11 @@ func BulkIndex(docs []string, options Options) error {
doc = string(b)
}
}

if options.OpType=="update" {
doc = fmt.Sprintf(`{"doc": %s, "doc_as_upsert" : true}`, doc)
}

lines = append(lines, header, doc)
}

Expand Down
5 changes: 5 additions & 0 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
type Runner struct {
BatchSize int
CpuProfile string
OpType string
DocType string
File *os.File
FileGzipped bool
Expand Down Expand Up @@ -77,6 +78,9 @@ func (r *Runner) Run() (err error) {
if r.IndexName == "" {
return ErrIndexNameRequired
}
if r.OpType == "" {
r.OpType = "index"
}
if len(r.Servers) == 0 {
r.Servers = append(r.Servers, "http://localhost:9200")
}
Expand All @@ -86,6 +90,7 @@ func (r *Runner) Run() (err error) {
options := Options{
Servers: r.Servers,
Index: r.IndexName,
OpType: r.OpType,
DocType: r.DocType,
BatchSize: r.BatchSize,
Verbose: r.Verbose,
Expand Down

0 comments on commit 0741128

Please sign in to comment.