Skip to content
This repository was archived by the owner on Sep 15, 2022. It is now read-only.

Prometheus Monitoring Middleware/Tripperware #12

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions monitoring/prometheus/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.

package http_prometheus

import (
"time"

"net/http"

"github.com/mwitkow/go-httpwares"
"github.com/mwitkow/go-httpwares/tags"
"github.com/prometheus/client_golang/prometheus"
)

// Middleware is a server-side http ware for monitoring handlers using Prometheus counters and histograms.
//
// Requests are labeled with the handler group and handler name that handlerInfoFromRequest resolves from the
// inbound http_ctxtags (`TagForHandlerGroup` and `TagForHandlerName`), plus the sanitized request method.
//
// The request counter is always maintained; each histogram is only maintained when its builder returns a
// non-nil vector, i.e. when buckets for it were configured through options. With the namespace set to
// `http` the metrics are:
//
//	http_server_requests_total
//	http_server_response_size_bytes
//	http_server_response_headers_duration_seconds
//	http_server_request_duration_seconds
//
// Options are evaluated and metrics are registered every time the returned middleware wraps a handler.
// Please note that this registration can panic if the metrics clash with previously registered ones.
func Middleware(opts ...Option) httpwares.Middleware {
	return func(next http.Handler) http.Handler {
		o := evaluateOptions(opts)
		handledTotal := buildServerHandledCounter(o)
		sizeHist := buildServerResponseSizeHistogram(o)
		headersHist := buildServerResponseHeadersHistogram(o)
		durationHist := buildServerRequestCompletionHistogram(o)

		serve := func(resp http.ResponseWriter, req *http.Request) {
			group, name := handlerInfoFromRequest(req)
			began := time.Now()
			tracked := httpwares.WrapResponseWriter(resp)
			if headersHist != nil {
				// Record the time-to-headers at the moment the handler writes the header.
				tracked.ObserveWriteHeader(func(_ httpwares.WrappedResponseWriter, _ int) {
					headersHist.WithLabelValues(group, name, sanitizeMethod(req.Method)).Observe(timeDiffToSeconds(began))
				})
			}
			next.ServeHTTP(tracked, req)

			// Everything below is observed once the wrapped handler has returned.
			method := sanitizeMethod(req.Method)
			handledTotal.WithLabelValues(group, name, method, sanitizeCode(tracked.StatusCode())).Inc()
			if durationHist != nil {
				durationHist.WithLabelValues(group, name, method).Observe(timeDiffToSeconds(began))
			}
			if sizeHist != nil {
				sizeHist.WithLabelValues(group, name, method).Observe(float64(tracked.MessageLength()))
			}
		}
		return http.HandlerFunc(serve)
	}
}

// handlerInfoFromRequest returns the handler group and handler name stored in the inbound
// http_ctxtags of req under TagForHandlerGroup and TagForHandlerName.
//
// Either value falls back to "unspecified" when its tag is absent or is not a string.
func handlerInfoFromRequest(req *http.Request) (handlerGroup string, handlerName string) {
	handlerGroup = "unspecified"
	handlerName = "unspecified"
	tags := http_ctxtags.ExtractInbound(req).Values()
	if g, ok := tags[http_ctxtags.TagForHandlerGroup].(string); ok {
		handlerGroup = g
	}
	if n, ok := tags[http_ctxtags.TagForHandlerName].(string); ok {
		handlerName = n
	}
	// Return the name as the second value; returning the group twice would make the
	// handler_name label a duplicate of handler_group.
	return handlerGroup, handlerName
}

// buildServerHandledCounter creates the "requests_total" counter vector (subsystem "server",
// namespace o.namespace) and registers it with o.registry.
//
// If an equal collector is already registered, the existing collector is returned instead.
// Any other registration error panics, as does an existing collector that is not a *prometheus.CounterVec.
func buildServerHandledCounter(o *options) *prometheus.CounterVec {
	cv := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: o.namespace,
			Subsystem: "server",
			Name:      "requests_total",
			Help:      "Total number of requests completed on the server.",
		}, []string{"handler_group", "handler_name", "method", "code"})
	err := o.registry.Register(cv)
	if err == nil {
		return cv
	}
	// client_golang reports duplicates as an AlreadyRegisteredError value; the pointer form is
	// still accepted so that a custom registry returning a pointer keeps working.
	switch are := err.(type) {
	case prometheus.AlreadyRegisteredError:
		return are.ExistingCollector.(*prometheus.CounterVec)
	case *prometheus.AlreadyRegisteredError:
		return are.ExistingCollector.(*prometheus.CounterVec)
	}
	panic("failed registering requests_total error in http_prometheus: " + err.Error())
}

// buildServerResponseSizeHistogram creates the optional "response_size_bytes" histogram vector
// (subsystem "server", namespace o.namespace) and registers it with o.registry.
//
// It returns nil when o.sizeHistogramBuckets is empty, which callers treat as "metric disabled".
// If an equal collector is already registered, the existing collector is returned instead.
// Any other registration error panics, as does an existing collector that is not a *prometheus.HistogramVec.
func buildServerResponseSizeHistogram(o *options) *prometheus.HistogramVec {
	if len(o.sizeHistogramBuckets) == 0 {
		return nil
	}
	cv := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: o.namespace,
			Subsystem: "server",
			Name:      "response_size_bytes",
			Help:      "HTTP response size in bytes (optional).",
			Buckets:   o.sizeHistogramBuckets,
		}, []string{"handler_group", "handler_name", "method"})
	err := o.registry.Register(cv)
	if err == nil {
		return cv
	}
	// client_golang reports duplicates as an AlreadyRegisteredError value; the pointer form is
	// still accepted so that a custom registry returning a pointer keeps working.
	switch are := err.(type) {
	case prometheus.AlreadyRegisteredError:
		return are.ExistingCollector.(*prometheus.HistogramVec)
	case *prometheus.AlreadyRegisteredError:
		return are.ExistingCollector.(*prometheus.HistogramVec)
	}
	panic("failed registering response_size_bytes error in http_prometheus: " + err.Error())
}

// buildServerResponseHeadersHistogram creates the optional "response_headers_duration_seconds"
// histogram vector (subsystem "server", namespace o.namespace) and registers it with o.registry.
//
// It returns nil when o.responseHeadersHistogramBuckets is empty, which callers treat as "metric disabled".
// If an equal collector is already registered, the existing collector is returned instead.
// Any other registration error panics, as does an existing collector that is not a *prometheus.HistogramVec.
func buildServerResponseHeadersHistogram(o *options) *prometheus.HistogramVec {
	if len(o.responseHeadersHistogramBuckets) == 0 {
		return nil
	}
	cv := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: o.namespace,
			Subsystem: "server",
			Name:      "response_headers_duration_seconds",
			Help:      "Latency (seconds) until HTTP response headers are returned (optional).",
			Buckets:   o.responseHeadersHistogramBuckets,
		}, []string{"handler_group", "handler_name", "method"})
	err := o.registry.Register(cv)
	if err == nil {
		return cv
	}
	// client_golang reports duplicates as an AlreadyRegisteredError value; the pointer form is
	// still accepted so that a custom registry returning a pointer keeps working.
	switch are := err.(type) {
	case prometheus.AlreadyRegisteredError:
		return are.ExistingCollector.(*prometheus.HistogramVec)
	case *prometheus.AlreadyRegisteredError:
		return are.ExistingCollector.(*prometheus.HistogramVec)
	}
	panic("failed registering response_headers_duration_seconds error in http_prometheus: " + err.Error())
}

// buildServerRequestCompletionHistogram creates the optional "request_duration_seconds" histogram
// vector (subsystem "server", namespace o.namespace) and registers it with o.registry.
//
// It returns nil when o.requestHistogramBuckets is empty, which callers treat as "metric disabled".
// If an equal collector is already registered, the existing collector is returned instead.
// Any other registration error panics, as does an existing collector that is not a *prometheus.HistogramVec.
func buildServerRequestCompletionHistogram(o *options) *prometheus.HistogramVec {
	if len(o.requestHistogramBuckets) == 0 {
		return nil
	}
	cv := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: o.namespace,
			Subsystem: "server",
			Name:      "request_duration_seconds",
			Help:      "Latency (seconds) until HTTP request is fully handled by the server (optional).",
			Buckets:   o.requestHistogramBuckets,
		}, []string{"handler_group", "handler_name", "method"})
	err := o.registry.Register(cv)
	if err == nil {
		return cv
	}
	// client_golang reports duplicates as an AlreadyRegisteredError value; the pointer form is
	// still accepted so that a custom registry returning a pointer keeps working.
	switch are := err.(type) {
	case prometheus.AlreadyRegisteredError:
		return are.ExistingCollector.(*prometheus.HistogramVec)
	case *prometheus.AlreadyRegisteredError:
		return are.ExistingCollector.(*prometheus.HistogramVec)
	}
	panic("failed registering request_duration_seconds error in http_prometheus: " + err.Error())
}

// timeDiffToSeconds reports how many seconds have elapsed since start.
// A start that lies in the future yields 0 rather than a negative duration.
func timeDiffToSeconds(start time.Time) float64 {
	elapsed := time.Since(start).Seconds()
	if elapsed >= 0 {
		return elapsed
	}
	return 0
}
151 changes: 151 additions & 0 deletions monitoring/prometheus/middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package http_prometheus_test

import (
"bufio"
"io"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"

"fmt"

"github.com/mwitkow/go-httpwares"
"github.com/mwitkow/go-httpwares/monitoring/prometheus"
"github.com/mwitkow/go-httpwares/tags"
"github.com/mwitkow/go-httpwares/testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

// Fixture values used as handler group, handler name and client service name by the suite below.
//
// NOTE(review): the group and the name share the same value, so label-matching assertions
// cannot distinguish a handler_name label from a handler_group label.
var testHandlerGroup = "testingHandler"

var testHandlerName = "testingHandler"

var testServiceName = "testingExternalService"

func TestPrometheusSuite(t *testing.T) {
s := &PrometheusSuite{
WaresTestSuite: &httpwares_testing.WaresTestSuite{
Handler: http_ctxtags.HandlerName(testHandlerName)(httpwares_testing.PingBackHandler(httpwares_testing.DefaultPingBackStatusCode)),
ServerMiddleware: []httpwares.Middleware{
http_ctxtags.Middleware(testHandlerGroup),
http_prometheus.Middleware(
http_prometheus.WithNamespace("http"),
http_prometheus.WithResponseSizeHistogram(),
http_prometheus.WithResponseHeadersLatencyHistogram(),
http_prometheus.WithRequestCompletionLatencyHistogram(),
),
},
ClientTripperware: httpwares.TripperwareChain{
http_ctxtags.Tripperware(http_ctxtags.WithServiceName(testServiceName)),
},
},
}
suite.Run(t, s)
}

// PrometheusSuite is the test suite for the Prometheus middleware. It embeds
// *httpwares_testing.WaresTestSuite, whose NewClient and SimpleCtx helpers the tests use.
type PrometheusSuite struct {
*httpwares_testing.WaresTestSuite
}

// SetupTest has an empty body and performs no per-test setup.
// NOTE(review): presumably this is the testify suite per-test hook — confirm whether it is
// meant to override a SetupTest on the embedded WaresTestSuite.
func (s *PrometheusSuite) SetupTest() {
}

// makeCall issues a single request with the given method through the suite's client and fails
// the test if the request cannot be built or the call returns an error.
//
// expectedCode is only forwarded to the server in the `code` query parameter; the response
// status itself is not asserted here.
func (s *PrometheusSuite) makeCall(t *testing.T, method string, expectedCode int) {
	client := s.NewClient() // client always dials localhost.
	url := fmt.Sprintf("https://fakeaddress.fakeaddress.com/someurl?code=%d", expectedCode)
	req, err := http.NewRequest(method, url, nil)
	require.NoError(t, err, "failed creating request")
	req = req.WithContext(s.SimpleCtx())
	resp, err := client.Do(req)
	require.NoError(t, err, "call shouldn't fail")
	// Close the body so the underlying connection is released instead of leaking per call.
	defer resp.Body.Close()
}

// TestHandledCounterCountsValues checks, for several method/status-code pairs, that one call
// raises the matching http_server_requests_total series by exactly one.
func (s *PrometheusSuite) TestHandledCounterCountsValues() {
	type requestCase struct {
		method string
		code   int
	}
	cases := []requestCase{
		{"GET", 200},
		{"POST", 201},
		{"POST", 350},
		{"HEAD", 403},
	}
	const metric = "http_server_requests_total"

	for i := range cases {
		tc := cases[i]
		subtest := fmt.Sprintf("%s_%d", tc.method, tc.code)
		s.T().Run(subtest, func(t *testing.T) {
			// Label values the exposition line must contain (matched as quoted strings).
			labels := []string{
				testHandlerGroup,
				testHandlerName,
				strconv.Itoa(tc.code),
				strings.ToLower(tc.method),
			}
			before := sumCountersForMetricAndLabels(t, metric, labels...)
			s.makeCall(t, tc.method, tc.code)
			after := sumCountersForMetricAndLabels(t, metric, labels...)
			assert.Equal(t, before+1, after, "request counter for this handler should increase")
		})
	}
}

// TestMiddlewareResponseHeaderDurations checks that a GET call adds exactly one observation to
// the response-headers latency histogram for the test handler.
func (s *PrometheusSuite) TestMiddlewareResponseHeaderDurations() {
	const metric = "http_server_response_headers_duration_seconds_count"
	t := s.T()
	observations := func() int {
		return sumCountersForMetricAndLabels(t, metric, testHandlerGroup, testHandlerName, "get")
	}

	before := observations()
	s.makeCall(t, "GET", 201)
	after := observations()
	assert.Equal(t, before+1, after, "we should increment at least one bucket")
}

// TestMiddlewareRequestCompleteDuration checks that a HEAD call adds exactly one observation to
// the request-duration histogram for the test handler.
func (s *PrometheusSuite) TestMiddlewareRequestCompleteDuration() {
	const metric = "http_server_request_duration_seconds_count"
	t := s.T()
	observations := func() int {
		return sumCountersForMetricAndLabels(t, metric, testHandlerGroup, testHandlerName, "head")
	}

	before := observations()
	s.makeCall(t, "HEAD", 201)
	after := observations()
	assert.Equal(t, before+1, after, "we should increment at least one bucket")
}

// TestMiddlewareResponseSize checks that a GET call adds exactly one observation to the
// response-size histogram and strictly increases its byte sum.
func (s *PrometheusSuite) TestMiddlewareResponseSize() {
	const (
		sumMetric   = "http_server_response_size_bytes_sum"
		countMetric = "http_server_response_size_bytes_count"
	)
	t := s.T()
	read := func(metric string) int {
		return sumCountersForMetricAndLabels(t, metric, testHandlerGroup, testHandlerName, "get")
	}

	sumBefore := read(sumMetric)
	countBefore := read(countMetric)
	s.makeCall(t, "GET", 201)
	sumAfter := read(sumMetric)
	countAfter := read(countMetric)

	assert.Equal(t, countBefore+1, countAfter, "we should increment at least one bucket")
	assert.True(t, sumBefore < sumAfter, "our sum should have increased by non zero sum of bytes transferred.")
}

// fetchPrometheusLines serves the default Prometheus handler into a recorder and returns the
// exposition lines that start with metricName and contain every value in matchingLabelValues
// as a double-quoted string. Returned lines keep their trailing newline.
func fetchPrometheusLines(t *testing.T, metricName string, matchingLabelValues ...string) []string {
	request, err := http.NewRequest("GET", "/", nil)
	require.NoError(t, err, "failed creating request for Prometheus handler")
	recorder := httptest.NewRecorder()
	prometheus.Handler().ServeHTTP(recorder, request)

	matched := make([]string, 0)
	body := bufio.NewReader(recorder.Body)
nextLine:
	for {
		line, readErr := body.ReadString('\n')
		if readErr == io.EOF {
			return matched
		}
		require.NoError(t, readErr, "error reading stuff")
		if !strings.HasPrefix(line, metricName) {
			continue
		}
		for _, want := range matchingLabelValues {
			if !strings.Contains(line, `"`+want+`"`) {
				// One missing label value is enough to reject the line.
				continue nextLine
			}
		}
		matched = append(matched, line)
	}
}

// sumCountersForMetricAndLabels adds up the sample values of every exposition line that
// fetchPrometheusLines returns for metricName and matchingLabelValues.
//
// Each value is parsed as a float and truncated to an int before being added; a line whose
// value cannot be parsed fails the test.
func sumCountersForMetricAndLabels(t *testing.T, metricName string, matchingLabelValues ...string) int {
	count := 0
	for _, line := range fetchPrometheusLines(t, metricName, matchingLabelValues...) {
		// The sample value is the last space-separated field; drop the line terminator first.
		trimmed := strings.TrimSpace(line)
		valueString := trimmed[strings.LastIndex(trimmed, " ")+1:]
		// Parse at 64-bit precision: a bitSize of 32 rounds to float32 and loses exactness
		// for large sums such as byte totals.
		valueFloat, err := strconv.ParseFloat(valueString, 64)
		require.NoError(t, err, "failed parsing value for line: %v", line)
		count += int(valueFloat)
	}
	return count
}
Loading