From 0a01dd2dee0e04ba05bea68b2009a060d79791c1 Mon Sep 17 00:00:00 2001
From: John Pruitt
Date: Wed, 21 Dec 2022 21:28:28 -0600
Subject: [PATCH] Track metrics per workload in vacuum engine

---
 pkg/vacuum/vacuum.go      | 73 ++++++++++++++++++++++++---------------
 pkg/vacuum/vacuum_test.go |  2 +-
 2 files changed, 46 insertions(+), 29 deletions(-)

diff --git a/pkg/vacuum/vacuum.go b/pkg/vacuum/vacuum.go
index 88c0276703..94173d3e2a 100644
--- a/pkg/vacuum/vacuum.go
+++ b/pkg/vacuum/vacuum.go
@@ -60,40 +60,48 @@ import (
 )

 var (
-	tablesVacuumedTotal = prometheus.NewCounter(prometheus.CounterOpts{
+	numberVacuumConnections = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: util.PromNamespace,
 		Subsystem: "vacuum",
-		Name:      "tables_vacuumed_total",
-		Help:      "Total number of tables vacuumed by the Promscale vacuum engine.",
+		Name:      "number_vacuum_connections",
+		Help:      "Number of database connections currently in use by the vacuum engine. One taken up by the advisory lock, the rest by vacuum commands.",
 	})
-	vacuumErrorsTotal = prometheus.NewCounter(prometheus.CounterOpts{
+	vacuumErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: util.PromNamespace,
 		Subsystem: "vacuum",
 		Name:      "vacuum_errors_total",
 		Help:      "Total number of errors encountered by the Promscale vacuum engine while vacuuming tables.",
-	})
-	tablesNeedingVacuum = prometheus.NewGauge(prometheus.GaugeOpts{
+	},
+		[]string{"workload"},
+	)
+	tablesVacuumedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: util.PromNamespace,
 		Subsystem: "vacuum",
-		Name:      "tables_needing_vacuum",
-		Help:      "Number of tables needing a vacuum detected on this iteration of the engine. This will never exceed 1000 even if there are more to vacuum.",
-	})
-	numberVacuumConnections = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name:      "tables_vacuumed_total",
+		Help:      "Total number of compressed chunks vacuumed by the Promscale vacuum engine.",
+	},
+		[]string{"workload"},
+	)
+	tablesToVacuum = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: util.PromNamespace,
 		Subsystem: "vacuum",
-		Name:      "number_vacuum_connections",
-		Help:      "Number of database connections currently in use by the vacuum engine. One taken up by the advisory lock, the rest by vacuum commands.",
-	})
-	vacuumDurationSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Name:      "tables_to_vacuum_total",
+		Help:      "Number of compressed chunks needing a vacuum detected on this iteration of the engine. This will never exceed 1000 even if there are more to vacuum.",
+	},
+		[]string{"workload"},
+	)
+	vacuumDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Namespace: util.PromNamespace,
 		Subsystem: "vacuum",
 		Name:      "vacuum_duration_seconds",
 		Help:      "Time spent vacuuming chunks.",
-	})
+	},
+		[]string{"workload"},
+	)
 )

 func init() {
-	prometheus.MustRegister(tablesVacuumedTotal, vacuumErrorsTotal, tablesNeedingVacuum, numberVacuumConnections, vacuumDurationSeconds)
+	prometheus.MustRegister(tablesVacuumedTotal, vacuumErrorsTotal, tablesToVacuum, numberVacuumConnections, vacuumDurationSeconds)
 }

 const (
@@ -267,6 +275,7 @@ type chunk struct {

 type workload struct {
 	name         string
+	workload     string
 	query        string
 	scaleWorkers bool
 	stop         bool
@@ -274,12 +283,14 @@ type workload struct {

 // Run attempts vacuum a batch of compressed chunks
 func (e *Engine) Run(ctx context.Context) {
+	const locking = "locking"
 	// grab a database connection and attempt to acquire an advisory lock
 	// if we get the lock, we'll hold it on this connection while the vacuum
 	// work is done
 	con, err := e.pool.Acquire(ctx)
 	if err != nil {
 		log.Error("msg", "failed to acquire a db connection", "error", err)
+		vacuumErrorsTotal.WithLabelValues(locking).Inc()
 		return
 	}
 	defer con.Release() // return the connection to the pool when finished with it
@@ -287,6 +298,7 @@ func (e *Engine) Run(ctx context.Context) {
 	err = con.QueryRow(ctx, sqlAcquireLock).Scan(&acquired)
 	if err != nil {
 		log.Error("msg", "failed to attempt to acquire advisory lock", "error", err)
+		vacuumErrorsTotal.WithLabelValues(locking).Inc()
 		return
 	}
 	if !acquired {
@@ -301,6 +313,7 @@ func (e *Engine) Run(ctx context.Context) {
 		_, err := con.Exec(context.Background(), sqlReleaseLock)
 		if err != nil {
 			log.Error("msg", "vacuum engine failed to release advisory lock", "error", err)
+			vacuumErrorsTotal.WithLabelValues(locking).Inc()
 		}
 		numberVacuumConnections.Set(0)
 	}()
@@ -309,12 +322,14 @@ func (e *Engine) Run(ctx context.Context) {

 	chunksToFreeze := workload{
 		name:         "compressed chunks to freeze",
+		workload:     "compressed-chunks-to-freeze",
 		query:        sqlListChunksToFreeze,
 		scaleWorkers: true,
 		stop:         false,
 	}
 	chunksMissingStats := workload{
 		name:         "compressed chunks missing stats",
+		workload:     "compressed-chunks-missing-stats",
 		query:        sqlListChunksMissingStats,
 		scaleWorkers: false,
 		stop:         false,
@@ -338,10 +353,11 @@ func (e *Engine) Run(ctx context.Context) {
 		chunks, err := e.listChunks(ctx, con, w.query)
 		if err != nil {
 			log.Error("msg", fmt.Sprintf("failed to list %s", w.name), "error", err)
+			vacuumErrorsTotal.WithLabelValues(w.workload).Inc()
 			w.stop = true
 			continue
 		}
-		tablesNeedingVacuum.Set(float64(len(chunks)))
+		tablesToVacuum.WithLabelValues(w.workload).Set(float64(len(chunks)))
 		if len(chunks) == 0 {
 			log.Debug("msg", fmt.Sprintf("zero %s", w.name))
 			w.stop = true
@@ -355,13 +371,14 @@
 			numWorkers, err = e.calcNumWorkers(ctx, con, chunks)
 			if err != nil {
 				log.Error("msg", "failed to calc num workers", "error", err)
+				vacuumErrorsTotal.WithLabelValues(w.workload).Inc()
 				w.stop = true
 				continue
 			}
 		}
 		// vacuum the chunks
-		runWorkers(ctx, numWorkers, chunks, e.worker)
+		runWorkers(ctx, w.workload, numWorkers, chunks, e.worker)
 		log.Debug("msg", "vacuum workers finished. delaying before next iteration...")
 		// in some cases, have seen it take up to 10 seconds for the stats to be updated post vacuum
 		time.Sleep(delay)
 	}
@@ -457,15 +474,15 @@ func (e *Engine) calcNumWorkers(ctx context.Context, con *pgxpool.Conn, chunks [

 // runWorkers kicks off a number of goroutines to work on the chunks in parallel
 // blocks until the workers complete
-func runWorkers(ctx context.Context, numWorkers int, chunks []*chunk, worker func(context.Context, int, <-chan *chunk)) {
+func runWorkers(ctx context.Context, workload string, numWorkers int, chunks []*chunk, worker func(context.Context, string, int, <-chan *chunk)) {
 	todo := make(chan *chunk, len(chunks))
 	var wg sync.WaitGroup
 	wg.Add(numWorkers)
 	for id := 0; id < numWorkers; id++ {
-		go func(ctx context.Context, id int, todo <-chan *chunk) {
+		go func(ctx context.Context, workload string, id int, todo <-chan *chunk) {
 			defer wg.Done()
-			worker(ctx, id, todo)
-		}(ctx, id, todo)
+			worker(ctx, workload, id, todo)
+		}(ctx, workload, id, todo)
 	}
 	for _, c := range chunks {
 		todo <- c
@@ -484,11 +501,11 @@ func (e *Engine) getAutovacuumCount(ctx context.Context, con *pgxpool.Conn, name
 }

 // worker pulls chunks from a channel and vacuums them
-func (e *Engine) worker(ctx context.Context, id int, todo <-chan *chunk) {
+func (e *Engine) worker(ctx context.Context, workload string, id int, todo <-chan *chunk) {
 	con, err := e.pool.Acquire(ctx)
 	if err != nil {
 		log.Error("msg", "failed to acquire a database connection", "worker", id, "error", err)
-		vacuumErrorsTotal.Inc()
+		vacuumErrorsTotal.WithLabelValues(workload).Inc()
 		return
 	}
 	numberVacuumConnections.Inc()
@@ -504,7 +521,7 @@
 		autovacuumCount, err := e.getAutovacuumCount(ctx, con, c.name)
 		if err != nil {
 			log.Error("msg", "failed to get current autovacuum count", "chunk", c.name, "worker", id, "error", err)
-			vacuumErrorsTotal.Inc()
+			vacuumErrorsTotal.WithLabelValues(workload).Inc()
 			continue
 		}
 		if c.autovacuumCount < autovacuumCount {
@@ -519,10 +536,10 @@
 		elapsed := time.Since(before).Seconds()
 		if err != nil {
 			log.Error("msg", "failed to vacuum chunk", "chunk", c.name, "worker", id, "error", err)
-			vacuumErrorsTotal.Inc()
+			vacuumErrorsTotal.WithLabelValues(workload).Inc()
 		} else {
-			vacuumDurationSeconds.Observe(elapsed)
-			tablesVacuumedTotal.Inc()
+			vacuumDurationSeconds.WithLabelValues(workload).Observe(elapsed)
+			tablesVacuumedTotal.WithLabelValues(workload).Inc()
 		}
 	}
 }
diff --git a/pkg/vacuum/vacuum_test.go b/pkg/vacuum/vacuum_test.go
index f660a8cd47..8427bb6024 100644
--- a/pkg/vacuum/vacuum_test.go
+++ b/pkg/vacuum/vacuum_test.go
@@ -42,7 +42,7 @@ func Test_runWorkers(t *testing.T) {
 			var mu sync.Mutex
 			// when we "work" on a chunk, append its name to actual
 			actual := make([]string, 0)
-			runWorkers(context.Background(), tt.numWorkers, tt.chunks, func(ctx context.Context, id int, todo <-chan *chunk) {
+			runWorkers(context.Background(), "test", tt.numWorkers, tt.chunks, func(ctx context.Context, workload string, id int, todo <-chan *chunk) {
 				for c := range todo {
 					func(ctx context.Context, id int, c *chunk) {
 						mu.Lock()
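
The core change above converts each singleton metric into a labeled *Vec metric keyed by a "workload" label. For illustration only (not part of the commit), here is a minimal, self-contained Go sketch of that pattern; it assumes only the stock github.com/prometheus/client_golang/prometheus API, and "promscale" stands in for util.PromNamespace:

package main

import "github.com/prometheus/client_golang/prometheus"

// A CounterVec is one logical counter partitioned by label values. Each
// distinct "workload" value becomes its own child series, so the freeze
// pass and the missing-stats pass can be tracked and graphed separately.
var vacuumErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: "promscale", // stand-in for util.PromNamespace
	Subsystem: "vacuum",
	Name:      "vacuum_errors_total",
	Help:      "Total number of errors encountered while vacuuming tables.",
},
	[]string{"workload"},
)

func main() {
	prometheus.MustRegister(vacuumErrorsTotal)

	// WithLabelValues creates the child series on first use and reuses
	// it afterwards; each child counts independently.
	vacuumErrorsTotal.WithLabelValues("compressed-chunks-to-freeze").Inc()
	vacuumErrorsTotal.WithLabelValues("compressed-chunks-missing-stats").Inc()
}

On the Prometheus side, the former single series becomes one series per workload, queryable with something like sum by (workload) (rate(promscale_vacuum_vacuum_errors_total[5m])).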
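
The other half of the change threads that workload label from Run through runWorkers into each worker goroutine. Again for illustration only (not from the commit), a simplified runnable sketch of the fan-out shape, with printing standing in for the real vacuum-and-record-metrics work:

package main

import (
	"context"
	"fmt"
	"sync"
)

type chunk struct{ name string }

// Pre-fill a buffered channel with every chunk, start numWorkers
// goroutines, and let each worker drain the channel; the workload tag
// rides along so workers can label their metrics per workload.
func runWorkers(ctx context.Context, workload string, numWorkers int, chunks []*chunk,
	worker func(ctx context.Context, workload string, id int, todo <-chan *chunk)) {
	todo := make(chan *chunk, len(chunks))
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for id := 0; id < numWorkers; id++ {
		go func(id int) {
			defer wg.Done()
			worker(ctx, workload, id, todo)
		}(id)
	}
	for _, c := range chunks {
		todo <- c
	}
	close(todo) // lets each worker's range loop finish once the work is drained
	wg.Wait()   // block until the workers complete, as the doc comment promises
}

func main() {
	chunks := []*chunk{{"chunk_1"}, {"chunk_2"}, {"chunk_3"}}
	runWorkers(context.Background(), "compressed-chunks-to-freeze", 2, chunks,
		func(_ context.Context, workload string, id int, todo <-chan *chunk) {
			for c := range todo {
				fmt.Printf("worker %d (%s): would vacuum %s\n", id, workload, c.name)
			}
		})
}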