diff --git a/collector/collector_test.go b/collector/collector_test.go index 061de8895..40371fde5 100644 --- a/collector/collector_test.go +++ b/collector/collector_test.go @@ -49,8 +49,12 @@ func readMetric(m prometheus.Metric) MetricResult { func sanitizeQuery(q string) string { q = strings.Join(strings.Fields(q), " ") q = strings.Replace(q, "(", "\\(", -1) + q = strings.Replace(q, "?", "\\?", -1) q = strings.Replace(q, ")", "\\)", -1) + q = strings.Replace(q, "[", "\\[", -1) + q = strings.Replace(q, "]", "\\]", -1) q = strings.Replace(q, "*", "\\*", -1) + q = strings.Replace(q, "^", "\\^", -1) q = strings.Replace(q, "$", "\\$", -1) return q } diff --git a/collector/pg_archiver.go b/collector/pg_archiver.go new file mode 100644 index 000000000..9c33a0e44 --- /dev/null +++ b/collector/pg_archiver.go @@ -0,0 +1,81 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector("archiver", defaultDisabled, NewPGArchiverCollector) +} + +type PGArchiverCollector struct { + log log.Logger +} + +const archiverSubsystem = "archiver" + +func NewPGArchiverCollector(config collectorConfig) (Collector, error) { + return &PGArchiverCollector{log: config.logger}, nil +} + +var ( + pgArchiverPendingWalCount = prometheus.NewDesc( + prometheus.BuildFQName(namespace, archiverSubsystem, "pending_wals"), + "Number of WAL files waiting to be archived", + []string{}, prometheus.Labels{}, + ) + + pgArchiverQuery = ` + WITH + current_wal_file AS ( + SELECT CASE WHEN NOT pg_is_in_recovery() THEN pg_walfile_name(pg_current_wal_insert_lsn()) ELSE NULL END pg_walfile_name + ), + current_wal AS ( + SELECT + ('x'||substring(pg_walfile_name,9,8))::bit(32)::int log, + ('x'||substring(pg_walfile_name,17,8))::bit(32)::int seg, + pg_walfile_name + FROM current_wal_file + ), + archive_wal AS( + SELECT + ('x'||substring(last_archived_wal,9,8))::bit(32)::int log, + ('x'||substring(last_archived_wal,17,8))::bit(32)::int seg, + last_archived_wal + FROM pg_stat_archiver + ) + SELECT coalesce(((cw.log - aw.log) * 256) + (cw.seg-aw.seg),'NaN'::float) as pending_wal_count FROM current_wal cw, archive_wal aw + ` +) + +func (c *PGArchiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + row := db.QueryRowContext(ctx, + pgArchiverQuery) + var pendingWalCount float64 + err := row.Scan(&pendingWalCount) + if err != nil { + return err + } + ch <- prometheus.MustNewConstMetric( + pgArchiverPendingWalCount, + prometheus.GaugeValue, + pendingWalCount, + ) + return nil +} diff --git a/collector/pg_archiver_test.go b/collector/pg_archiver_test.go new file mode 100644 index 000000000..e2bd5969d --- /dev/null +++ b/collector/pg_archiver_test.go @@ -0,0 +1,96 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "math" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgArchiverCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + mock.ExpectQuery(sanitizeQuery(pgArchiverQuery)).WillReturnRows(sqlmock.NewRows([]string{"pending_wal_count"}). + AddRow(5)) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGArchiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGArchiverCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPgArchiverNaNCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + mock.ExpectQuery(sanitizeQuery(pgArchiverQuery)).WillReturnRows(sqlmock.NewRows([]string{"pending_wal_count"}). + AddRow(math.NaN())) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGArchiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGArchiverCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{}, value: math.NaN(), metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect.labels, convey.ShouldResemble, m.labels) + convey.So(math.IsNaN(m.value), convey.ShouldResemble, math.IsNaN(expect.value)) + convey.So(expect.metricType, convey.ShouldEqual, m.metricType) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_blocked.go b/collector/pg_blocked.go new file mode 100644 index 000000000..f215ed8c7 --- /dev/null +++ b/collector/pg_blocked.go @@ -0,0 +1,101 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const blockedSubsystem = "blocked" + +func init() { + registerCollector(blockedSubsystem, defaultDisabled, NewPGBlockedCollector) +} + +type PGBlockedCollector struct { + log log.Logger +} + +func NewPGBlockedCollector(config collectorConfig) (Collector, error) { + return &PGBlockedCollector{log: config.logger}, nil +} + +var ( + blockedQueries = prometheus.NewDesc( + prometheus.BuildFQName(namespace, blockedSubsystem, "queries"), + "The current number of blocked queries", + []string{"table"}, + prometheus.Labels{}, + ) + + blockedQuery = ` + SELECT + count(blocked.transactionid) AS queries, + '__transaction__' AS table + FROM pg_catalog.pg_locks blocked + WHERE NOT blocked.granted AND locktype = 'transactionid' + GROUP BY locktype + UNION + SELECT + count(blocked.relation) AS queries, + blocked.relation::regclass::text AS table + FROM pg_catalog.pg_locks blocked + WHERE NOT blocked.granted AND locktype != 'transactionid' + GROUP BY relation + ` +) + +func (PGBlockedCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + blockedQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var table sql.NullString + var queries sql.NullFloat64 + + if err := rows.Scan(&queries, &table); err != nil { + return err + } + + tableLabel := "unknown" + if table.Valid { + tableLabel = table.String + } + + queriesMetric := 0.0 + if queries.Valid { + queriesMetric = queries.Float64 + } + ch <- prometheus.MustNewConstMetric( + blockedQueries, + prometheus.GaugeValue, + queriesMetric, + tableLabel, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_blocked_test.go b/collector/pg_blocked_test.go new file mode 100644 index 000000000..281edc824 --- /dev/null +++ b/collector/pg_blocked_test.go @@ -0,0 +1,101 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgBlockedCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "queries", + "table", + } + rows := sqlmock.NewRows(columns). + AddRow(1000, "pgbouncer") + + mock.ExpectQuery(sanitizeQuery(blockedQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGBlockedCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGBlockedCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"table": "pgbouncer"}, value: 1000, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPgBlockedCollectorNull(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "queries", + "table", + } + rows := sqlmock.NewRows(columns). + AddRow(nil, nil) + + mock.ExpectQuery(sanitizeQuery(blockedQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGBlockedCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGBlockedCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"table": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_database_wraparound.go b/collector/pg_database_wraparound.go new file mode 100644 index 000000000..19f618f83 --- /dev/null +++ b/collector/pg_database_wraparound.go @@ -0,0 +1,112 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const databaseWraparoundSubsystem = "database_wraparound" + +func init() { + registerCollector(databaseWraparoundSubsystem, defaultDisabled, NewPGDatabaseWraparoundCollector) +} + +type PGDatabaseWraparoundCollector struct { + log log.Logger +} + +func NewPGDatabaseWraparoundCollector(config collectorConfig) (Collector, error) { + return &PGDatabaseWraparoundCollector{log: config.logger}, nil +} + +var ( + databaseWraparoundAgeDatfrozenxid = prometheus.NewDesc( + prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datfrozenxid_seconds"), + "Age of the oldest transaction ID that has not been frozen.", + []string{"datname"}, + prometheus.Labels{}, + ) + databaseWraparoundAgeDatminmxid = prometheus.NewDesc( + prometheus.BuildFQName(namespace, databaseWraparoundSubsystem, "age_datminmxid"), + "Age of the oldest multi-transaction ID that has been replaced with a transaction ID.", + []string{"datname"}, + prometheus.Labels{}, + ) + + databaseWraparoundQuery = ` + SELECT + datname, + age(d.datfrozenxid) as age_datfrozenxid, + mxid_age(d.datminmxid) as age_datminmxid + FROM + pg_catalog.pg_database d + WHERE + d.datallowconn + ` +) + +func (PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + databaseWraparoundQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var datname sql.NullString + var ageDatfrozenxid, ageDatminmxid sql.NullFloat64 + + if err := rows.Scan(&datname, &ageDatfrozenxid, &ageDatminmxid); err != nil { + return err + } + + datnameLabel := "unknown" + if datname.Valid { + datnameLabel = datname.String + } + + ageDatfrozenxidMetric := 0.0 + if ageDatfrozenxid.Valid { + ageDatfrozenxidMetric = ageDatfrozenxid.Float64 + } + + ch <- prometheus.MustNewConstMetric( + databaseWraparoundAgeDatfrozenxid, + prometheus.GaugeValue, + ageDatfrozenxidMetric, datnameLabel, + ) + + ageDatminmxidMetric := 0.0 + if ageDatminmxid.Valid { + ageDatminmxidMetric = ageDatminmxid.Float64 + } + ch <- prometheus.MustNewConstMetric( + databaseWraparoundAgeDatminmxid, + prometheus.GaugeValue, + ageDatminmxidMetric, datnameLabel, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_database_wraparound_test.go b/collector/pg_database_wraparound_test.go new file mode 100644 index 000000000..e9ca70335 --- /dev/null +++ b/collector/pg_database_wraparound_test.go @@ -0,0 +1,105 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGDatabaseWraparoundCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "datname", + "age_datfrozenxid", + "age_datminmxid", + } + rows := sqlmock.NewRows(columns). + AddRow("newreddit", 87126426, 0) + + mock.ExpectQuery(sanitizeQuery(databaseWraparoundQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGDatabaseWraparoundCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGDatabaseWraparoundCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"datname": "newreddit"}, value: 87126426, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"datname": "newreddit"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPGDatabaseWraparoundCollectorNull(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "datname", + "age_datfrozenxid", + "age_datminmxid", + } + rows := sqlmock.NewRows(columns). + AddRow(nil, nil, nil) + + mock.ExpectQuery(sanitizeQuery(databaseWraparoundQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGDatabaseWraparoundCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGDatabaseWraparoundCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"datname": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"datname": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_index_size.go b/collector/pg_index_size.go new file mode 100644 index 000000000..fcc33d2c1 --- /dev/null +++ b/collector/pg_index_size.go @@ -0,0 +1,106 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const indexSizeSubsystem = "index_size" + +func init() { + registerCollector(indexSizeSubsystem, defaultDisabled, NewPGIndexSizeCollector) +} + +type PGIndexSizeCollector struct { + log log.Logger +} + +func NewPGIndexSizeCollector(config collectorConfig) (Collector, error) { + return &PGIndexSizeCollector{log: config.logger}, nil +} + +var ( + indexSizeDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, indexSizeSubsystem, "bytes"), + "Size of the index as per pg_table_size function", + []string{"schemaname", "relname", "indexrelname"}, + prometheus.Labels{}, + ) + + indexSizeQuery = ` + SELECT + schemaname, + tablename as relname, + indexname as indexrelname, + pg_class.relpages * 8192::bigint as index_size + FROM + pg_indexes inner join pg_namespace on pg_indexes.schemaname = pg_namespace.nspname + inner join pg_class on pg_class.relnamespace = pg_namespace.oid and pg_class.relname = pg_indexes.indexname + WHERE + pg_indexes.schemaname != 'pg_catalog' + ` +) + +func (PGIndexSizeCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + indexSizeQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var schemaname, relname, indexrelname sql.NullString + var indexSize sql.NullFloat64 + + if err := rows.Scan(&schemaname, &relname, &indexrelname, &indexSize); err != nil { + return err + } + schemanameLabel := "unknown" + if schemaname.Valid { + schemanameLabel = schemaname.String + } + relnameLabel := "unknown" + if relname.Valid { + relnameLabel = relname.String + } + indexrelnameLabel := "unknown" + if indexrelname.Valid { + indexrelnameLabel = indexrelname.String + } + labels := []string{schemanameLabel, relnameLabel, indexrelnameLabel} + + indexSizeMetric := 0.0 + if indexSize.Valid { + indexSizeMetric = indexSize.Float64 + } + ch <- prometheus.MustNewConstMetric( + indexSizeDesc, + prometheus.GaugeValue, + indexSizeMetric, + labels..., + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_index_size_test.go b/collector/pg_index_size_test.go new file mode 100644 index 000000000..8adaa4709 --- /dev/null +++ b/collector/pg_index_size_test.go @@ -0,0 +1,105 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgIndexSizeCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "schemaname", + "relname", + "indexrelname", + "index_size", + } + rows := sqlmock.NewRows(columns). + AddRow("public", "foo", "foo_key", 100) + + mock.ExpectQuery(sanitizeQuery(indexSizeQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGIndexSizeCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGIndexSizeCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"schemaname": "public", "relname": "foo", "indexrelname": "foo_key"}, value: 100, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPgIndexSizeCollectorNull(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "schemaname", + "relname", + "indexrelname", + "index_size", + } + rows := sqlmock.NewRows(columns). + AddRow(nil, nil, nil, nil) + + mock.ExpectQuery(sanitizeQuery(indexSizeQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGIndexSizeCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGIndexSizeCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_long_running_transactions.go b/collector/pg_long_running_transactions.go new file mode 100644 index 000000000..baedd7869 --- /dev/null +++ b/collector/pg_long_running_transactions.go @@ -0,0 +1,93 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const longRunningTransactionsSubsystem = "long_running_transactions" + +func init() { + registerCollector(longRunningTransactionsSubsystem, defaultDisabled, NewPGLongRunningTransactionsCollector) +} + +type PGLongRunningTransactionsCollector struct { + log log.Logger +} + +func NewPGLongRunningTransactionsCollector(config collectorConfig) (Collector, error) { + return &PGLongRunningTransactionsCollector{log: config.logger}, nil +} + +var ( + longRunningTransactionsCount = prometheus.NewDesc( + "pg_long_running_transactions", + "Current number of long running transactions", + []string{}, + prometheus.Labels{}, + ) + + longRunningTransactionsAgeInSeconds = prometheus.NewDesc( + prometheus.BuildFQName(namespace, longRunningTransactionsSubsystem, "age_in_seconds"), + "The current maximum transaction age in seconds", + []string{}, + prometheus.Labels{}, + ) + + longRunningTransactionsQuery = ` + SELECT + COUNT(*) as transactions, + MAX(EXTRACT(EPOCH FROM (clock_timestamp() - xact_start))) AS age_in_seconds + FROM pg_catalog.pg_stat_activity + WHERE state is distinct from 'idle' AND (now() - xact_start) > '1 minutes'::interval AND query not like 'autovacuum:%' + ` +) + +func (PGLongRunningTransactionsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + longRunningTransactionsQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var transactions, ageInSeconds float64 + + if err := rows.Scan(&transactions, &ageInSeconds); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + longRunningTransactionsCount, + prometheus.GaugeValue, + transactions, + ) + ch <- prometheus.MustNewConstMetric( + longRunningTransactionsAgeInSeconds, + prometheus.GaugeValue, + ageInSeconds, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_long_running_transactions_summary.go b/collector/pg_long_running_transactions_summary.go new file mode 100644 index 000000000..0fb70c717 --- /dev/null +++ b/collector/pg_long_running_transactions_summary.go @@ -0,0 +1,109 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const longRunningTransactionsSummarySubsystem = "long_running_transactions_summary" + +func init() { + registerCollector(longRunningTransactionsSummarySubsystem, defaultDisabled, NewPGLongRunningTransactionsSummaryCollector) +} + +type PGLongRunningTransactionsSummaryCollector struct { + log log.Logger +} + +func NewPGLongRunningTransactionsSummaryCollector(config collectorConfig) (Collector, error) { + return &PGLongRunningTransactionsSummaryCollector{log: config.logger}, nil +} + +var ( + longRunningTransactionsSummaryMaxAgeInSeconds = prometheus.NewDesc( + prometheus.BuildFQName(namespace, longRunningTransactionsSummarySubsystem, "max_age_in_seconds"), + "The current maximum transaction age in seconds", + []string{"application", "endpoint"}, + prometheus.Labels{}, + ) + + longRunningTransactionsSummaryQuery = ` + SELECT + activity.matches[1] AS application, + activity.matches[2] AS endpoint, + MAX(age_in_seconds) AS max_age_in_seconds + FROM ( + SELECT + regexp_matches(query, '^\s*(?:\/\*(?:application:(\w+),?)?(?:correlation_id:\w+,?)?(?:jid:\w+,?)?(?:endpoint_id:([\w/\-\.:\#\s]+),?)?.*?\*\/)?\s*(\w+)') AS matches, + EXTRACT(EPOCH FROM (clock_timestamp() - xact_start)) AS age_in_seconds + FROM + pg_catalog.pg_stat_activity + WHERE state <> 'idle' + AND (clock_timestamp() - xact_start) > '30 seconds'::interval + AND query NOT LIKE 'autovacuum:%' + ) activity + GROUP BY application, endpoint + ORDER BY max_age_in_seconds DESC + ` +) + +func (PGLongRunningTransactionsSummaryCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + longRunningTransactionsSummaryQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var application, endpoint sql.NullString + var maxAgeInSeconds sql.NullFloat64 + + if err := rows.Scan(&application, &endpoint, &maxAgeInSeconds); err != nil { + return err + } + + applicationLabel := "unknown" + if application.Valid { + applicationLabel = application.String + } + endpointLabel := "unknown" + if endpoint.Valid { + endpointLabel = endpoint.String + } + labels := []string{applicationLabel, endpointLabel} + + maxAgeInSecondsMetric := 0.0 + if maxAgeInSeconds.Valid { + maxAgeInSecondsMetric = maxAgeInSeconds.Float64 + } + ch <- prometheus.MustNewConstMetric( + longRunningTransactionsSummaryMaxAgeInSeconds, + prometheus.GaugeValue, + maxAgeInSecondsMetric, + labels..., + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_long_running_transactions_test.go b/collector/pg_long_running_transactions_test.go new file mode 100644 index 000000000..eedda7c65 --- /dev/null +++ b/collector/pg_long_running_transactions_test.go @@ -0,0 +1,63 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGLongRunningTransactionsCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "transactions", + "age_in_seconds", + } + rows := sqlmock.NewRows(columns). + AddRow(20, 1200) + + mock.ExpectQuery(sanitizeQuery(longRunningTransactionsQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGLongRunningTransactionsCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGLongRunningTransactionsCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{}, value: 20, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{}, value: 1200, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_oldest_blocked.go b/collector/pg_oldest_blocked.go new file mode 100644 index 000000000..6bbf52ede --- /dev/null +++ b/collector/pg_oldest_blocked.go @@ -0,0 +1,83 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const oldestBlockedSubsystem = "oldest_blocked" + +func init() { + registerCollector(oldestBlockedSubsystem, defaultDisabled, NewPGOldestBlockedCollector) +} + +type PGOldestBlockedCollector struct { + log log.Logger +} + +func NewPGOldestBlockedCollector(config collectorConfig) (Collector, error) { + return &PGOldestBlockedCollector{log: config.logger}, nil +} + +var ( + oldestBlockedAgeSeconds = prometheus.NewDesc( + prometheus.BuildFQName(namespace, oldestBlockedSubsystem, "age_seconds"), + "Largest number of seconds any transaction is currently waiting on a lock", + []string{}, + prometheus.Labels{}, + ) + + oldestBlockedQuery = ` + SELECT + coalesce(extract('epoch' from max(clock_timestamp() - state_change)), 0) age_seconds + FROM + pg_catalog.pg_stat_activity + WHERE + wait_event_type = 'Lock' + AND state='active' + ` +) + +func (PGOldestBlockedCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + oldestBlockedQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var ageSeconds float64 + + if err := rows.Scan(&ageSeconds); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + oldestBlockedAgeSeconds, + prometheus.GaugeValue, + ageSeconds, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_oldest_blocked_test.go b/collector/pg_oldest_blocked_test.go new file mode 100644 index 000000000..9aac3762f --- /dev/null +++ b/collector/pg_oldest_blocked_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgOldestBlockedCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "age_seconds", + } + rows := sqlmock.NewRows(columns). + AddRow(100000) + + mock.ExpectQuery(sanitizeQuery(oldestBlockedQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGOldestBlockedCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGOldestBlockedCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{}, value: 100000, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index 7e91ea261..6e390bd50 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -23,6 +23,7 @@ import ( ) func TestPgReplicationSlotCollectorActive(t *testing.T) { + db, mock, err := sqlmock.New() if err != nil { t.Fatalf("Error opening a stub db connection: %s", err) diff --git a/collector/pg_slow.go b/collector/pg_slow.go new file mode 100644 index 000000000..c0b44eb45 --- /dev/null +++ b/collector/pg_slow.go @@ -0,0 +1,82 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const slowSubsystem = "slow" + +func init() { + registerCollector(slowSubsystem, defaultDisabled, NewPGSlowCollector) +} + +type PGSlowCollector struct { + log log.Logger +} + +func NewPGSlowCollector(config collectorConfig) (Collector, error) { + return &PGSlowCollector{log: config.logger}, nil +} + +var ( + slowQueries = prometheus.NewDesc( + prometheus.BuildFQName(namespace, slowSubsystem, "queries"), + "Current number of slow queries", + []string{}, + prometheus.Labels{}, + ) + + slowQuery = ` + SELECT + COUNT(*) AS queries + FROM + pg_catalog.pg_stat_activity + WHERE + state = 'active' AND (now() - query_start) > '1 seconds'::interval + ` +) + +func (PGSlowCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + slowQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var queries float64 + + if err := rows.Scan(&queries); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + slowQueries, + prometheus.GaugeValue, + queries, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_slow_test.go b/collector/pg_slow_test.go new file mode 100644 index 000000000..eece77c2a --- /dev/null +++ b/collector/pg_slow_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGSlowCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "queries", + } + rows := sqlmock.NewRows(columns). + AddRow(25) + + mock.ExpectQuery(sanitizeQuery(slowQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGSlowCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGSlowCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{}, value: 25, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_activity_autovacuum.go b/collector/pg_stat_activity_autovacuum.go new file mode 100644 index 000000000..3ed37ed5b --- /dev/null +++ b/collector/pg_stat_activity_autovacuum.go @@ -0,0 +1,85 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const statActivityAutovacuumSubsystem = "stat_activity_autovacuum" + +func init() { + registerCollector(statActivityAutovacuumSubsystem, defaultDisabled, NewPGStatActivityAutovacuumCollector) +} + +type PGStatActivityAutovacuumCollector struct { + log log.Logger +} + +func NewPGStatActivityAutovacuumCollector(config collectorConfig) (Collector, error) { + return &PGStatActivityAutovacuumCollector{log: config.logger}, nil +} + +var ( + statActivityAutovacuumAgeInSeconds = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statActivityAutovacuumSubsystem, "age_in_seconds"), + "The age of the vacuum process in seconds", + []string{"relname"}, + prometheus.Labels{}, + ) + + statActivityAutovacuumQuery = ` + SELECT + SPLIT_PART(query, '.', 2) AS relname, + EXTRACT(EPOCH FROM (clock_timestamp() - xact_start)) AS age_in_seconds + FROM + pg_catalog.pg_stat_activity + WHERE + query like 'autovacuum:%' AND + EXTRACT(EPOCH FROM (clock_timestamp() - xact_start)) > 300 + ` +) + +func (PGStatActivityAutovacuumCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + statActivityAutovacuumQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var relname string + var ageInSeconds float64 + + if err := rows.Scan(&relname, &ageInSeconds); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + statActivityAutovacuumAgeInSeconds, + prometheus.GaugeValue, + ageInSeconds, relname, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_activity_autovacuum_active.go b/collector/pg_stat_activity_autovacuum_active.go new file mode 100644 index 000000000..1ad960e3c --- /dev/null +++ b/collector/pg_stat_activity_autovacuum_active.go @@ -0,0 +1,91 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const statActivityAutovacuumActiveSubsystem = "stat_activity_autovacuum_active" + +func init() { + registerCollector(statActivityAutovacuumActiveSubsystem, defaultDisabled, NewPGStatActivityAutovacuumActiveCollector) +} + +type PGStatActivityAutovacuumActiveCollector struct { + log log.Logger +} + +func NewPGStatActivityAutovacuumActiveCollector(config collectorConfig) (Collector, error) { + return &PGStatActivityAutovacuumActiveCollector{log: config.logger}, nil +} + +var ( + statActivityAutovacuumActiveWorkersCount = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statActivityAutovacuumActiveSubsystem, "workers"), + "Current number of statActivityAutovacuumActive queries", + []string{"phase", "mode"}, + prometheus.Labels{}, + ) + + statActivityAutovacuumActiveQuery = ` + SELECT + v.phase, + CASE + when a.query ~ '^autovacuum.*to prevent wraparound' then 'wraparound' + when a.query ~* '^vacuum' then 'user' + when a.pid is null then 'idle' + ELSE 'regular' + END as mode, + count(1) as workers_count + FROM pg_stat_progress_vacuum v + LEFT JOIN pg_catalog.pg_stat_activity a using (pid) + GROUP BY 1,2 + ` +) + +func (PGStatActivityAutovacuumActiveCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + statActivityAutovacuumActiveQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var phase, mode string + var workersCount float64 + + if err := rows.Scan(&phase, &mode, &workersCount); err != nil { + return err + } + labels := []string{phase, mode} + + ch <- prometheus.MustNewConstMetric( + statActivityAutovacuumActiveWorkersCount, + prometheus.GaugeValue, + workersCount, + labels..., + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_activity_autovacuum_active_test.go b/collector/pg_stat_activity_autovacuum_active_test.go new file mode 100644 index 000000000..edde1ef90 --- /dev/null +++ b/collector/pg_stat_activity_autovacuum_active_test.go @@ -0,0 +1,62 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGStatActivityAutovacuumActiveCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "phase", + "mode", + "workers_count", + } + rows := sqlmock.NewRows(columns). + AddRow("Scanning heap", "regular", 2) + mock.ExpectQuery(sanitizeQuery(statActivityAutovacuumActiveQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatActivityAutovacuumActiveCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatActivityAutovacuumActiveCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"phase": "Scanning heap", "mode": "regular"}, value: 2, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_activity_autovacuum_test.go b/collector/pg_stat_activity_autovacuum_test.go new file mode 100644 index 000000000..a6fcdbcad --- /dev/null +++ b/collector/pg_stat_activity_autovacuum_test.go @@ -0,0 +1,62 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGStatActivityAutovacuumCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "relname", + "timestamp_seconds", + } + rows := sqlmock.NewRows(columns). + AddRow("test", 3600) + + mock.ExpectQuery(sanitizeQuery(statActivityAutovacuumQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatActivityAutovacuumCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatActivityAutovacuumCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"relname": "test"}, value: 3600, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_activity_summary.go b/collector/pg_stat_activity_summary.go new file mode 100644 index 000000000..29c435c56 --- /dev/null +++ b/collector/pg_stat_activity_summary.go @@ -0,0 +1,152 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const statActivitySummarySubsystem = "stat_activity_summary" + +func init() { + registerCollector(statActivitySummarySubsystem, defaultEnabled, NewPGStatActivitySummaryCollector) +} + +type PGStatActivitySummaryCollector struct { + log log.Logger +} + +func NewPGStatActivitySummaryCollector(config collectorConfig) (Collector, error) { + return &PGStatActivitySummaryCollector{log: config.logger}, nil +} + +var ( + statActivitySummaryActiveCount = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statActivitySummarySubsystem, "active_count"), + "Number of active queries at time of sample", + []string{"usename", "application", "endpoint", "command", "state", "wait_event", "wait_event_type"}, + prometheus.Labels{}, + ) + statActivitySummaryMaxTxAgeInSeconds = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statActivitySummarySubsystem, "max_tx_age_in_seconds"), + "Number of active queries at time of sample", + []string{"usename", "application", "endpoint", "command", "state", "wait_event", "wait_event_type"}, + prometheus.Labels{}, + ) + + statActivitySummaryQuery = ` + SELECT + usename AS usename, + a.matches[1] AS application, + a.matches[2] AS endpoint, + a.matches[3] AS command, + a.state AS state, + a.wait_event AS wait_event, + a.wait_event_type AS wait_event_type, + COUNT(*) active_count, + MAX(age_in_seconds) AS max_tx_age_in_seconds + FROM ( + SELECT + usename, + regexp_matches(query, '^\s*(?:\/\*(?:application:(\w+),?)?(?:correlation_id:\w+,?)?(?:jid:\w+,?)?(?:endpoint_id:([\w/\-\.:\#\s]+),?)?.*?\*\/)?\s*(\w+)') AS matches, + state, + wait_event, + wait_event_type, + EXTRACT(EPOCH FROM (clock_timestamp() - xact_start)) AS age_in_seconds + FROM + pg_catalog.pg_stat_activity + ) a + GROUP BY usename, application, endpoint, command, state, wait_event, wait_event_type + ORDER BY active_count DESC + ` +) + +func (PGStatActivitySummaryCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + statActivitySummaryQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var usename, application, endpoint, command, state, waitEvent, waitEventType sql.NullString + var count, maxTxAge sql.NullFloat64 + + if err := rows.Scan(&usename, &application, &endpoint, &command, &state, &waitEvent, &waitEventType, &count, &maxTxAge); err != nil { + return err + } + usenameLabel := "unknown" + if usename.Valid { + usenameLabel = usename.String + } + applicationLabel := "unknown" + if application.Valid { + applicationLabel = application.String + } + endpointLabel := "unknown" + if endpoint.Valid { + endpointLabel = endpoint.String + } + commandLabel := "unknown" + if command.Valid { + commandLabel = command.String + } + stateLabel := "unknown" + if state.Valid { + stateLabel = state.String + } + waitEventLabel := "unknown" + if waitEvent.Valid { + waitEventLabel = waitEvent.String + } + waitEventTypeLabel := "unknown" + if waitEventType.Valid { + waitEventTypeLabel = waitEventType.String + } + labels := []string{usenameLabel, applicationLabel, endpointLabel, commandLabel, stateLabel, waitEventLabel, waitEventTypeLabel} + + countMetric := 0.0 + if count.Valid { + countMetric = count.Float64 + } + ch <- prometheus.MustNewConstMetric( + statActivitySummaryActiveCount, + prometheus.GaugeValue, + countMetric, + labels..., + ) + + maxTxAgeMetric := 0.0 + if maxTxAge.Valid { + maxTxAgeMetric = maxTxAge.Float64 + } + ch <- prometheus.MustNewConstMetric( + statActivitySummaryMaxTxAgeInSeconds, + prometheus.GaugeValue, + maxTxAgeMetric, + labels..., + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_user_indexes.go b/collector/pg_stat_user_indexes.go new file mode 100644 index 000000000..d7b289cc9 --- /dev/null +++ b/collector/pg_stat_user_indexes.go @@ -0,0 +1,136 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector(statUserIndexesSubsystem, defaultDisabled, NewPGStatUserIndexesCollector) +} + +type PGStatUserIndexesCollector struct { + log log.Logger +} + +const statUserIndexesSubsystem = "stat_user_indexes" + +func NewPGStatUserIndexesCollector(config collectorConfig) (Collector, error) { + return &PGStatUserIndexesCollector{log: config.logger}, nil +} + +var ( + statUserIndexesIdxScan = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statUserIndexesSubsystem, "idx_scans_total"), + "Number of index scans initiated on this index", + []string{"schemaname", "relname", "indexrelname"}, + prometheus.Labels{}, + ) + statUserIndexesIdxTupRead = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statUserIndexesSubsystem, "idx_tup_reads_total"), + "Number of index entries returned by scans on this index", + []string{"schemaname", "relname", "indexrelname"}, + prometheus.Labels{}, + ) + statUserIndexesIdxTupFetch = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statUserIndexesSubsystem, "idx_tup_fetches_total"), + "Number of live table rows fetched by simple index scans using this index", + []string{"schemaname", "relname", "indexrelname"}, + prometheus.Labels{}, + ) + + statUserIndexesQuery = ` + SELECT + schemaname, + relname, + indexrelname, + idx_scan, + idx_tup_read, + idx_tup_fetch + FROM pg_stat_user_indexes + ` +) + +func (c *PGStatUserIndexesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + statUserIndexesQuery) + + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var schemaname, relname, indexrelname sql.NullString + var idxScan, idxTupRead, idxTupFetch sql.NullFloat64 + + if err := rows.Scan(&schemaname, &relname, &indexrelname, &idxScan, &idxTupRead, &idxTupFetch); err != nil { + return err + } + schemanameLabel := "unknown" + if schemaname.Valid { + schemanameLabel = schemaname.String + } + relnameLabel := "unknown" + if relname.Valid { + relnameLabel = relname.String + } + indexrelnameLabel := "unknown" + if indexrelname.Valid { + indexrelnameLabel = indexrelname.String + } + labels := []string{schemanameLabel, relnameLabel, indexrelnameLabel} + + idxScanMetric := 0.0 + if idxScan.Valid { + idxScanMetric = idxScan.Float64 + } + ch <- prometheus.MustNewConstMetric( + statUserIndexesIdxScan, + prometheus.CounterValue, + idxScanMetric, + labels..., + ) + + idxTupReadMetric := 0.0 + if idxTupRead.Valid { + idxTupReadMetric = idxTupRead.Float64 + } + ch <- prometheus.MustNewConstMetric( + statUserIndexesIdxTupRead, + prometheus.CounterValue, + idxTupReadMetric, + labels..., + ) + + idxTupFetchMetric := 0.0 + if idxTupFetch.Valid { + idxTupFetchMetric = idxTupFetch.Float64 + } + ch <- prometheus.MustNewConstMetric( + statUserIndexesIdxTupFetch, + prometheus.CounterValue, + idxTupFetchMetric, + labels..., + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_user_indexes_test.go b/collector/pg_stat_user_indexes_test.go new file mode 100644 index 000000000..b0c7fde19 --- /dev/null +++ b/collector/pg_stat_user_indexes_test.go @@ -0,0 +1,113 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgStatUserIndexesCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "schemaname", + "relname", + "indexrelname", + "idx_scan", + "idx_tup_read", + "idx_tup_fetch", + } + rows := sqlmock.NewRows(columns). + AddRow("public", "pgbench_accounts", "pgbench_accounts_pkey", 5, 6, 7) + + mock.ExpectQuery(sanitizeQuery(statUserIndexesQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatUserIndexesCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatUserIndexesCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"schemaname": "public", "relname": "pgbench_accounts", "indexrelname": "pgbench_accounts_pkey"}, value: 5, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"schemaname": "public", "relname": "pgbench_accounts", "indexrelname": "pgbench_accounts_pkey"}, value: 6, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"schemaname": "public", "relname": "pgbench_accounts", "indexrelname": "pgbench_accounts_pkey"}, value: 7, metricType: dto.MetricType_COUNTER}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPgStatUserIndexesCollectorNull(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "schemaname", + "relname", + "indexrelname", + "idx_scan", + "idx_tup_read", + "idx_tup_fetch", + } + rows := sqlmock.NewRows(columns). + AddRow(nil, nil, nil, nil, nil, nil) + + mock.ExpectQuery(sanitizeQuery(statUserIndexesQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatUserIndexesCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatUserIndexesCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stat_walreceiver.go b/collector/pg_stat_walreceiver.go new file mode 100644 index 000000000..deb9aa733 --- /dev/null +++ b/collector/pg_stat_walreceiver.go @@ -0,0 +1,306 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector(statWalReceiverSubsystem, defaultDisabled, NewPGStatWalReceiverCollector) +} + +type PGStatWalReceiverCollector struct { + log log.Logger +} + +const statWalReceiverSubsystem = "stat_wal_receiver" + +func NewPGStatWalReceiverCollector(config collectorConfig) (Collector, error) { + return &PGStatWalReceiverCollector{log: config.logger}, nil +} + +var ( + statWalReceiverStatus = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "status"), + "Activity status of the WAL receiver process", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverReceiveStartLsn = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_lsn"), + "First write-ahead log location used when WAL receiver is started represented as a decimal", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverReceiveStartTli = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_tli"), + "First timeline number used when WAL receiver is started", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverFlushedLSN = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "flushed_lsn"), + "Last write-ahead log location already received and flushed to disk, the initial value of this field being the first log location used when WAL receiver is started represented as a decimal", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverReceivedTli = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "received_tli"), + "Timeline number of last write-ahead log location received and flushed to disk", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverLastMsgSendTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_send_time"), + "Send time of last message received from origin WAL sender", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverLastMsgReceiptTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_receipt_time"), + "Send time of last message received from origin WAL sender", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverLatestEndLsn = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_lsn"), + "Last write-ahead log location reported to origin WAL sender as integer", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverLatestEndTime = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_time"), + "Time of last write-ahead log location reported to origin WAL sender", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + statWalReceiverUpstreamNode = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "upstream_node"), + "Node ID of the upstream node", + []string{"upstream_host", "slot_name"}, + prometheus.Labels{}, + ) + + pgStatWalColumnQuery = ` + SELECT + column_name + FROM information_schema.columns + WHERE + table_name = 'pg_stat_wal_receiver' and + column_name = 'flushed_lsn' + ` + + pgStatWalReceiverQueryWithNoFlushedLSN = ` + SELECT + trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host, + slot_name, + case status + when 'stopped' then 0 + when 'starting' then 1 + when 'streaming' then 2 + when 'waiting' then 3 + when 'restarting' then 4 + when 'stopping' then 5 else -1 + end as status, + (receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn, + receive_start_tli, + received_tli, + extract(epoch from last_msg_send_time) as last_msg_send_time, + extract(epoch from last_msg_receipt_time) as last_msg_receipt_time, + (latest_end_lsn - '0/0') % (2^52)::bigint as latest_end_lsn, + extract(epoch from latest_end_time) as latest_end_time, + substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node + FROM pg_catalog.pg_stat_wal_receiver + ` + + pgStatWalReceiverQueryWithFlushedLSN = ` + SELECT + trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host, + slot_name, + case status + when 'stopped' then 0 + when 'starting' then 1 + when 'streaming' then 2 + when 'waiting' then 3 + when 'restarting' then 4 + when 'stopping' then 5 else -1 + end as status, + (receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn, + receive_start_tli, + (flushed_lsn- '0/0') % (2^52)::bigint as flushed_lsn, + received_tli, + extract(epoch from last_msg_send_time) as last_msg_send_time, + extract(epoch from last_msg_receipt_time) as last_msg_receipt_time, + (latest_end_lsn - '0/0') % (2^52)::bigint as latest_end_lsn, + extract(epoch from latest_end_time) as latest_end_time, + substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node + FROM pg_catalog.pg_stat_wal_receiver + ` +) + +func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + hasFlushedLSNRows, err := db.QueryContext(ctx, pgStatWalColumnQuery) + if err != nil { + return err + } + + defer hasFlushedLSNRows.Close() + hasFlushedLSN := hasFlushedLSNRows.Next() + var query string + if hasFlushedLSN { + query = pgStatWalReceiverQueryWithFlushedLSN + } else { + query = pgStatWalReceiverQueryWithNoFlushedLSN + } + rows, err := db.QueryContext(ctx, query) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var upstreamHost, slotName sql.NullString + var status, receiveStartLsn, receiveStartTli, flushedLsn, receivedTli, latestEndLsn, upstreamNode sql.NullInt64 + var lastMsgSendTime, lastMsgReceiptTime, latestEndTime sql.NullFloat64 + + if hasFlushedLSN { + if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &flushedLsn, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil { + return err + } + } else { + if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil { + return err + } + } + upstreamHostLabel := "unknown" + if upstreamHost.Valid { + upstreamHostLabel = upstreamHost.String + } + slotNameLabel := "unknown" + if slotName.Valid { + slotNameLabel = slotName.String + } + labels := []string{upstreamHostLabel, slotNameLabel} + + statusMetric := 0.0 + if status.Valid { + statusMetric = float64(status.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverStatus, + prometheus.GaugeValue, + statusMetric, + labels...) + + receiveStartLsnMetric := 0.0 + if receiveStartLsn.Valid { + receiveStartLsnMetric = float64(receiveStartLsn.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceiveStartLsn, + prometheus.CounterValue, + receiveStartLsnMetric, + labels...) + + receiveStartTliMetric := 0.0 + if receiveStartTli.Valid { + receiveStartTliMetric = float64(receiveStartTli.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceiveStartTli, + prometheus.GaugeValue, + receiveStartTliMetric, + labels...) + + if hasFlushedLSN { + flushedLsnMetric := 0.0 + if flushedLsn.Valid { + flushedLsnMetric = float64(flushedLsn.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverFlushedLSN, + prometheus.CounterValue, + flushedLsnMetric, + labels...) + } + + receivedTliMetric := 0.0 + if receivedTli.Valid { + receivedTliMetric = float64(receivedTli.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverReceivedTli, + prometheus.GaugeValue, + receivedTliMetric, + labels...) + + lastMsgSendTimeMetric := 0.0 + if lastMsgSendTime.Valid { + lastMsgSendTimeMetric = float64(lastMsgSendTime.Float64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverLastMsgSendTime, + prometheus.CounterValue, + lastMsgSendTimeMetric, + labels...) + + lastMsgReceiptTimeMetric := 0.0 + if lastMsgReceiptTime.Valid { + lastMsgReceiptTimeMetric = float64(lastMsgReceiptTime.Float64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverLastMsgReceiptTime, + prometheus.CounterValue, + lastMsgReceiptTimeMetric, + labels...) + + latestEndLsnMetric := 0.0 + if latestEndLsn.Valid { + latestEndLsnMetric = float64(latestEndLsn.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverLatestEndLsn, + prometheus.CounterValue, + latestEndLsnMetric, + labels...) + + latestEndTimeMetric := 0.0 + if latestEndTime.Valid { + latestEndTimeMetric = float64(latestEndTime.Float64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverLatestEndTime, + prometheus.CounterValue, + latestEndTimeMetric, + labels...) + + upstreamNodeMetric := 0.0 + if upstreamNode.Valid { + upstreamNodeMetric = float64(upstreamNode.Int64) + } + ch <- prometheus.MustNewConstMetric( + statWalReceiverUpstreamNode, + prometheus.GaugeValue, + upstreamNodeMetric, + labels...) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stat_walreceiver_test.go b/collector/pg_stat_walreceiver_test.go new file mode 100644 index 000000000..6a7dc1cce --- /dev/null +++ b/collector/pg_stat_walreceiver_test.go @@ -0,0 +1,266 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGStatWalReceiverCollectorWithFlushedLSN(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns). + AddRow( + "flushed_lsn", + ) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "flushed_lsn", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). + AddRow( + "foo", + "bar", + 2, + 1200668684563608, + 1687321285, + 1200668684563609, + 1687321280, + 1687321275, + 1687321276, + 1200668684563610, + 1687321277, + 5, + ) + mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 2, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563609, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} + +func TestPGStatWalReceiverCollectorWithNoFlushedLSN(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). + AddRow( + "foo", + "bar", + 2, + 1200668684563608, + 1687321285, + 1687321280, + 1687321275, + 1687321276, + 1200668684563610, + 1687321277, + 5, + ) + mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithNoFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 2, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563608, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321285, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321280, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321275, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321276, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1200668684563610, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 1687321277, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "foo", "slot_name": "bar"}, value: 5, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} + +func TestPGStatWalReceiverCollectorWithFlushedLSNNull(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + infoSchemaColumns := []string{ + "column_name", + } + + infoSchemaRows := sqlmock.NewRows(infoSchemaColumns). + AddRow( + "flushed_lsn", + ) + + mock.ExpectQuery(sanitizeQuery(pgStatWalColumnQuery)).WillReturnRows(infoSchemaRows) + + columns := []string{ + "upstream_host", + "slot_name", + "status", + "receive_start_lsn", + "receive_start_tli", + "flushed_lsn", + "received_tli", + "last_msg_send_time", + "last_msg_receipt_time", + "latest_end_lsn", + "latest_end_time", + "upstream_node", + } + rows := sqlmock.NewRows(columns). + AddRow( + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + nil, + ) + mock.ExpectQuery(sanitizeQuery(pgStatWalReceiverQueryWithFlushedLSN)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWalReceiverCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PgStatWalReceiverCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"upstream_host": "unknown", "slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + +} diff --git a/collector/pg_statio_user_indexes.go b/collector/pg_statio_user_indexes.go new file mode 100644 index 000000000..18a29952b --- /dev/null +++ b/collector/pg_statio_user_indexes.go @@ -0,0 +1,118 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector(statioUserIndexesSubsystem, defaultDisabled, NewPGStatioUserIndexesCollector) +} + +type PGStatioUserIndexesCollector struct { + log log.Logger +} + +const statioUserIndexesSubsystem = "statio_user_indexes" + +func NewPGStatioUserIndexesCollector(config collectorConfig) (Collector, error) { + return &PGStatioUserIndexesCollector{log: config.logger}, nil +} + +var ( + statioUserIndexesIdxBlksRead = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statioUserIndexesSubsystem, "idx_blks_read"), + "Number of disk blocks read from this index", + []string{"schemaname", "relname", "indexrelname"}, + prometheus.Labels{}, + ) + statioUserIndexesIdxBlksHit = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statioUserIndexesSubsystem, "idx_blks_hit"), + "Number of buffer hits in this index", + []string{"schemaname", "relname", "indexrelname"}, + prometheus.Labels{}, + ) + + statioUserIndexesQuery = ` + SELECT + schemaname, + relname, + indexrelname, + idx_blks_read, + idx_blks_hit + FROM pg_statio_user_indexes + ` +) + +func (c *PGStatioUserIndexesCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + statioUserIndexesQuery) + + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var schemaname, relname, indexrelname sql.NullString + var idxBlksRead, idxBlksHit sql.NullFloat64 + + if err := rows.Scan(&schemaname, &relname, &indexrelname, &idxBlksRead, &idxBlksHit); err != nil { + return err + } + schemanameLabel := "unknown" + if schemaname.Valid { + schemanameLabel = schemaname.String + } + relnameLabel := "unknown" + if relname.Valid { + relnameLabel = relname.String + } + indexrelnameLabel := "unknown" + if indexrelname.Valid { + indexrelnameLabel = indexrelname.String + } + labels := []string{schemanameLabel, relnameLabel, indexrelnameLabel} + + idxBlksReadMetric := 0.0 + if idxBlksRead.Valid { + idxBlksReadMetric = idxBlksRead.Float64 + } + ch <- prometheus.MustNewConstMetric( + statioUserIndexesIdxBlksRead, + prometheus.CounterValue, + idxBlksReadMetric, + labels..., + ) + + idxBlksHitMetric := 0.0 + if idxBlksHit.Valid { + idxBlksHitMetric = idxBlksHit.Float64 + } + ch <- prometheus.MustNewConstMetric( + statioUserIndexesIdxBlksHit, + prometheus.CounterValue, + idxBlksHitMetric, + labels..., + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_statio_user_indexes_test.go b/collector/pg_statio_user_indexes_test.go new file mode 100644 index 000000000..174012162 --- /dev/null +++ b/collector/pg_statio_user_indexes_test.go @@ -0,0 +1,109 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgStatioUserIndexesCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "schemaname", + "relname", + "indexrelname", + "idx_blks_read", + "idx_blks_hit", + } + rows := sqlmock.NewRows(columns). + AddRow("public", "pgtest_accounts", "pgtest_accounts_pkey", 8, 9) + + mock.ExpectQuery(sanitizeQuery(statioUserIndexesQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatioUserIndexesCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatioUserIndexesCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"schemaname": "public", "relname": "pgtest_accounts", "indexrelname": "pgtest_accounts_pkey"}, value: 8, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"schemaname": "public", "relname": "pgtest_accounts", "indexrelname": "pgtest_accounts_pkey"}, value: 9, metricType: dto.MetricType_COUNTER}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPgStatioUserIndexesCollectorNull(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "schemaname", + "relname", + "indexrelname", + "idx_blks_read", + "idx_blks_hit", + } + rows := sqlmock.NewRows(columns). + AddRow(nil, nil, nil, nil, nil) + + mock.ExpectQuery(sanitizeQuery(statioUserIndexesQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatioUserIndexesCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatioUserIndexesCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{"schemaname": "unknown", "relname": "unknown", "indexrelname": "unknown"}, value: 0, metricType: dto.MetricType_COUNTER}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_stuck_idle_in_transaction.go b/collector/pg_stuck_idle_in_transaction.go new file mode 100644 index 000000000..e6839d4b3 --- /dev/null +++ b/collector/pg_stuck_idle_in_transaction.go @@ -0,0 +1,86 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const stuckIdleInTransactionSubsystem = "stuck_in_transaction" + +func init() { + registerCollector(stuckIdleInTransactionSubsystem, defaultDisabled, NewPGStuckIdleInTransactionCollector) +} + +type PGStuckIdleInTransactionCollector struct { + log log.Logger +} + +func NewPGStuckIdleInTransactionCollector(config collectorConfig) (Collector, error) { + return &PGStuckIdleInTransactionCollector{log: config.logger}, nil +} + +var ( + stuckIdleInTransactionQueries = prometheus.NewDesc( + prometheus.BuildFQName(namespace, longRunningTransactionsSubsystem, "queries"), + "Current number of queries that are stuck being idle in transactions", + []string{}, + prometheus.Labels{}, + ) + + stuckIdleInTransactionQuery = ` + SELECT + COUNT(*) AS queries + FROM pg_catalog.pg_stat_activity + WHERE + state = 'idle in transaction' AND (now() - query_start) > '10 minutes'::interval + ` +) + +func (PGStuckIdleInTransactionCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + stuckIdleInTransactionQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var queries float64 + + if err := rows.Scan(&queries); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + stuckIdleInTransactionQueries, + prometheus.GaugeValue, + queries, + ) + ch <- prometheus.MustNewConstMetric( + longRunningTransactionsAgeInSeconds, + prometheus.GaugeValue, + queries, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_stuck_idle_in_transaction_test.go b/collector/pg_stuck_idle_in_transaction_test.go new file mode 100644 index 000000000..1313806a9 --- /dev/null +++ b/collector/pg_stuck_idle_in_transaction_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGStuckIdleInTransactionCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "queries", + } + rows := sqlmock.NewRows(columns). + AddRow(30) + + mock.ExpectQuery(sanitizeQuery(stuckIdleInTransactionQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStuckIdleInTransactionCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStuckIdleInTransactionCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{}, value: 30, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_total_relation_size.go b/collector/pg_total_relation_size.go new file mode 100644 index 000000000..b9024833e --- /dev/null +++ b/collector/pg_total_relation_size.go @@ -0,0 +1,98 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "database/sql" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const totalRelationSizeSubsystem = "total_relation_size" + +func init() { + registerCollector(totalRelationSizeSubsystem, defaultDisabled, NewPGTotalRelationSizeCollector) +} + +type PGTotalRelationSizeCollector struct { + log log.Logger +} + +func NewPGTotalRelationSizeCollector(config collectorConfig) (Collector, error) { + return &PGTotalRelationSizeCollector{log: config.logger}, nil +} + +var ( + totalRelationSizeBytes = prometheus.NewDesc( + prometheus.BuildFQName(namespace, totalRelationSizeSubsystem, "bytes"), + "total disk space usage for the specified table and associated indexes", + []string{"schemaname", "relname"}, + prometheus.Labels{}, + ) + + totalRelationSizeQuery = ` + SELECT + relnamespace::regnamespace as schemaname, + relname as relname, + pg_total_relation_size(oid) bytes + FROM pg_class + WHERE relkind = 'r'; + ` +) + +func (PGTotalRelationSizeCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + totalRelationSizeQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var schemaname, relname sql.NullString + var bytes sql.NullFloat64 + + if err := rows.Scan(&schemaname, &relname, &bytes); err != nil { + return err + } + schemanameLabel := "unknown" + if schemaname.Valid { + schemanameLabel = schemaname.String + } + relnameLabel := "unknown" + if relname.Valid { + relnameLabel = relname.String + } + labels := []string{schemanameLabel, relnameLabel} + + bytesMetric := 0.0 + if bytes.Valid { + bytesMetric = bytes.Float64 + } + ch <- prometheus.MustNewConstMetric( + totalRelationSizeBytes, + prometheus.GaugeValue, + bytesMetric, + labels..., + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_total_relation_size_test.go b/collector/pg_total_relation_size_test.go new file mode 100644 index 000000000..c13bd8045 --- /dev/null +++ b/collector/pg_total_relation_size_test.go @@ -0,0 +1,103 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgTotalRelationSizeCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "schemaname", + "relname", + "bytes", + } + rows := sqlmock.NewRows(columns). + AddRow("public", "bar", 200) + + mock.ExpectQuery(sanitizeQuery(totalRelationSizeQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGTotalRelationSizeCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGTotalRelationSizeCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"schemaname": "public", "relname": "bar"}, value: 200, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPgTotalRelationSizeCollectorNull(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "schemaname", + "relname", + "bytes", + } + rows := sqlmock.NewRows(columns). + AddRow(nil, nil, nil) + + mock.ExpectQuery(sanitizeQuery(totalRelationSizeQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGTotalRelationSizeCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGTotalRelationSizeCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{"schemaname": "unknown", "relname": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_xid.go b/collector/pg_xid.go new file mode 100644 index 000000000..c5db64328 --- /dev/null +++ b/collector/pg_xid.go @@ -0,0 +1,99 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const xidSubsystem = "xid" + +func init() { + registerCollector(xidSubsystem, defaultDisabled, NewPGXidCollector) +} + +type PGXidCollector struct { + log log.Logger +} + +func NewPGXidCollector(config collectorConfig) (Collector, error) { + return &PGXidCollector{log: config.logger}, nil +} + +var ( + xidCurrent = prometheus.NewDesc( + prometheus.BuildFQName(namespace, xidSubsystem, "current"), + "Current 64-bit transaction id of the query used to collect this metric (truncated to low 52 bits)", + []string{}, prometheus.Labels{}, + ) + xidXmin = prometheus.NewDesc( + prometheus.BuildFQName(namespace, xidSubsystem, "xmin"), + "Oldest transaction id of a transaction still in progress, i.e. not known committed or aborted (truncated to low 52 bits)", + []string{}, prometheus.Labels{}, + ) + xidXminAge = prometheus.NewDesc( + prometheus.BuildFQName(namespace, xidSubsystem, "xmin_age"), + "Age of oldest transaction still not committed or aborted measured in transaction ids", + []string{}, prometheus.Labels{}, + ) + + xidQuery = ` + SELECT + CASE WHEN pg_is_in_recovery() THEN 'NaN'::float ELSE txid_current() % (2^52)::bigint END AS current, + CASE WHEN pg_is_in_recovery() THEN 'NaN'::float ELSE txid_snapshot_xmin(txid_current_snapshot()) % (2^52)::bigint END AS xmin, + CASE WHEN pg_is_in_recovery() THEN 'NaN'::float ELSE txid_current() - txid_snapshot_xmin(txid_current_snapshot()) END AS xmin_age + ` +) + +func (PGXidCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + xidQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var current, xmin, xminAge float64 + + if err := rows.Scan(¤t, &xmin, &xminAge); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + xidCurrent, + prometheus.CounterValue, + current, + ) + ch <- prometheus.MustNewConstMetric( + xidXmin, + prometheus.CounterValue, + xmin, + ) + ch <- prometheus.MustNewConstMetric( + xidXminAge, + prometheus.GaugeValue, + xminAge, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_xid_test.go b/collector/pg_xid_test.go new file mode 100644 index 000000000..6de90fcfa --- /dev/null +++ b/collector/pg_xid_test.go @@ -0,0 +1,111 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "math" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPgXidCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "current", + "xmin", + "xmin_age", + } + rows := sqlmock.NewRows(columns). + AddRow(22, 25, 30) + + mock.ExpectQuery(sanitizeQuery(xidQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGXidCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGXidCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{}, value: 22, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{}, value: 25, metricType: dto.MetricType_COUNTER}, + {labels: labelMap{}, value: 30, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPgNanCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "current", + "xmin", + "xmin_age", + } + rows := sqlmock.NewRows(columns). + AddRow(math.NaN(), math.NaN(), math.NaN()) + + mock.ExpectQuery(sanitizeQuery(xidQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGXidCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGXidCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{}, value: math.NaN(), metricType: dto.MetricType_COUNTER}, + {labels: labelMap{}, value: math.NaN(), metricType: dto.MetricType_COUNTER}, + {labels: labelMap{}, value: math.NaN(), metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + + convey.So(expect.labels, convey.ShouldResemble, m.labels) + convey.So(math.IsNaN(m.value), convey.ShouldResemble, math.IsNaN(expect.value)) + convey.So(expect.metricType, convey.ShouldEqual, m.metricType) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} diff --git a/collector/pg_xlog_location.go b/collector/pg_xlog_location.go new file mode 100644 index 000000000..b281d7a51 --- /dev/null +++ b/collector/pg_xlog_location.go @@ -0,0 +1,80 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +const xlogLocationSubsystem = "xlog_location" + +func init() { + registerCollector(xlogLocationSubsystem, defaultDisabled, NewPGXlogLocationCollector) +} + +type PGXlogLocationCollector struct { + log log.Logger +} + +func NewPGXlogLocationCollector(config collectorConfig) (Collector, error) { + return &PGXlogLocationCollector{log: config.logger}, nil +} + +var ( + xlogLocationBytes = prometheus.NewDesc( + prometheus.BuildFQName(namespace, xlogLocationSubsystem, "bytes"), + "Postgres LSN (log sequence number) being generated on primary or replayed on replica (truncated to low 52 bits)", + []string{}, + prometheus.Labels{}, + ) + + xlogLocationQuery = ` + SELECT CASE + WHEN pg_is_in_recovery() THEN (pg_last_xlog_replay_location() - '0/0') % (2^52)::bigint + ELSE (pg_current_xlog_location() - '0/0') % (2^52)::bigint + END AS bytes + ` +) + +func (PGXlogLocationCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + rows, err := db.QueryContext(ctx, + xlogLocationQuery) + + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var bytes float64 + + if err := rows.Scan(&bytes); err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + xlogLocationBytes, + prometheus.CounterValue, + bytes, + ) + } + if err := rows.Err(); err != nil { + return err + } + return nil +} diff --git a/collector/pg_xlog_location_test.go b/collector/pg_xlog_location_test.go new file mode 100644 index 000000000..9071a2cc1 --- /dev/null +++ b/collector/pg_xlog_location_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGXlogLocationCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + inst := &instance{db: db} + columns := []string{ + "bytes", + } + rows := sqlmock.NewRows(columns). + AddRow(53401) + + mock.ExpectQuery(sanitizeQuery(xlogLocationQuery)).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGXlogLocationCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGXlogLocationCollector.Update: %s", err) + } + }() + expected := []MetricResult{ + {labels: labelMap{}, value: 53401, metricType: dto.MetricType_COUNTER}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +}