diff --git a/monitoring/prometheus/middleware.go b/monitoring/prometheus/middleware.go new file mode 100644 index 0000000..a99940f --- /dev/null +++ b/monitoring/prometheus/middleware.go @@ -0,0 +1,159 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package http_prometheus + +import ( + "time" + + "net/http" + + "github.com/mwitkow/go-httpwares" + "github.com/mwitkow/go-httpwares/tags" + "github.com/prometheus/client_golang/prometheus" +) + +// Middleware is a server-side http ware for monitoring handlers using Prometheus counters and histograms. +// +// Handlers are labeled by the http_ctxtags `TagForHandlerGroup` and `TagForHandlerName` applied using the http_ctxtags +// Middleware and HandlerName methods. These values are used as labels for all requests. +// +// The following monitoring variables can be created if opted in using options: +// +// http_server_requests_total +// http_server_response_size_bytes +// http_server_response_headers_duration_seconds +// http_server_request_duration_seconds +// +// +// Please note that the instantiation of this Middleware can panic if it has been previously instantiated with other +// options due to clashes in Prometheus metric names. 
+func Middleware(opts ...Option) httpwares.Middleware {
+	return func(nextHandler http.Handler) http.Handler {
+		o := evaluateOptions(opts)
+		requestHandledCounter := buildServerHandledCounter(o)
+		responseSizeHistogram := buildServerResponseSizeHistogram(o)
+		responseHeadersHistogram := buildServerResponseHeadersHistogram(o)
+		requestHistogram := buildServerRequestCompletionHistogram(o)
+		return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
+			handlerGroup, handlerName := handlerInfoFromRequest(req)
+			startTime := time.Now()
+			wrappedResp := httpwares.WrapResponseWriter(resp)
+			if responseHeadersHistogram != nil {
+				// Observe time-to-headers at the moment WriteHeader is called, not after the body is done.
+				wrappedResp.ObserveWriteHeader(func(writer httpwares.WrappedResponseWriter, code int) {
+					responseHeadersHistogram.WithLabelValues(handlerGroup, handlerName, sanitizeMethod(req.Method)).Observe(timeDiffToSeconds(startTime))
+				})
+			}
+			nextHandler.ServeHTTP(wrappedResp, req)
+
+			requestHandledCounter.WithLabelValues(handlerGroup, handlerName, sanitizeMethod(req.Method), sanitizeCode(wrappedResp.StatusCode())).Inc()
+			if requestHistogram != nil {
+				requestHistogram.WithLabelValues(handlerGroup, handlerName, sanitizeMethod(req.Method)).Observe(timeDiffToSeconds(startTime))
+			}
+			if responseSizeHistogram != nil {
+				responseSizeHistogram.WithLabelValues(handlerGroup, handlerName, sanitizeMethod(req.Method)).Observe(float64(wrappedResp.MessageLength()))
+			}
+		})
+	}
+}
+
+// handlerInfoFromRequest returns the handler group and handler name set by the
+// http_ctxtags middleware, defaulting both to "unspecified" when absent.
+func handlerInfoFromRequest(req *http.Request) (handlerGroup string, handlerName string) {
+	handlerGroup = "unspecified"
+	handlerName = "unspecified"
+	tags := http_ctxtags.ExtractInbound(req).Values()
+	if g, ok := tags[http_ctxtags.TagForHandlerGroup].(string); ok {
+		handlerGroup = g
+	}
+	if n, ok := tags[http_ctxtags.TagForHandlerName].(string); ok {
+		handlerName = n
+	}
+	// BUGFIX: previously returned handlerGroup twice, so the handler_name label
+	// was always equal to the handler_group.
+	return handlerGroup, handlerName
+}
+
+// buildServerHandledCounter registers (or reuses an already-registered) counter of completed server requests.
+func buildServerHandledCounter(o *options) *prometheus.CounterVec {
+	cv := prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: 
o.namespace,
+			Subsystem: "server",
+			Name:      "requests_total",
+			Help:      "Total number of requests completed on the server.",
+		}, []string{"handler_group", "handler_name", "method", "code"})
+	err := o.registry.Register(cv)
+	if err == nil {
+		return cv
+	} else if aeErr, ok := err.(*prometheus.AlreadyRegisteredError); ok {
+		// Re-instantiation with identical options: reuse the existing collector.
+		return aeErr.ExistingCollector.(*prometheus.CounterVec)
+	}
+	// BUGFIX: the message previously embedded a literal "%v" (no formatting is done on a plain string concat).
+	panic("failed registering requests_total error in http_prometheus: " + err.Error())
+}
+
+// buildServerResponseSizeHistogram registers the optional response-size histogram; returns nil when not opted in.
+func buildServerResponseSizeHistogram(o *options) *prometheus.HistogramVec {
+	if len(o.sizeHistogramBuckets) == 0 {
+		return nil
+	}
+	cv := prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: o.namespace,
+			Subsystem: "server",
+			Name:      "response_size_bytes",
+			Help:      "HTTP response size in bytes (optional).",
+			Buckets:   o.sizeHistogramBuckets,
+		}, []string{"handler_group", "handler_name", "method"})
+	err := o.registry.Register(cv)
+	if err == nil {
+		return cv
+	} else if aeErr, ok := err.(*prometheus.AlreadyRegisteredError); ok {
+		return aeErr.ExistingCollector.(*prometheus.HistogramVec)
+	}
+	panic("failed registering response_size_bytes error in http_prometheus: " + err.Error())
+}
+
+// buildServerResponseHeadersHistogram registers the optional time-to-headers histogram; returns nil when not opted in.
+func buildServerResponseHeadersHistogram(o *options) *prometheus.HistogramVec {
+	if len(o.responseHeadersHistogramBuckets) == 0 {
+		return nil
+	}
+	cv := prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: o.namespace,
+			Subsystem: "server",
+			Name:      "response_headers_duration_seconds",
+			Help:      "Latency (seconds) until HTTP response headers are returned (optional).",
+			Buckets:   o.responseHeadersHistogramBuckets,
+		}, []string{"handler_group", "handler_name", "method"})
+	err := o.registry.Register(cv)
+	if err == nil {
+		return cv
+	} else if aeErr, ok := err.(*prometheus.AlreadyRegisteredError); ok {
+		return aeErr.ExistingCollector.(*prometheus.HistogramVec)
+	}
+	panic("failed registering response_headers_duration_seconds error in http_prometheus: " + err.Error())
+}
+
+// buildServerRequestCompletionHistogram registers the optional full-request latency histogram; returns nil when not opted in.
+func buildServerRequestCompletionHistogram(o *options) *prometheus.HistogramVec {
+	if len(o.requestHistogramBuckets) == 0 {
+		return nil
+	}
+	cv := prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: o.namespace,
+			Subsystem: "server",
+			Name:      "request_duration_seconds",
+			Help:      "Latency (seconds) until HTTP request is fully handled by the server (optional).",
+			Buckets:   o.requestHistogramBuckets,
+		}, []string{"handler_group", "handler_name", "method"})
+	err := o.registry.Register(cv)
+	if err == nil {
+		return cv
+	} else if aeErr, ok := err.(*prometheus.AlreadyRegisteredError); ok {
+		return aeErr.ExistingCollector.(*prometheus.HistogramVec)
+	}
+	// BUGFIX: the message previously embedded a literal "%v" (no formatting is done on a plain string concat).
+	panic("failed registering request_duration_seconds error in http_prometheus: " + err.Error())
+}
+
+// timeDiffToSeconds returns the elapsed seconds since start, clamped at zero to
+// guard against clock adjustments producing negative observations.
+func timeDiffToSeconds(start time.Time) float64 {
+	d := time.Since(start).Seconds()
+	if d < 0.0 {
+		return 0.0
+	}
+	return d
+}
diff --git a/monitoring/prometheus/middleware_test.go b/monitoring/prometheus/middleware_test.go
new file mode 100644
index 0000000..c8e9105
--- /dev/null
+++ b/monitoring/prometheus/middleware_test.go
@@ -0,0 +1,151 @@
+package http_prometheus_test
+
+import (
+	"bufio"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"strconv"
+	"strings"
+	"testing"
+
+	"fmt"
+
+	"github.com/mwitkow/go-httpwares"
+	"github.com/mwitkow/go-httpwares/monitoring/prometheus"
+	"github.com/mwitkow/go-httpwares/tags"
+	"github.com/mwitkow/go-httpwares/testing"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+)
+
+var (
+	testHandlerGroup = "testingHandler"
+	testHandlerName  = "testingHandler"
+	testServiceName  = "testingExternalService"
+)
+
+func TestPrometheusSuite(t *testing.T) {
+	s := &PrometheusSuite{
+		WaresTestSuite: &httpwares_testing.WaresTestSuite{
+			Handler: 
http_ctxtags.HandlerName(testHandlerName)(httpwares_testing.PingBackHandler(httpwares_testing.DefaultPingBackStatusCode)), + ServerMiddleware: []httpwares.Middleware{ + http_ctxtags.Middleware(testHandlerGroup), + http_prometheus.Middleware( + http_prometheus.WithNamespace("http"), + http_prometheus.WithResponseSizeHistogram(), + http_prometheus.WithResponseHeadersLatencyHistogram(), + http_prometheus.WithRequestCompletionLatencyHistogram(), + ), + }, + ClientTripperware: httpwares.TripperwareChain{ + http_ctxtags.Tripperware(http_ctxtags.WithServiceName(testServiceName)), + }, + }, + } + suite.Run(t, s) +} + +type PrometheusSuite struct { + *httpwares_testing.WaresTestSuite +} + +func (s *PrometheusSuite) SetupTest() { +} + +func (s *PrometheusSuite) makeCall(t *testing.T, method string, expectedCode int) { + client := s.NewClient() // client always dials localhost. + req, _ := http.NewRequest(method, fmt.Sprintf("https://fakeaddress.fakeaddress.com/someurl?code=%d", expectedCode), nil) + req = req.WithContext(s.SimpleCtx()) + _, err := client.Do(req) + require.NoError(t, err, "call shouldn't fail") +} + +func (s *PrometheusSuite) TestHandledCounterCountsValues() { + for _, tcase := range []struct { + method string + code int + }{ + {method: "GET", code: 200}, + {method: "POST", code: 201}, + {method: "POST", code: 350}, + {method: "HEAD", code: 403}, + } { + s.T().Run(fmt.Sprintf("%s_%d", tcase.method, tcase.code), func(t *testing.T) { + codeStr := strconv.Itoa(tcase.code) + lowerMethod := strings.ToLower(tcase.method) + beforeRequestCounter := sumCountersForMetricAndLabels(t, "http_server_requests_total", testHandlerGroup, testHandlerName, codeStr, lowerMethod) + s.makeCall(t, tcase.method, tcase.code) + afterRequestCounter := sumCountersForMetricAndLabels(t, "http_server_requests_total", testHandlerGroup, testHandlerName, codeStr, lowerMethod) + assert.Equal(t, beforeRequestCounter+1, afterRequestCounter, "request counter for this handler should increase") + }) 
+ } +} + +func (s *PrometheusSuite) TestMiddlewareResponseHeaderDurations() { + beforeBucketCount := sumCountersForMetricAndLabels(s.T(), "http_server_response_headers_duration_seconds_count", testHandlerGroup, testHandlerName, "get") + s.makeCall(s.T(), "GET", 201) + afterBucketCount := sumCountersForMetricAndLabels(s.T(), "http_server_response_headers_duration_seconds_count", testHandlerGroup, testHandlerName, "get") + assert.Equal(s.T(), beforeBucketCount+1, afterBucketCount, "we should increment at least one bucket") +} + +func (s *PrometheusSuite) TestMiddlewareRequestCompleteDuration() { + beforeBucketCount := sumCountersForMetricAndLabels(s.T(), "http_server_request_duration_seconds_count", testHandlerGroup, testHandlerName, "head") + s.makeCall(s.T(), "HEAD", 201) + afterBucketCount := sumCountersForMetricAndLabels(s.T(), "http_server_request_duration_seconds_count", testHandlerGroup, testHandlerName, "head") + assert.Equal(s.T(), beforeBucketCount+1, afterBucketCount, "we should increment at least one bucket") +} + +func (s *PrometheusSuite) TestMiddlewareResponseSize() { + beforeBucketSum := sumCountersForMetricAndLabels(s.T(), "http_server_response_size_bytes_sum", testHandlerGroup, testHandlerName, "get") + beforeBucketCount := sumCountersForMetricAndLabels(s.T(), "http_server_response_size_bytes_count", testHandlerGroup, testHandlerName, "get") + s.makeCall(s.T(), "GET", 201) + afterBucketSum := sumCountersForMetricAndLabels(s.T(), "http_server_response_size_bytes_sum", testHandlerGroup, testHandlerName, "get") + afterBucketCount := sumCountersForMetricAndLabels(s.T(), "http_server_response_size_bytes_count", testHandlerGroup, testHandlerName, "get") + assert.Equal(s.T(), beforeBucketCount+1, afterBucketCount, "we should increment at least one bucket") + assert.True(s.T(), beforeBucketSum < afterBucketSum, "our sum should have increased by non zero sum of bytes transferred.") +} + +func fetchPrometheusLines(t *testing.T, metricName string, 
matchingLabelValues ...string) []string { + resp := httptest.NewRecorder() + req, err := http.NewRequest("GET", "/", nil) + require.NoError(t, err, "failed creating request for Prometheus handler") + prometheus.Handler().ServeHTTP(resp, req) + reader := bufio.NewReader(resp.Body) + ret := []string{} + for { + line, err := reader.ReadString('\n') + if err == io.EOF { + break + } else { + require.NoError(t, err, "error reading stuff") + } + if !strings.HasPrefix(line, metricName) { + continue + } + matches := true + for _, labelValue := range matchingLabelValues { + if !strings.Contains(line, `"`+labelValue+`"`) { + matches = false + } + } + if matches { + ret = append(ret, line) + } + + } + return ret +} + +func sumCountersForMetricAndLabels(t *testing.T, metricName string, matchingLabelValues ...string) int { + count := 0 + for _, line := range fetchPrometheusLines(t, metricName, matchingLabelValues...) { + valueString := line[strings.LastIndex(line, " ")+1: len(line)-1] + valueFloat, err := strconv.ParseFloat(valueString, 32) + require.NoError(t, err, "failed parsing value for line: %v", line) + count += int(valueFloat) + } + return count +} diff --git a/monitoring/prometheus/options.go b/monitoring/prometheus/options.go new file mode 100644 index 0000000..56c80f2 --- /dev/null +++ b/monitoring/prometheus/options.go @@ -0,0 +1,88 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package http_prometheus + +import "github.com/prometheus/client_golang/prometheus" + +var ( + // DefaultResponseSizeHistogram are the default (if enabled) buckets for response size histograms. + DefaultResponseSizeHistogram = []float64{32.0, 256.0, 2048.0, 16384.0, 131072.0, 1048576} + // DefaultLatencyHistogram defines the default (if enabled) buckets for latency histograms. 
+ DefaultLatencyHistogram = prometheus.DefBuckets + disabledHistogram = []float64{} + + defaultOptions = &options{ + namespace: "http", + sizeHistogramBuckets: disabledHistogram, + responseHeadersHistogramBuckets: disabledHistogram, + requestHistogramBuckets: disabledHistogram, + } +) + +type options struct { + namespace string + registry prometheus.Registerer + + sizeHistogramBuckets []float64 + responseHeadersHistogramBuckets []float64 + requestHistogramBuckets []float64 +} + +func evaluateOptions(opts []Option) *options { + optCopy := &options{} + *optCopy = *defaultOptions + optCopy.registry = prometheus.DefaultRegisterer + for _, o := range opts { + o(optCopy) + } + return optCopy +} + +type Option func(*options) + +// WithNamespace customizes the Prometheus namespace (first component before first underscore) of all the metrics. +func WithNamespace(prometheusNamespace string) Option { + return func(o *options) { + o.namespace = prometheusNamespace + } +} + +// WithResponseSizeHistogram enables the middleware to record the sizes of response messages in bytes. +// +// Optionally, you can provide your own histogram buckets for the measurements. If not provided DefaultResponseSizeHistogram is used. +func WithResponseSizeHistogram(bucketValues ...float64) Option { + return func(o *options) { + if len(bucketValues) == 0 { + o.sizeHistogramBuckets = DefaultResponseSizeHistogram + } else { + o.sizeHistogramBuckets = bucketValues + } + } +} + +// WithResponseHeadersLatencyHistogram enables the middleware to record the latency to headers response in seconds. +// +// Optionally, you can provide your own histogram buckets for the measurements. If not provided DefaultLatencyHistogram is used. 
+func WithResponseHeadersLatencyHistogram(bucketValues ...float64) Option { + return func(o *options) { + if len(bucketValues) == 0 { + o.responseHeadersHistogramBuckets = DefaultResponseSizeHistogram + } else { + o.responseHeadersHistogramBuckets = bucketValues + } + } +} + +// WithRequestCompletionLatencyHistogram enables the middleware to record the latency to completion of request. +// +// Optionally, you can provide your own histogram buckets for the measurements. If not provided DefaultLatencyHistogram is used. +func WithRequestCompletionLatencyHistogram(bucketValues ...float64) Option { + return func(o *options) { + if len(bucketValues) == 0 { + o.requestHistogramBuckets = DefaultResponseSizeHistogram + } else { + o.requestHistogramBuckets = bucketValues + } + } +} diff --git a/monitoring/prometheus/sanity.go b/monitoring/prometheus/sanity.go new file mode 100644 index 0000000..438c768 --- /dev/null +++ b/monitoring/prometheus/sanity.go @@ -0,0 +1,130 @@ +package http_prometheus + +import ( + "strconv" + "strings" +) + +func sanitizeMethod(m string) string { + switch m { + case "GET", "get": + return "get" + case "PUT", "put": + return "put" + case "HEAD", "head": + return "head" + case "POST", "post": + return "post" + case "DELETE", "delete": + return "delete" + case "CONNECT", "connect": + return "connect" + case "OPTIONS", "options": + return "options" + case "NOTIFY", "notify": + return "notify" + default: + return strings.ToLower(m) + } +} + +func sanitizeCode(s int) string { + switch s { + case 100: + return "100" + case 101: + return "101" + + case 200: + return "200" + case 201: + return "201" + case 202: + return "202" + case 203: + return "203" + case 204: + return "204" + case 205: + return "205" + case 206: + return "206" + + case 300: + return "300" + case 301: + return "301" + case 302: + return "302" + case 304: + return "304" + case 305: + return "305" + case 307: + return "307" + + case 400: + return "400" + case 401: + return "401" + case 
402: + return "402" + case 403: + return "403" + case 404: + return "404" + case 405: + return "405" + case 406: + return "406" + case 407: + return "407" + case 408: + return "408" + case 409: + return "409" + case 410: + return "410" + case 411: + return "411" + case 412: + return "412" + case 413: + return "413" + case 414: + return "414" + case 415: + return "415" + case 416: + return "416" + case 417: + return "417" + case 418: + return "418" + + case 500: + return "500" + case 501: + return "501" + case 502: + return "502" + case 503: + return "503" + case 504: + return "504" + case 505: + return "505" + + case 428: + return "428" + case 429: + return "429" + case 431: + return "431" + case 511: + return "511" + + default: + return strconv.Itoa(s) + } +} diff --git a/monitoring/prometheus/tripperware.go b/monitoring/prometheus/tripperware.go new file mode 100644 index 0000000..85636cf --- /dev/null +++ b/monitoring/prometheus/tripperware.go @@ -0,0 +1,150 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package http_prometheus + +import ( + "time" + + "net/http" + + "net" + "os" + "syscall" + + "github.com/mwitkow/go-httpwares" + "github.com/mwitkow/go-httpwares/tags" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/net/context" +) + +const ( + failedResolution = "resolution" + failedConnRefused = "refused" + failedTimeout = "timeout" + failedUnknown = "unknown" +) + +// Tripperware is a client-side http ware for monitoring calls to http services. +// +// Calls are labeled by the http_ctxtags `TagForCallService` http_ctxtags.Tripperware. By default, these are inferred +// from hostnames. 
+// +// The following monitoring variables can be created if opted in using options: +// +// http_client_requests_total +// http_client_response_headers_duration_seconds +// Please note: errors in handled respo +// +// Please note that the instantiation of this Tripperware can panic if it has been previously instantiated with other +// options due to clashes in Prometheus metric names. +func Tripperware(opts ...Option) httpwares.Tripperware { + o := evaluateOptions(opts) + requestHandledCounter := buildClientHandledCounter(o) + requestErredCounter := buildClientErroredCounter(o) + responseHeadersHistogram := buildClientResponseHeadersHistogram(o) + return func(next http.RoundTripper) http.RoundTripper { + return httpwares.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + serviceName := serviceNameFromTags(req) + startTime := time.Now() + resp, err := next.RoundTrip(req) + if err != nil { + failCode, failReason := httpErrorToLabelAndCode(err) + requestHandledCounter.WithLabelValues(serviceName, sanitizeMethod(req.Method), sanitizeCode(failCode)).Inc() + requestErredCounter.WithLabelValues(serviceName, sanitizeMethod(req.Method), failReason).Inc() + } else { + requestHandledCounter.WithLabelValues(serviceName, sanitizeMethod(req.Method), sanitizeCode(resp.StatusCode)).Inc() + if responseHeadersHistogram != nil { + responseHeadersHistogram.WithLabelValues(serviceName, sanitizeMethod(req.Method)).Observe(timeDiffToSeconds(startTime)) + } + } + return resp, err + }) + } +} + +func serviceNameFromTags(req *http.Request) string { + serviceName := "unspecified" + tags := http_ctxtags.ExtractOutbound(req).Values() + if s, ok := tags[http_ctxtags.TagForCallService].(string); ok { + serviceName = s + } + return serviceName +} + +func buildClientHandledCounter(o *options) *prometheus.CounterVec { + cv := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: o.namespace, + Subsystem: "client", + Name: "requests_total", + Help: "Total number of 
requests completed by the client.",
+		}, []string{"service_name", "method", "code"})
+	err := o.registry.Register(cv)
+	if err == nil {
+		return cv
+	} else if aeErr, ok := err.(*prometheus.AlreadyRegisteredError); ok {
+		return aeErr.ExistingCollector.(*prometheus.CounterVec)
+	}
+	// BUGFIX: the message previously embedded a literal "%v" (no formatting is done on a plain string concat).
+	panic("failed registering requests_total error in http_prometheus: " + err.Error())
+}
+
+// buildClientErroredCounter registers (or reuses) a counter of client-side transport failures, labeled by fail_reason.
+func buildClientErroredCounter(o *options) *prometheus.CounterVec {
+	cv := prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: o.namespace,
+			Subsystem: "client",
+			Name:      "request_errors_total",
+			Help:      "Total number of requests failed on the client side.",
+		}, []string{"service_name", "method", "fail_reason"})
+	err := o.registry.Register(cv)
+	if err == nil {
+		return cv
+	} else if aeErr, ok := err.(*prometheus.AlreadyRegisteredError); ok {
+		return aeErr.ExistingCollector.(*prometheus.CounterVec)
+	}
+	panic("failed registering request_errors_total error in http_prometheus: " + err.Error())
+}
+
+// buildClientResponseHeadersHistogram registers the optional client time-to-headers histogram; returns nil when not opted in.
+func buildClientResponseHeadersHistogram(o *options) *prometheus.HistogramVec {
+	if len(o.responseHeadersHistogramBuckets) == 0 {
+		return nil
+	}
+	cv := prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: o.namespace,
+			Subsystem: "client",
+			Name:      "response_headers_duration_seconds",
+			Help:      "Latency (seconds) until HTTP response headers are received by the client.",
+			Buckets:   o.responseHeadersHistogramBuckets,
+		}, []string{"service_name", "method"})
+	err := o.registry.Register(cv)
+	if err == nil {
+		return cv
+	} else if aeErr, ok := err.(*prometheus.AlreadyRegisteredError); ok {
+		return aeErr.ExistingCollector.(*prometheus.HistogramVec)
+	}
+	panic("failed registering response_headers_duration_seconds error in http_prometheus: " + err.Error())
+}
+
+// httpErrorToLabelAndCode maps a RoundTrip error to an informal HTTP-like status
+// code and a fail_reason label.
+func httpErrorToLabelAndCode(err error) (int, string) {
+	// For list of informal code mappings:
+	// https://en.wikipedia.org/wiki/List_of_HTTP_status_codes
+	if netErr, ok := err.(*net.OpError); ok {
+		
switch nestErr := netErr.Err.(type) {
+		case *net.DNSError:
+			// Name resolution failed before any HTTP exchange; informal 599.
+			return 599, failedResolution
+		case *os.SyscallError:
+			if nestErr.Err == syscall.ECONNREFUSED {
+				return 599, failedConnRefused
+			}
+			return 599, failedUnknown
+		}
+		if netErr.Timeout() {
+			// Network-level timeout; informal 598.
+			return 598, failedTimeout
+		}
+	} else if err == context.Canceled || err == context.DeadlineExceeded {
+		// Context cancellation/deadline is reported as a timeout as well.
+		return 598, failedTimeout
+	}
+	return 599, failedUnknown
+}