-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathexporter.go
110 lines (96 loc) · 2.71 KB
/
exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package resqueExporter
import (
"fmt"
"strconv"
"sync"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/redis.v3"
)
// namespace is passed as the Namespace option of every metric that
// newExporter creates.
const namespace = "resque"
// exporter holds the configuration and the metric objects used by the
// Describe and Collect methods defined on it.
type exporter struct {
// config supplies the Redis connection settings and the Resque key
// namespace that Collect reads on every scrape.
config *Config
// mut is locked for the whole duration of Collect.
mut *sync.Mutex
// scrapeFailures is incremented by incrementFailures whenever a scrape
// is aborted.
scrapeFailures prometheus.Counter
// processed is set from the "<ResqueNamespace>:stat:processed" key.
processed prometheus.Gauge
// failed is set from the "<ResqueNamespace>:stat:failed" key.
failed prometheus.Gauge
// queueStatus holds one gauge per queue (label "queue_name"), set to the
// length of the "<ResqueNamespace>:queue:<name>" list.
queueStatus *prometheus.GaugeVec
}
// newExporter creates an exporter bound to config together with all of the
// metric objects it reports. Nothing is registered or scraped here; the
// returned error is currently always nil.
func newExporter(config *Config) (*exporter, error) {
	// Per-queue backlog, labelled by queue name.
	jobsInQueue := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      "jobs_in_queue",
			Help:      "Number of remained jobs in queue",
		},
		[]string{"queue_name"},
	)

	// Global Resque statistics.
	processedJobs := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: namespace,
		Name:      "processed",
		Help:      "Number of processed jobs",
	})
	failedJobs := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: namespace,
		Name:      "failed",
		Help:      "Number of failed jobs",
	})

	// Self-monitoring of the exporter.
	failures := prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: namespace,
		Name:      "exporter_scrape_failures_total",
		Help:      "Number of errors while scraping resque.",
	})

	e := &exporter{
		config:         config,
		mut:            &sync.Mutex{},
		scrapeFailures: failures,
		processed:      processedJobs,
		failed:         failedJobs,
		queueStatus:    jobsInQueue,
	}
	return e, nil
}
// Describe implements prometheus.Collector. It sends the descriptors of
// every metric that Collect can emit: the scrape-failure counter, the
// per-queue gauges, and the processed/failed gauges.
func (e *exporter) Describe(ch chan<- *prometheus.Desc) {
	e.scrapeFailures.Describe(ch)
	e.queueStatus.Describe(ch)
	// processed and failed are emitted by Collect, so they have to be
	// described as well for the registry to know about them.
	e.processed.Describe(ch)
	e.failed.Describe(ch)
}
// Collect implements prometheus.Collector. On every call it opens a Redis
// connection using e.config.Redis, reads the Resque queue lengths and the
// processed/failed statistics under e.config.ResqueNamespace, and sends the
// resulting metrics on ch.
//
// If any Redis command fails, or a statistic cannot be parsed as a number,
// the scrape is aborted: the failure counter is incremented and it is the
// only metric sent. A statistic key that does not exist is reported as 0.
// The failure counter is sent on successful scrapes as well, so it is
// emitted exactly once per call.
func (e *exporter) Collect(ch chan<- prometheus.Metric) {
	e.mut.Lock() // To protect metrics from concurrent collects.
	defer e.mut.Unlock()

	redisConfig := e.config.Redis
	// Named client (not redis) so the redis package stays reachable below.
	client := redis.NewClient(&redis.Options{
		Addr:     fmt.Sprintf("%s:%d", redisConfig.Host, redisConfig.Port),
		Password: redisConfig.Password,
		DB:       redisConfig.DB,
	})
	defer client.Close()

	queues, err := client.SMembers(fmt.Sprintf("%s:queues", e.config.ResqueNamespace)).Result()
	if err != nil {
		e.incrementFailures(ch)
		return
	}

	// Forget queues seen on earlier scrapes; otherwise a queue that has been
	// removed from Resque would keep being exported with its last value.
	e.queueStatus.Reset()
	for _, q := range queues {
		n, err := client.LLen(fmt.Sprintf("%s:queue:%s", e.config.ResqueNamespace, q)).Result()
		if err != nil {
			e.incrementFailures(ch)
			return
		}
		e.queueStatus.WithLabelValues(q).Set(float64(n))
	}

	// readStat fetches "<namespace>:stat:<name>" and parses it as a float.
	// A missing key (redis.Nil) means the statistic was never written and is
	// treated as 0 rather than as a scrape failure.
	readStat := func(name string) (float64, error) {
		raw, err := client.Get(fmt.Sprintf("%s:stat:%s", e.config.ResqueNamespace, name)).Result()
		if err == redis.Nil {
			return 0, nil
		}
		if err != nil {
			return 0, err
		}
		return strconv.ParseFloat(raw, 64)
	}

	processedCnt, err := readStat("processed")
	if err != nil {
		e.incrementFailures(ch)
		return
	}
	e.processed.Set(processedCnt)

	failedCnt, err := readStat("failed")
	if err != nil {
		e.incrementFailures(ch)
		return
	}
	e.failed.Set(failedCnt)

	e.queueStatus.Collect(ch)
	e.processed.Collect(ch)
	e.failed.Collect(ch)
	// incrementFailures sends the counter on the failure paths above; send it
	// here too so the series is present on every scrape.
	e.scrapeFailures.Collect(ch)
}
// incrementFailures records one failed scrape: it increments the
// scrape-failure counter and then sends that counter on ch.
func (e *exporter) incrementFailures(ch chan<- prometheus.Metric) {
	failures := e.scrapeFailures
	failures.Inc()
	failures.Collect(ch)
}