From b77c307a82e137ddbdd2c31dec5e52aa43e7c8cc Mon Sep 17 00:00:00 2001 From: Martin Czygan Date: Sun, 30 Nov 2014 22:51:08 +0100 Subject: [PATCH] fix index communication index.refresh_interval settings weren't applied at all. Also, in -verbose mode report HTTP status codes for various requests. --- cmd/esbulk/esbulk.go | 50 +++++++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/cmd/esbulk/esbulk.go b/cmd/esbulk/esbulk.go index d4047dd..e2d8c54 100644 --- a/cmd/esbulk/esbulk.go +++ b/cmd/esbulk/esbulk.go @@ -87,8 +87,45 @@ func main() { go esbulk.Worker(fmt.Sprintf("worker-%d", i), options, queue, &wg) } + client := &http.Client{} + + defer func() { + r := strings.NewReader(`{"index": {"refresh_interval": "1s"}}`) + req, err := http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/%s/_settings", *host, *port, *indexName), r) + if err != nil { + log.Fatal(err) + } + resp, err := client.Do(req) + log.Printf("setting index.refresh_interval to 1s: %s\n", resp.Status) + if err != nil { + log.Fatal(err) + } + resp, err = http.Post(fmt.Sprintf("http://%s:%d/%s/_flush", *host, *port, *indexName), "", nil) + log.Printf("index flush: %s\n", resp.Status) + if err != nil { + log.Fatal(err) + } + }() + + // create index if not exists + req, err := http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/%s/", *host, *port, *indexName), nil) + if err != nil { + log.Fatal(err) + } + resp, err := client.Do(req) + log.Printf("creating index: %s\n", resp.Status) + if err != nil { + log.Fatal(err) + } + // set refresh inteval to -1 - _, err = http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/%s/_settings", *host, *port, *indexName), strings.NewReader(`{"index": {"refresh_interval": "-1"}}`)) + r := strings.NewReader(`{"index": {"refresh_interval": "-1"}}`) + req, err = http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/%s/_settings", *host, *port, *indexName), r) + if err != nil { + log.Fatal(err) + } + resp, err = client.Do(req) + log.Printf("setting index.refresh_interval to -1: %s\n", resp.Status) if err != nil { log.Fatal(err) } @@ -122,17 +159,6 @@ func main() { wg.Wait() elapsed := time.Since(start) - defer func() { - _, err = http.NewRequest("PUT", fmt.Sprintf("http://%s:%d/%s/_settings", *host, *port, *indexName), strings.NewReader(`{"index": {"refresh_interval": "1s"}}`)) - if err != nil { - log.Fatal(err) - } - _, err = http.Post(fmt.Sprintf("http://%s:%d/%s/_flush", *host, *port, *indexName), "", nil) - if err != nil { - log.Fatal(err) - } - }() - if *memprofile != "" { f, err := os.Create(*memprofile) if err != nil {