This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Track metrics per workload in vacuum engine
jgpruitt committed Dec 22, 2022
1 parent 93ed639 commit 0a01dd2
Showing 2 changed files with 46 additions and 29 deletions.
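The change converts the vacuum engine's plain counters, gauges, and histograms into labeled metric vectors so that each series is tracked per workload. A minimal, self-contained sketch of that pattern follows; the package name, metric names, and the "example" namespace are hypothetical stand-ins rather than the actual Promscale identifiers, while the workload label values mirror the ones used in the diff below.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// errorsTotal counts vacuum errors, with one child series per workload.
var errorsTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "example",
		Subsystem: "vacuum",
		Name:      "errors_total",
		Help:      "Errors encountered while vacuuming, labeled by workload.",
	},
	[]string{"workload"},
)

// durationSeconds records how long each vacuum took, per workload.
var durationSeconds = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "example",
		Subsystem: "vacuum",
		Name:      "duration_seconds",
		Help:      "Time spent vacuuming, labeled by workload.",
	},
	[]string{"workload"},
)

func init() {
	prometheus.MustRegister(errorsTotal, durationSeconds)
}

func main() {
	start := time.Now()
	// ... vacuum one chunk for a given workload ...
	durationSeconds.WithLabelValues("compressed-chunks-to-freeze").Observe(time.Since(start).Seconds())
	errorsTotal.WithLabelValues("compressed-chunks-missing-stats").Inc()
}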
73 changes: 45 additions & 28 deletions pkg/vacuum/vacuum.go
@@ -60,40 +60,48 @@ import (
)

var (
- tablesVacuumedTotal = prometheus.NewCounter(prometheus.CounterOpts{
+ numberVacuumConnections = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "vacuum",
Name: "tables_vacuumed_total",
Help: "Total number of tables vacuumed by the Promscale vacuum engine.",
Name: "number_vacuum_connections",
Help: "Number of database connections currently in use by the vacuum engine. One taken up by the advisory lock, the rest by vacuum commands.",
})
- vacuumErrorsTotal = prometheus.NewCounter(prometheus.CounterOpts{
+ vacuumErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: util.PromNamespace,
Subsystem: "vacuum",
Name: "vacuum_errors_total",
Help: "Total number of errors encountered by the Promscale vacuum engine while vacuuming tables.",
- })
- tablesNeedingVacuum = prometheus.NewGauge(prometheus.GaugeOpts{
+ },
+ []string{"workload"},
+ )
+ tablesVacuumedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: util.PromNamespace,
Subsystem: "vacuum",
Name: "tables_needing_vacuum",
Help: "Number of tables needing a vacuum detected on this iteration of the engine. This will never exceed 1000 even if there are more to vacuum.",
})
numberVacuumConnections = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tables_vacuumed_total",
Help: "Total number of compressed chunks vacuumed by the Promscale vacuum engine.",
},
[]string{"workload"},
)
tablesToVacuum = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "vacuum",
Name: "number_vacuum_connections",
Help: "Number of database connections currently in use by the vacuum engine. One taken up by the advisory lock, the rest by vacuum commands.",
})
vacuumDurationSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tables_to_vacuum_total",
Help: "Number of compressed chunks needing a vacuum detected on this iteration of the engine. This will never exceed 1000 even if there are more to vacuum.",
},
[]string{"workload"},
)
vacuumDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: util.PromNamespace,
Subsystem: "vacuum",
Name: "vacuum_duration_seconds",
Help: "Time spent vacuuming chunks.",
- })
+ },
+ []string{"workload"},
+ )
)

func init() {
- prometheus.MustRegister(tablesVacuumedTotal, vacuumErrorsTotal, tablesNeedingVacuum, numberVacuumConnections, vacuumDurationSeconds)
+ prometheus.MustRegister(tablesVacuumedTotal, vacuumErrorsTotal, tablesToVacuum, numberVacuumConnections, vacuumDurationSeconds)
}

const (
@@ -267,26 +275,30 @@ type chunk struct {

type workload struct {
name string
+ workload string
query string
scaleWorkers bool
stop bool
}

// Run attempts vacuum a batch of compressed chunks
func (e *Engine) Run(ctx context.Context) {
+ const locking = "locking"
// grab a database connection and attempt to acquire an advisory lock
// if we get the lock, we'll hold it on this connection while the vacuum
// work is done
con, err := e.pool.Acquire(ctx)
if err != nil {
log.Error("msg", "failed to acquire a db connection", "error", err)
+ vacuumErrorsTotal.WithLabelValues(locking).Inc()
return
}
defer con.Release() // return the connection to the pool when finished with it
acquired := false
err = con.QueryRow(ctx, sqlAcquireLock).Scan(&acquired)
if err != nil {
log.Error("msg", "failed to attempt to acquire advisory lock", "error", err)
+ vacuumErrorsTotal.WithLabelValues(locking).Inc()
return
}
if !acquired {
@@ -301,6 +313,7 @@ func (e *Engine) Run(ctx context.Context) {
_, err := con.Exec(context.Background(), sqlReleaseLock)
if err != nil {
log.Error("msg", "vacuum engine failed to release advisory lock", "error", err)
+ vacuumErrorsTotal.WithLabelValues(locking).Inc()
}
numberVacuumConnections.Set(0)
}()
@@ -309,12 +322,14 @@ func (e *Engine) Run(ctx context.Context) {

chunksToFreeze := workload{
name: "compressed chunks to freeze",
workload: "compressed-chunks-to-freeze",
query: sqlListChunksToFreeze,
scaleWorkers: true,
stop: false,
}
chunksMissingStats := workload{
name: "compressed chunks missing stats",
workload: "compressed-chunks-missing-stats",
query: sqlListChunksMissingStats,
scaleWorkers: false,
stop: false,
@@ -338,10 +353,11 @@ func (e *Engine) Run(ctx context.Context) {
chunks, err := e.listChunks(ctx, con, w.query)
if err != nil {
log.Error("msg", fmt.Sprintf("failed to list %s", w.name), "error", err)
+ vacuumErrorsTotal.WithLabelValues(w.workload).Inc()
w.stop = true
continue
}
- tablesNeedingVacuum.Set(float64(len(chunks)))
+ tablesToVacuum.WithLabelValues(w.workload).Set(float64(len(chunks)))
if len(chunks) == 0 {
log.Debug("msg", fmt.Sprintf("zero %s", w.name))
w.stop = true
@@ -355,13 +371,14 @@ func (e *Engine) Run(ctx context.Context) {
numWorkers, err = e.calcNumWorkers(ctx, con, chunks)
if err != nil {
log.Error("msg", "failed to calc num workers", "error", err)
+ vacuumErrorsTotal.WithLabelValues(w.workload).Inc()
w.stop = true
continue
}
}

// vacuum the chunks
- runWorkers(ctx, numWorkers, chunks, e.worker)
+ runWorkers(ctx, w.workload, numWorkers, chunks, e.worker)
log.Debug("msg", "vacuum workers finished. delaying before next iteration...")
// in some cases, have seen it take up to 10 seconds for the stats to be updated post vacuum
time.Sleep(delay)
@@ -457,15 +474,15 @@ func (e *Engine) calcNumWorkers(ctx context.Context, con *pgxpool.Conn, chunks [

// runWorkers kicks off a number of goroutines to work on the chunks in parallel
// blocks until the workers complete
- func runWorkers(ctx context.Context, numWorkers int, chunks []*chunk, worker func(context.Context, int, <-chan *chunk)) {
+ func runWorkers(ctx context.Context, workload string, numWorkers int, chunks []*chunk, worker func(context.Context, string, int, <-chan *chunk)) {
todo := make(chan *chunk, len(chunks))
var wg sync.WaitGroup
wg.Add(numWorkers)
for id := 0; id < numWorkers; id++ {
- go func(ctx context.Context, id int, todo <-chan *chunk) {
+ go func(ctx context.Context, workload string, id int, todo <-chan *chunk) {
defer wg.Done()
- worker(ctx, id, todo)
- }(ctx, id, todo)
+ worker(ctx, workload, id, todo)
+ }(ctx, workload, id, todo)
}
for _, c := range chunks {
todo <- c
@@ -484,11 +501,11 @@ func (e *Engine) getAutovacuumCount(ctx context.Context, con *pgxpool.Conn, name
}

// worker pulls chunks from a channel and vacuums them
- func (e *Engine) worker(ctx context.Context, id int, todo <-chan *chunk) {
+ func (e *Engine) worker(ctx context.Context, workload string, id int, todo <-chan *chunk) {
con, err := e.pool.Acquire(ctx)
if err != nil {
log.Error("msg", "failed to acquire a database connection", "worker", id, "error", err)
- vacuumErrorsTotal.Inc()
+ vacuumErrorsTotal.WithLabelValues(workload).Inc()
return
}
numberVacuumConnections.Inc()
@@ -504,7 +521,7 @@ func (e *Engine) worker(ctx context.Context, id int, todo <-chan *chunk) {
autovacuumCount, err := e.getAutovacuumCount(ctx, con, c.name)
if err != nil {
log.Error("msg", "failed to get current autovacuum count", "chunk", c.name, "worker", id, "error", err)
- vacuumErrorsTotal.Inc()
+ vacuumErrorsTotal.WithLabelValues(workload).Inc()
continue
}
if c.autovacuumCount < autovacuumCount {
@@ -519,10 +536,10 @@ func (e *Engine) worker(ctx context.Context, id int, todo <-chan *chunk) {
elapsed := time.Since(before).Seconds()
if err != nil {
log.Error("msg", "failed to vacuum chunk", "chunk", c.name, "worker", id, "error", err)
- vacuumErrorsTotal.Inc()
+ vacuumErrorsTotal.WithLabelValues(workload).Inc()
} else {
- vacuumDurationSeconds.Observe(elapsed)
- tablesVacuumedTotal.Inc()
+ vacuumDurationSeconds.WithLabelValues(workload).Observe(elapsed)
+ tablesVacuumedTotal.WithLabelValues(workload).Inc()
}
}
}
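The runWorkers and worker signature changes above thread the workload name through the goroutine fan-out so that every worker can label the metrics it emits. A rough, self-contained sketch of that fan-out shape, assuming simplified types and a stub worker rather than the real vacuum logic:

package main

import (
	"context"
	"fmt"
	"sync"
)

type chunk struct{ name string }

// runWorkers fans chunks out to numWorkers goroutines and blocks until all of them finish.
// The workload string is passed through so each worker can label the metrics it records.
func runWorkers(ctx context.Context, workload string, numWorkers int, chunks []*chunk,
	worker func(context.Context, string, int, <-chan *chunk)) {
	todo := make(chan *chunk, len(chunks))
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for id := 0; id < numWorkers; id++ {
		go func(id int) {
			defer wg.Done()
			worker(ctx, workload, id, todo)
		}(id)
	}
	for _, c := range chunks {
		todo <- c
	}
	close(todo)
	wg.Wait()
}

func main() {
	chunks := []*chunk{{name: "chunk_1"}, {name: "chunk_2"}, {name: "chunk_3"}}
	runWorkers(context.Background(), "compressed-chunks-to-freeze", 2, chunks,
		func(ctx context.Context, workload string, id int, todo <-chan *chunk) {
			for c := range todo {
				// the real worker issues a VACUUM here and records, e.g.,
				// vacuumDurationSeconds.WithLabelValues(workload).Observe(elapsed)
				fmt.Printf("worker %d (%s): vacuumed %s\n", id, workload, c.name)
			}
		})
}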
2 changes: 1 addition & 1 deletion pkg/vacuum/vacuum_test.go
@@ -42,7 +42,7 @@ func Test_runWorkers(t *testing.T) {
var mu sync.Mutex
// when we "work" on a chunk, append its name to actual
actual := make([]string, 0)
- runWorkers(context.Background(), tt.numWorkers, tt.chunks, func(ctx context.Context, id int, todo <-chan *chunk) {
+ runWorkers(context.Background(), "test", tt.numWorkers, tt.chunks, func(ctx context.Context, workload string, id int, todo <-chan *chunk) {
for c := range todo {
func(ctx context.Context, id int, c *chunk) {
mu.Lock()
