Skip to content

Commit

Permalink
Create prometheus client (#11)
Browse files Browse the repository at this point in the history
* added prometheus client for load-watcher
* resolve comments on 2021-01-24 for PR #11
* resolve 21 comments on 2021-02-04 for PR #11
  • Loading branch information
wangchen615 authored Feb 5, 2021
1 parent f521a95 commit 251e1a0
Show file tree
Hide file tree
Showing 12 changed files with 432 additions and 385 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The following metrics provider clients are currently supported:

1) SignalFx
2) Kubernetes Metrics Server
3) Prometheus

These clients fetch CPU usage currently, support for other resources will be added later as needed.

Expand Down Expand Up @@ -44,4 +45,13 @@ Note that load watcher runs on default port 2020. Once deployed, you can use the
GET /watcher
```

This will return metrics for all nodes. A query parameter to filter by host can be added with `host`.
This will return metrics for all nodes. A query parameter to filter by host can be added with `host`.

## Client Configuration
- To use the Kubernetes metric server client out of a cluster, please configure your `KUBE_CONFIG` environment varirables to your
kubernetes client configuration file path.

- To use the prometheus client out of a cluster, please configure `PROM_HOST` and `PROM_TOKEN` environment variables to
your Prometheus endpoint and token. Please ignore `PROM_TOKEN` as empty string if no authentication is needed to access
the Prometheus APIs. When using the prometheus in a cluster, the default endpoint is `prometheus-k8s:9090`. You need to
configure `PROM_HOST` if your Prometheus endpoint is different.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ go 1.15

require (
github.com/francoispqt/gojay v1.2.13
github.com/sirupsen/logrus v1.4.2
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/common v0.15.0
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.5.1
k8s.io/apimachinery v0.19.0
k8s.io/client-go v0.19.0
Expand Down
530 changes: 167 additions & 363 deletions go.sum

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
)

func main() {
client, err := metricsprovider.NewMetricsServerClient()
// client, err := metricsprovider.NewMetricsServerClient()
client, err := metricsprovider.NewPromClient()
if err != nil {
log.Fatalf("unable to create new client: %v", err)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/metricsprovider/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var (
)

const (
k8sClientName = "k8s"
// env variable that provides path to kube config file, if deploying from outside K8s cluster
kubeConfig = "KUBE_CONFIG"
)
Expand Down Expand Up @@ -78,6 +79,10 @@ func NewMetricsServerClient() (watcher.FetcherClient, error) {
coreClientSet: clientSet}, nil
}

func (m metricsServerClient) Name() string {
return k8sClientName
}

func (m metricsServerClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) {
var metrics = []watcher.Metric{}

Expand Down
194 changes: 194 additions & 0 deletions pkg/metricsprovider/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
Copyright 2020
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metricsprovider

import (
"context"
"fmt"
"os"
"time"

"github.com/paypal/load-watcher/pkg/watcher"
log "github.com/sirupsen/logrus"
"github.com/prometheus/common/config"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"

_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
)

var (
promHost string
promToken string
promTokenPresent = false
node_metric_query = map[string]string{
watcher.CPU : "instance:node_cpu:ratio",
watcher.Memory : "instance:node_memory_utilization:ratio",
}
)

const (
promClientName = "Prometheus"
promHostKey = "PROM_HOST"
promTokenKey = "PROM_TOKEN"
promStd = "stddev_over_time"
promAvg = "avg_over_time"
promCpuMetric = "instance:node_cpu:ratio"
promMemMetric = "instance:node_memory_utilisation:ratio"
allHosts = "all"
hostMetricKey = "instance"
)

func init() {
var promHostPresent bool
promHost, promHostPresent = os.LookupEnv(promHostKey)
promToken, promTokenPresent = os.LookupEnv(promTokenKey)
if !promHostPresent {
promHost = "http://prometheus-k8s:9090"
}
}

type promClient struct {
client api.Client
}

func NewPromClient() (watcher.FetcherClient, error) {
var client api.Client
var err error

if !promTokenPresent {
client, err = api.NewClient(api.Config{
Address: promHost,
})
} else {
client, err = api.NewClient(api.Config{
Address: promHost,
RoundTripper: config.NewBearerAuthRoundTripper(config.Secret(promToken), api.DefaultRoundTripper),
})
}

if err != nil {
log.Errorf("Error creating prometheus client: %v\n", err)
return nil, err
}

return promClient{client}, err
}

func (s promClient) Name() string {
return promClientName
}

func (s promClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) {
var metricList []watcher.Metric
var anyerr error
for _, method := range []string{promAvg, promStd} {
for _, metric := range []string{promCpuMetric, promMemMetric} {
promQuery := s.buildPromQuery(host, metric, method, window.Duration)
promResults, err := s.getPromResults(promQuery)

if err != nil {
log.Errorf("Error querying Prometheus for query %v: %v\n", promQuery, err)
anyerr = err
continue
}

curMetricMap := s.promResults2MetricMap(promResults, metric, method, window.Duration)
metricList = append(metricList, curMetricMap[host]...)
}
}

return metricList, anyerr
}

// Fetch all host metrics with different operators (avg_over_time, stddev_over_time) and diffrent resource types (CPU, Memory)
func (s promClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]watcher.Metric, error) {
hostMetrics := make(map[string][]watcher.Metric)
var anyerr error
for _, method := range []string{promAvg, promStd} {
for _, metric := range []string{promCpuMetric, promMemMetric} {
promQuery := s.buildPromQuery(allHosts, metric, method, window.Duration)
promResults, err := s.getPromResults(promQuery)

if err != nil {
log.Errorf("Error querying Prometheus for query %v: %v\n", promQuery, err)
anyerr = err
continue
}

curMetricMap := s.promResults2MetricMap(promResults, metric, method, window.Duration)

for k, v := range curMetricMap {
hostMetrics[k] = append(hostMetrics[k], v...)
}
}
}

return hostMetrics, anyerr
}

func (s promClient) buildPromQuery(host string, metric string, method string, rollup string) string {
var promQuery string
if host == allHosts {
promQuery = fmt.Sprintf("%s(%s[%s])", method, metric, rollup)
} else {
promQuery = fmt.Sprintf("%s(%s{%s=\"%s\"}[%s])", method, metric, hostMetricKey, host, rollup)
}

return promQuery
}

func (s promClient) getPromResults(promQuery string) (model.Value, error) {
v1api := v1.NewAPI(s.client)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

results, warnings, err := v1api.Query(ctx, promQuery, time.Now())
if err != nil {
return nil, err
}
if len(warnings) > 0 {
log.Warnf("Warnings: %v\n", warnings)
}
log.Debugf("Result:\n%v\n", results)
return results, nil
}

func (s promClient) promResults2MetricMap(promresults model.Value, metric string, method string, rollup string) map[string][]watcher.Metric {
var metric_type string
curMetrics := make(map[string][]watcher.Metric)

if metric == promCpuMetric {
metric_type = watcher.CPU
} else {
metric_type = watcher.Memory
}

switch promresults.(type) {
case model.Vector:
for _, result := range promresults.(model.Vector) {
curMetric := watcher.Metric{metric, metric_type, method, rollup, float64(result.Value)}
curHost := string(result.Metric[hostMetricKey])
curMetrics[curHost] = append(curMetrics[curHost], curMetric)
}
default:
log.Errorf("Error: The Prometheus results should not be type: %v.\n", promresults.Type())
}

return curMetrics
}
5 changes: 5 additions & 0 deletions pkg/metricsprovider/signalfx.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

const (
// SignalFX Request Params
signalFxClientName = "signalFx"
signalFxBaseUrl = "https://api.pypl-us0.signalfx.com/v1/timeserieswindow"
// SignalFx adds a suffix to hostnames if configured
signalFxHostNameSuffix = ".group.region.gcp.com"
Expand Down Expand Up @@ -64,6 +65,10 @@ func NewSignalFxClient() (watcher.FetcherClient, error) {
Transport: tlsConfig}}, nil
}

func (s signalFxClient) Name() string {
return signalFxClientName
}

func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) {
log.Debugf("fetching metrics for host %v", host)
var metrics []watcher.Metric
Expand Down
2 changes: 2 additions & 0 deletions pkg/watcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package watcher

// Interface to be implemented by any metrics provider client to interact with Watcher
type FetcherClient interface {
// Return the client name
Name() string
// Fetch metrics for given host
FetchHostMetrics(host string, window *Window) ([]Metric, error)
// Fetch metrics for all hosts
Expand Down
8 changes: 8 additions & 0 deletions pkg/watcher/schema/watcher-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
"type": {
"type": "string"
},
"operator": {
"type": "string"
},
"rollup": {
"type": "string"
},
Expand All @@ -56,6 +59,7 @@
"required": [
"name",
"type",
"operator",
"rollup",
"value"
]
Expand All @@ -69,6 +73,9 @@
"type": {
"type": "string"
},
"operator": {
"type": "string"
},
"rollup": {
"type": "string"
},
Expand All @@ -80,6 +87,7 @@
"required": [
"name",
"type",
"operator",
"rollup",
"value"
]
Expand Down
5 changes: 5 additions & 0 deletions pkg/watcher/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,16 @@ var _ FetcherClient = &testServerClient{}
const (
FirstNode = "worker-1"
SecondNode = "worker-2"
TestServerClientName = "TestServerClient"
)

type testServerClient struct {
}

func (t testServerClient) Name() string {
return TestServerClientName
}

func NewTestMetricsServerClient() FetcherClient {
return testServerClient{}
}
Expand Down
Loading

0 comments on commit 251e1a0

Please # to comment.