From 48dd604eaa3d263401db8e30e8b94e354f2779bd Mon Sep 17 00:00:00 2001 From: Igor Shishkin Date: Tue, 3 Sep 2024 14:33:47 +0300 Subject: [PATCH] Use unified metrics & probes handler for all of components (#235) Signed-off-by: Igor Shishkin --- appmetrics/appmetrics.go | 72 +++++++++++++ appmetrics/appmetrics_test.go | 194 ++++++++++++++++++++++++++++++++++ cmd/exporter/main.go | 69 +++++------- cmd/manager/main.go | 63 ++++------- cmd/publisher/main.go | 90 +++++----------- 5 files changed, 342 insertions(+), 146 deletions(-) create mode 100644 appmetrics/appmetrics.go create mode 100644 appmetrics/appmetrics_test.go diff --git a/appmetrics/appmetrics.go b/appmetrics/appmetrics.go new file mode 100644 index 0000000..a29c7cf --- /dev/null +++ b/appmetrics/appmetrics.go @@ -0,0 +1,72 @@ +package appmetrics + +import ( + "net/http" + + "github.com/labstack/echo/v4" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const ( + livenessProbeURL = "/healthz/liveness" + readinessProbeURL = "/healthz/readiness" + startupProbeURL = "/healthz/startup" + metricsURL = "/metrics" +) + +type AppMetrics interface { + Register(e *echo.Echo) +} + +type appMetrics struct { + livenessProbeFn func() error + readinessProbeFn func() error + startupProbeFn func() error +} + +func New(livenessProbeFn, readinessProbeFn, startupProbeFn func() error) AppMetrics { + return &appMetrics{ + livenessProbeFn: livenessProbeFn, + readinessProbeFn: readinessProbeFn, + startupProbeFn: startupProbeFn, + } +} + +func (m *appMetrics) livenessProbe(c echo.Context) error { + return check(c, m.livenessProbeFn) +} + +func (m *appMetrics) readinessProbe(c echo.Context) error { + return check(c, m.readinessProbeFn) +} + +func (m *appMetrics) startupProbe(c echo.Context) error { + return check(c, m.startupProbeFn) +} + +func (m *appMetrics) metrics(c echo.Context) error { + return echo.WrapHandler(promhttp.Handler())(c) +} + +func (m *appMetrics) Register(e *echo.Echo) { + e.GET(livenessProbeURL, m.livenessProbe) + e.GET(readinessProbeURL, m.readinessProbe) + e.GET(startupProbeURL, m.startupProbe) + e.GET(metricsURL, m.metrics) +} + +func check(c echo.Context, fn func() error) error { + if fn == nil { + return c.JSON(http.StatusNotImplemented, echo.Map{ + "status": "failed", "error": "not implemented: check function is not provided", + }) + } + + if err := fn(); err != nil { + return c.JSON(http.StatusServiceUnavailable, echo.Map{ + "status": "failed", "error": err.Error(), + }) + } + + return c.JSON(http.StatusOK, echo.Map{"status": "ok"}) +} diff --git a/appmetrics/appmetrics_test.go b/appmetrics/appmetrics_test.go new file mode 100644 index 0000000..885a3e0 --- /dev/null +++ b/appmetrics/appmetrics_test.go @@ -0,0 +1,194 @@ +package appmetrics + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + echo "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestAll(t *testing.T) { + type testCase struct { + name string + livenessProbeFn func() error + readinessProbeFn func() error + startupProbeFn func() error + url string + expCode int + expData map[string]any + } + + tcs := []testCase{ + // Happy path + { + name: "liveness probe", + livenessProbeFn: func() error { return nil }, + url: livenessProbeURL, + expCode: http.StatusOK, + expData: map[string]any{ + "status": "ok", + }, + }, + { + name: "readiness probe", + readinessProbeFn: func() error { return nil }, + url: readinessProbeURL, + expCode: http.StatusOK, + expData: map[string]any{ + "status": "ok", + }, + }, + { + name: "startup probe", + startupProbeFn: func() error { return nil }, + url: startupProbeURL, + expCode: http.StatusOK, + expData: map[string]any{ + "status": "ok", + }, + }, + + // Not implemented + { + name: "liveness probe not implemented", + readinessProbeFn: func() error { return nil }, + startupProbeFn: func() error { return nil }, + url: livenessProbeURL, + expCode: http.StatusNotImplemented, + expData: map[string]any{ + "status": "failed", + "error": "not implemented: check function is not provided", + }, + }, + { + name: "readiness probe not implemented", + livenessProbeFn: func() error { return nil }, + startupProbeFn: func() error { return nil }, + url: readinessProbeURL, + expCode: http.StatusNotImplemented, + expData: map[string]any{ + "status": "failed", + "error": "not implemented: check function is not provided", + }, + }, + { + name: "startup probe not implemented", + livenessProbeFn: func() error { return nil }, + readinessProbeFn: func() error { return nil }, + url: startupProbeURL, + expCode: http.StatusNotImplemented, + expData: map[string]any{ + "status": "failed", + "error": "not implemented: check function is not provided", + }, + }, + + // Check error + { + name: "liveness probe error", + livenessProbeFn: func() error { return errors.New("blah") }, + url: livenessProbeURL, + expCode: http.StatusServiceUnavailable, + expData: map[string]any{ + "status": "failed", + "error": "blah", + }, + }, + { + name: "readiness probe error", + readinessProbeFn: func() error { return errors.New("blah") }, + url: readinessProbeURL, + expCode: http.StatusServiceUnavailable, + expData: map[string]any{ + "status": "failed", + "error": "blah", + }, + }, + { + name: "startup probe error", + startupProbeFn: func() error { return errors.New("blah") }, + url: startupProbeURL, + expCode: http.StatusServiceUnavailable, + expData: map[string]any{ + "status": "failed", + "error": "blah", + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + r := require.New(t) + + e := echo.New() + e.Use(middleware.Logger()) + e.Use(middleware.Recover()) + + appMetrics := New(tc.livenessProbeFn, tc.readinessProbeFn, tc.startupProbeFn) + appMetrics.Register(e) + + srv := httptest.NewServer(e) + defer srv.Close() + + ctx := context.TODO() + + code, v, err := get(ctx, srv.URL+tc.url) + r.NoError(err) + r.Equal(tc.expCode, code) + r.Equal(tc.expData, v) + }) + } +} + +func TestMetrics(t *testing.T) { + r := require.New(t) + + e := echo.New() + e.Use(middleware.Logger()) + e.Use(middleware.Recover()) + + appMetrics := New(nil, nil, nil) + appMetrics.Register(e) + + srv := httptest.NewServer(e) + defer srv.Close() + + req, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, srv.URL+metricsURL, nil) + r.NoError(err) + + resp, err := http.DefaultClient.Do(req) + r.NoError(err) + defer resp.Body.Close() + + data, err := io.ReadAll(resp.Body) + r.NoError(err) + r.True(strings.HasPrefix(string(data), "# HELP")) +} + +func get(ctx context.Context, url string) (int, map[string]any, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return 0, nil, err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return 0, nil, err + } + defer resp.Body.Close() + + v := map[string]any{} + if err := json.NewDecoder(resp.Body).Decode(&v); err != nil { + return 0, nil, err + } + + return resp.StatusCode, v, nil +} diff --git a/cmd/exporter/main.go b/cmd/exporter/main.go index d726399..9650871 100644 --- a/cmd/exporter/main.go +++ b/cmd/exporter/main.go @@ -6,11 +6,14 @@ import ( "net/http" "github.com/kelseyhightower/envconfig" + "github.com/labstack/echo-contrib/echoprometheus" + echo "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" _ "github.com/lib/pq" - "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" + "github.com/teran/archived/appmetrics" "github.com/teran/archived/exporter/service" "github.com/teran/archived/repositories/metadata/postgresql" ) @@ -62,49 +65,29 @@ func main() { return svc.Run(ctx) }) + me := echo.New() + me.Use(middleware.Logger()) + me.Use(echoprometheus.NewMiddleware("exporter_metrics")) + me.Use(middleware.Recover()) + + checkFn := func() error { + if err := db.Ping(); err != nil { + return err + } + + return nil + } + + metrics := appmetrics.New(checkFn, checkFn, checkFn) + metrics.Register(me) + g.Go(func() error { - http.Handle("/metrics", promhttp.Handler()) - - http.HandleFunc("/healthz/startup", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("ok\n")); err != nil { - panic(err) - } - }) - - http.HandleFunc("/healthz/readiness", func(w http.ResponseWriter, r *http.Request) { - if err := db.Ping(); err != nil { - log.Warnf("db.Ping() error on readiness probe: %s", err) - - w.WriteHeader(http.StatusServiceUnavailable) - if _, err := w.Write([]byte("failed\n")); err != nil { - panic(err) - } - } else { - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("ok\n")); err != nil { - panic(err) - } - } - }) - - http.HandleFunc("/healthz/liveness", func(w http.ResponseWriter, r *http.Request) { - if err := db.Ping(); err != nil { - log.Warnf("db.Ping() error on liveness probe: %s", err) - - w.WriteHeader(http.StatusServiceUnavailable) - if _, err := w.Write([]byte("failed\n")); err != nil { - panic(err) - } - } else { - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("ok\n")); err != nil { - panic(err) - } - } - }) - - return http.ListenAndServe(cfg.Addr, nil) + srv := http.Server{ + Addr: cfg.Addr, + Handler: me, + } + + return srv.ListenAndServe() }) if err := g.Wait(); err != nil { diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 594f346..86a4b9c 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -17,9 +17,11 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" "github.com/kelseyhightower/envconfig" + "github.com/labstack/echo-contrib/echoprometheus" + echo "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" _ "github.com/lib/pq" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" @@ -27,6 +29,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/teran/archived/appmetrics" grpcManagePresenter "github.com/teran/archived/manager/presenter/grpc" awsBlobRepo "github.com/teran/archived/repositories/blob/aws" "github.com/teran/archived/repositories/metadata/postgresql" @@ -147,49 +150,29 @@ func main() { return gs.Serve(listener) }) - g.Go(func() error { - http.Handle("/metrics", promhttp.Handler()) + me := echo.New() + me.Use(middleware.Logger()) + me.Use(echoprometheus.NewMiddleware("manager_metrics")) + me.Use(middleware.Recover()) - http.HandleFunc("/healthz/startup", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("ok\n")); err != nil { - panic(err) - } - }) + checkFn := func() error { + if err := db.Ping(); err != nil { + return err + } - http.HandleFunc("/healthz/readiness", func(w http.ResponseWriter, r *http.Request) { - if err := db.Ping(); err != nil { - log.Warnf("db.Ping() error on readiness probe: %s", err) - - w.WriteHeader(http.StatusServiceUnavailable) - if _, err := w.Write([]byte("failed\n")); err != nil { - panic(err) - } - } else { - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("ok\n")); err != nil { - panic(err) - } - } - }) + return nil + } - http.HandleFunc("/healthz/liveness", func(w http.ResponseWriter, r *http.Request) { - if err := db.Ping(); err != nil { - log.Warnf("db.Ping() error on liveness probe: %s", err) - - w.WriteHeader(http.StatusServiceUnavailable) - if _, err := w.Write([]byte("failed\n")); err != nil { - panic(err) - } - } else { - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("ok\n")); err != nil { - panic(err) - } - } - }) + metrics := appmetrics.New(checkFn, checkFn, checkFn) + metrics.Register(me) + + g.Go(func() error { + srv := http.Server{ + Addr: cfg.MetricsAddr, + Handler: me, + } - return http.ListenAndServe(cfg.MetricsAddr, nil) + return srv.ListenAndServe() }) if err := g.Wait(); err != nil { diff --git a/cmd/publisher/main.go b/cmd/publisher/main.go index 172fbab..a6c8622 100644 --- a/cmd/publisher/main.go +++ b/cmd/publisher/main.go @@ -16,10 +16,10 @@ import ( echo "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" _ "github.com/lib/pq" - "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" + "github.com/teran/archived/appmetrics" htmlPresenter "github.com/teran/archived/publisher/presenter/html" awsBlobRepo "github.com/teran/archived/repositories/blob/aws" "github.com/teran/archived/repositories/cache/metadata/memcache" @@ -136,71 +136,35 @@ func main() { return srv.ListenAndServe() }) - g.Go(func() error { - http.Handle("/metrics", promhttp.Handler()) + me := echo.New() + me.Use(middleware.Logger()) + me.Use(echoprometheus.NewMiddleware("publisher_metrics")) + me.Use(middleware.Recover()) - http.HandleFunc("/healthz/startup", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("ok\n")); err != nil { - panic(err) - } - }) - - http.HandleFunc("/healthz/readiness", func(w http.ResponseWriter, r *http.Request) { - if len(cfg.MemcacheServers) > 0 { - if err := cli.Ping(); err != nil { - log.Warnf("memcache.Ping() error on readiness probe: %s", err) - - w.WriteHeader(http.StatusServiceUnavailable) - if _, err := w.Write([]byte("failed\n")); err != nil { - panic(err) - } - return - } - } - if err := db.Ping(); err != nil { - log.Warnf("db.Ping() error on readiness probe: %s", err) - - w.WriteHeader(http.StatusServiceUnavailable) - if _, err := w.Write([]byte("failed\n")); err != nil { - panic(err) - } - return - } - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("ok\n")); err != nil { - panic(err) - } - }) - - http.HandleFunc("/healthz/liveness", func(w http.ResponseWriter, r *http.Request) { - if len(cfg.MemcacheServers) > 0 { - if err := cli.Ping(); err != nil { - log.Warnf("memcache.Ping() error on readiness probe: %s", err) - - w.WriteHeader(http.StatusServiceUnavailable) - if _, err := w.Write([]byte("failed\n")); err != nil { - panic(err) - } - return - } + checkFn := func() error { + if len(cfg.MemcacheServers) > 0 { + if err := cli.Ping(); err != nil { + return err } - if err := db.Ping(); err != nil { - log.Warnf("db.Ping() error on readiness probe: %s", err) - - w.WriteHeader(http.StatusServiceUnavailable) - if _, err := w.Write([]byte("failed\n")); err != nil { - panic(err) - } - return - } - w.WriteHeader(http.StatusOK) - if _, err := w.Write([]byte("ok\n")); err != nil { - panic(err) - } - }) + } - return http.ListenAndServe(cfg.MetricsAddr, nil) + if err := db.Ping(); err != nil { + return err + } + + return nil + } + + metrics := appmetrics.New(checkFn, checkFn, checkFn) + metrics.Register(me) + + g.Go(func() error { + srv := http.Server{ + Addr: cfg.MetricsAddr, + Handler: me, + } + + return srv.ListenAndServe() }) if err := g.Wait(); err != nil {