From fe230a3502f61930fee62623e626aeb33132b8cf Mon Sep 17 00:00:00 2001 From: Vincent Rischmann Date: Mon, 22 Jun 2015 12:11:58 +0200 Subject: [PATCH] Update for InfluxDB 0.9 --- influxdb/influxdb.go | 132 +++++++++++++++++++++++++++---------------- 1 file changed, 82 insertions(+), 50 deletions(-) diff --git a/influxdb/influxdb.go b/influxdb/influxdb.go index 0163c9b..bd45d49 100644 --- a/influxdb/influxdb.go +++ b/influxdb/influxdb.go @@ -2,10 +2,12 @@ package influxdb import ( "fmt" - influxClient "github.com/influxdb/influxdb/client" - "github.com/rcrowley/go-metrics" "log" + "net/url" "time" + + influxClient "github.com/influxdb/influxdb/client" + "github.com/rcrowley/go-metrics" ) type Config struct { @@ -16,97 +18,127 @@ type Config struct { } func Influxdb(r metrics.Registry, d time.Duration, config *Config) { - client, err := influxClient.NewClient(&influxClient.ClientConfig{ - Host: config.Host, - Database: config.Database, + url, err := url.Parse(fmt.Sprintf("http://%s", config.Host)) + if err != nil { + log.Println(err) + return + } + + client, err := influxClient.NewClient(influxClient.Config{ + URL: *url, Username: config.Username, Password: config.Password, }) + if err != nil { log.Println(err) return } for _ = range time.Tick(d) { - if err := send(r, client); err != nil { + if err := send(r, client, config.Database); err != nil { log.Println(err) } } } -func send(r metrics.Registry, client *influxClient.Client) error { - series := []*influxClient.Series{} +func send(r metrics.Registry, client *influxClient.Client, database string) error { + var points []influxClient.Point + now := time.Now() r.Each(func(name string, i interface{}) { - now := getCurrentTime() switch metric := i.(type) { case metrics.Counter: - series = append(series, &influxClient.Series{ - Name: fmt.Sprintf("%s.count", name), - Columns: []string{"time", "count"}, - Points: [][]interface{}{ - {now, metric.Count()}, + points = append(points, influxClient.Point{ + Measurement: fmt.Sprintf("%s.count", name), + Fields: map[string]interface{}{ + "value": metric.Count(), }, + Time: now, }) case metrics.Gauge: - series = append(series, &influxClient.Series{ - Name: fmt.Sprintf("%s.value", name), - Columns: []string{"time", "value"}, - Points: [][]interface{}{ - {now, metric.Value()}, + points = append(points, influxClient.Point{ + Measurement: fmt.Sprintf("%s.value", name), + Fields: map[string]interface{}{ + "value": metric.Value(), }, + Time: now, }) case metrics.GaugeFloat64: - series = append(series, &influxClient.Series{ - Name: fmt.Sprintf("%s.value", name), - Columns: []string{"time", "value"}, - Points: [][]interface{}{ - {now, metric.Value()}, + points = append(points, influxClient.Point{ + Measurement: fmt.Sprintf("%s.value", name), + Fields: map[string]interface{}{ + "value": metric.Value(), }, + Time: now, }) case metrics.Histogram: h := metric.Snapshot() ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) - series = append(series, &influxClient.Series{ - Name: fmt.Sprintf("%s.histogram", name), - Columns: []string{"time", "count", "min", "max", "mean", "std-dev", - "50-percentile", "75-percentile", "95-percentile", - "99-percentile", "999-percentile"}, - Points: [][]interface{}{ - {now, h.Count(), h.Min(), h.Max(), h.Mean(), h.StdDev(), - ps[0], ps[1], ps[2], ps[3], ps[4]}, + + points = append(points, influxClient.Point{ + Measurement: fmt.Sprintf("%s.value", name), + Fields: map[string]interface{}{ + "count": h.Count(), + "min": h.Min(), + "max": h.Max(), + "mean": h.Mean(), + "std-dev": h.StdDev(), + "50-percentile": ps[0], + "75-percentile": ps[1], + "95-percentile": ps[2], + "99-percentile": ps[3], + "999-percentile": ps[4], }, + Time: now, }) case metrics.Meter: m := metric.Snapshot() - series = append(series, &influxClient.Series{ - Name: fmt.Sprintf("%s.meter", name), - Columns: []string{"count", "one-minute", - "five-minute", "fifteen-minute", "mean"}, - Points: [][]interface{}{ - {m.Count(), m.Rate1(), m.Rate5(), m.Rate15(), m.RateMean()}, + + points = append(points, influxClient.Point{ + Measurement: fmt.Sprintf("%s.meter", name), + Fields: map[string]interface{}{ + "count": m.Count(), + "one-minute": m.Rate1(), + "five-minute": m.Rate5(), + "fifteen-minute": m.Rate15(), + "mean": m.RateMean(), }, }) case metrics.Timer: h := metric.Snapshot() ps := h.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) - series = append(series, &influxClient.Series{ - Name: fmt.Sprintf("%s.timer", name), - Columns: []string{"count", "min", "max", "mean", "std-dev", - "50-percentile", "75-percentile", "95-percentile", - "99-percentile", "999-percentile", "one-minute", "five-minute", "fifteen-minute", "mean-rate"}, - Points: [][]interface{}{ - {h.Count(), h.Min(), h.Max(), h.Mean(), h.StdDev(), - ps[0], ps[1], ps[2], ps[3], ps[4], - h.Rate1(), h.Rate5(), h.Rate15(), h.RateMean()}, + + points = append(points, influxClient.Point{ + Measurement: fmt.Sprintf("%s.timer", name), + Fields: map[string]interface{}{ + "count": h.Count(), + "min": h.Min(), + "max": h.Max(), + "mean": h.Mean(), + "std-dev": h.StdDev(), + "50-percentile": ps[0], + "75-percentile": ps[1], + "95-percentile": ps[2], + "99-percentile": ps[3], + "999-percentile": ps[4], + "one-minute": h.Rate1(), + "five-minute": h.Rate5(), + "fifteen-minute": h.Rate15(), + "mean-rate": h.RateMean(), }, + Time: now, }) } }) - if err := client.WriteSeries(series); err != nil { - log.Println(err) + + bps := influxClient.BatchPoints{ + Points: points, + Database: database, } - return nil + + _, err := client.Write(bps) + return err } func getCurrentTime() int64 {