-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathelastic.go
109 lines (104 loc) · 2.55 KB
/
elastic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package elastic
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"strings"
"time"
es "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/maddevsio/ariadna/config"
"github.com/sirupsen/logrus"
)
type Client struct {
conn *es.Client
config *config.Ariadna
createdIndex string
logger *logrus.Logger
}
func New(conf *config.Ariadna) (*Client, error) {
c, err := es.NewClient(es.Config{
Addresses: conf.ElasticURLs,
})
if err != nil {
return nil, err
}
return &Client{conn: c, config: conf, logger: logrus.New()}, nil
}
func (c *Client) UpdateIndex() error {
c.createdIndex = fmt.Sprintf("%s-%d", c.config.ElasticIndex, time.Now().Unix())
r := &esapi.IndicesCreateRequest{Index: c.createdIndex}
data := `
{
"mappings": {
"properties": {
"location": {"type":"geo_point"}
}
}
}`
r.Body = bytes.NewReader([]byte(data))
res, err := r.Do(context.TODO(), c.conn.Transport)
if err != nil {
return err
}
if res.IsError() {
return fmt.Errorf("could not update settings: %v", res)
}
c.logger.Infof("created index %s", c.createdIndex)
res, err = c.conn.Indices.PutAlias([]string{c.createdIndex}, c.config.ElasticIndex)
if err != nil {
return err
}
if res.IsError() {
return fmt.Errorf("could not create alias: %v", res)
}
c.logger.Info("alias was created")
return nil
}
func (c *Client) DeleteIndices() error {
var indicesToDelete []string
r := esapi.IndicesGetAliasRequest{Name: []string{c.config.ElasticIndex}}
res, err := r.Do(context.TODO(), c.conn.Transport)
if err != nil {
return err
}
if res.IsError() {
return fmt.Errorf("could not create alias: %v", res)
}
data, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}
var schema map[string]interface{}
err = json.Unmarshal(data, &schema)
if err != nil {
return err
}
for key := range schema {
if key != c.createdIndex && strings.Contains(key, c.config.ElasticIndex) {
indicesToDelete = append(indicesToDelete, key)
}
}
res, err = c.conn.Indices.Delete(indicesToDelete)
if err != nil {
return err
}
if res.IsError() {
return fmt.Errorf("could not delete indices: %v", res)
}
c.logger.Infof("deleted indices: %v", indicesToDelete)
return nil
}
func (c *Client) BulkWrite(buf bytes.Buffer) error {
res, err := c.conn.Bulk(bytes.NewReader(buf.Bytes()), c.conn.Bulk.WithIndex(c.createdIndex))
if err != nil {
return err
}
if res.IsError() {
return fmt.Errorf("could not perform bulk insert: %v", res)
}
c.logger.Info("bulk insert is finished")
return nil
}