[refactor] - s3 metrics #3760

Merged
merged 2 commits into from Dec 11, 2024
32 changes: 18 additions & 14 deletions pkg/sources/s3/metrics.go
@@ -11,8 +11,8 @@ import (
 type metricsCollector interface {
 	// Object metrics.
 
-	RecordObjectScanned(bucket string)
-	RecordObjectSkipped(bucket, reason string)
+	RecordObjectScanned(bucket string, sizeBytes float64)
+	RecordObjectSkipped(bucket, reason string, sizeBytes float64)
 	RecordObjectError(bucket string)
 
 	// Role metrics.
@@ -22,8 +22,8 @@ type metricsCollector interface {
 }
 
 type collector struct {
-	objectsScanned *prometheus.CounterVec
-	objectsSkipped *prometheus.CounterVec
+	objectsScanned *prometheus.HistogramVec
+	objectsSkipped *prometheus.HistogramVec
 	objectsErrors  *prometheus.CounterVec
 	rolesScanned   *prometheus.GaugeVec
 	bucketsPerRole *prometheus.GaugeVec
@@ -33,18 +33,22 @@ var metricsInstance metricsCollector
 
 func init() {
 	metricsInstance = &collector{
-		objectsScanned: promauto.NewCounterVec(prometheus.CounterOpts{
+		objectsScanned: promauto.NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: common.MetricsNamespace,
 			Subsystem: common.MetricsSubsystem,
-			Name:      "objects_scanned_total",
-			Help:      "Total number of S3 objects successfully scanned",
+			Name:      "objects_scanned_bytes",
+			Help:      "Size distribution of successfully scanned S3 objects in bytes",
+			// 64B, 512B, 4KB, 32KB, 256KB, 2MB, 16MB, 128MB, 1GB.
+			Buckets:   prometheus.ExponentialBuckets(64, 8, 9),
 		}, []string{"bucket"}),
 
-		objectsSkipped: promauto.NewCounterVec(prometheus.CounterOpts{
+		objectsSkipped: promauto.NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: common.MetricsNamespace,
 			Subsystem: common.MetricsSubsystem,
-			Name:      "objects_skipped_total",
-			Help:      "Total number of S3 objects skipped during scan",
+			Name:      "objects_skipped_bytes",
+			Help:      "Size distribution of skipped S3 objects in bytes",
+			// 64B, 512B, 4KB, 32KB, 256KB, 2MB, 16MB, 128MB, 1GB.
+			Buckets:   prometheus.ExponentialBuckets(64, 8, 9),
 		}, []string{"bucket", "reason"}),
 
 		objectsErrors: promauto.NewCounterVec(prometheus.CounterOpts{
@@ -70,12 +74,12 @@ func init() {
 	}
 }
 
-func (c *collector) RecordObjectScanned(bucket string) {
-	c.objectsScanned.WithLabelValues(bucket).Inc()
+func (c *collector) RecordObjectScanned(bucket string, sizeBytes float64) {
+	c.objectsScanned.WithLabelValues(bucket).Observe(sizeBytes)
 }
 
-func (c *collector) RecordObjectSkipped(bucket, reason string) {
-	c.objectsSkipped.WithLabelValues(bucket, reason).Inc()
+func (c *collector) RecordObjectSkipped(bucket, reason string, sizeBytes float64) {
+	c.objectsSkipped.WithLabelValues(bucket, reason).Observe(sizeBytes)
 }
 
 func (c *collector) RecordObjectError(bucket string) {
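The metrics.go change swaps the per-bucket counters for size histograms. As a quick sanity check (not part of this PR), the sketch below prints the nine upper bounds produced by prometheus.ExponentialBuckets(64, 8, 9) and records one observation against a stand-in HistogramVec; the real metric also carries the trufflehog namespace and subsystem, so the exported series names differ.

// Sanity-check sketch, not part of the PR. The metric below is a stand-in
// without the Namespace/Subsystem fields the PR sets.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Prints 64, 512, 4096, ..., 1073741824: nine bounds, each 8x the last,
	// matching the 64B-through-1GB comment in the diff.
	fmt.Println(prometheus.ExponentialBuckets(64, 8, 9))

	hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "objects_scanned_bytes",
		Buckets: prometheus.ExponentialBuckets(64, 8, 9),
	}, []string{"bucket"})

	// One Observe per scanned object, sized in bytes, replaces one Inc.
	hist.WithLabelValues("my-bucket").Observe(1024)
}

Because every Prometheus histogram exports a _count series alongside _sum and the buckets, the old counter semantics are still recoverable: rate(objects_scanned_bytes_count[5m]) should report what rate(objects_scanned_total[5m]) used to, and the _sum/_count ratio gives the mean object size per label.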
16 changes: 8 additions & 8 deletions pkg/sources/s3/s3.go
@@ -432,7 +432,7 @@ func (s *Source) pageChunker(
 
 	for objIdx, obj := range metadata.page.Contents {
 		if obj == nil {
-			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "nil_object")
+			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "nil_object", 0)
 			if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
 				ctx.Logger().Error(err, "could not update progress for nil object")
 			}
@@ -448,7 +448,7 @@ func (s *Source) pageChunker(
 		// Skip GLACIER and GLACIER_IR objects.
 		if obj.StorageClass == nil || strings.Contains(*obj.StorageClass, "GLACIER") {
 			ctx.Logger().V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass)
-			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "storage_class")
+			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "storage_class", float64(*obj.Size))
 			if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
 				ctx.Logger().Error(err, "could not update progress for glacier object")
 			}
@@ -458,7 +458,7 @@ func (s *Source) pageChunker(
 		// Ignore large files.
 		if *obj.Size > s.maxObjectSize {
ctx.Logger().V(5).Info("Skipping %d byte file (over maxObjectSize limit)")
s.metricsCollector.RecordObjectSkipped(metadata.bucket, "size_limit")
s.metricsCollector.RecordObjectSkipped(metadata.bucket, "size_limit", float64(*obj.Size))
if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
ctx.Logger().Error(err, "could not update progress for large file")
}
Expand All @@ -468,7 +468,7 @@ func (s *Source) pageChunker(
 		// Skip empty files.
 		if *obj.Size == 0 {
 			ctx.Logger().V(5).Info("Skipping empty file")
-			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "empty_file")
+			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "empty_file", 0)
 			if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
 				ctx.Logger().Error(err, "could not update progress for empty file")
 			}
@@ -478,7 +478,7 @@ func (s *Source) pageChunker(
 		// Skip incompatible extensions.
 		if common.SkipFile(*obj.Key) {
 			ctx.Logger().V(5).Info("Skipping file with incompatible extension")
-			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "incompatible_extension")
+			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "incompatible_extension", float64(*obj.Size))
 			if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
 				ctx.Logger().Error(err, "could not update progress for incompatible file")
 			}
@@ -493,7 +493,7 @@ func (s *Source) pageChunker(
 
 		if strings.HasSuffix(*obj.Key, "/") {
 			ctx.Logger().V(5).Info("Skipping directory")
-			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "directory")
+			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "directory", float64(*obj.Size))
 			return nil
 		}
 
@@ -521,7 +521,7 @@ func (s *Source) pageChunker(
 		if err != nil {
 			if strings.Contains(err.Error(), "AccessDenied") {
 				ctx.Logger().Error(err, "could not get S3 object; access denied")
-				s.metricsCollector.RecordObjectSkipped(metadata.bucket, "access_denied")
+				s.metricsCollector.RecordObjectSkipped(metadata.bucket, "access_denied", float64(*obj.Size))
 			} else {
 				ctx.Logger().Error(err, "could not get S3 object")
 				s.metricsCollector.RecordObjectError(metadata.bucket)
@@ -596,7 +596,7 @@ func (s *Source) pageChunker(
 		if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
 			ctx.Logger().Error(err, "could not update progress for scanned object")
 		}
-		s.metricsCollector.RecordObjectScanned(metadata.bucket)
+		s.metricsCollector.RecordObjectScanned(metadata.bucket, float64(*obj.Size))
 
 		return nil
 	})
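In s3.go, every skip and scan path now reports a size. The call sites inline float64(*obj.Size) and pass a literal 0 where no size is meaningful (nil objects, empty files). Since Size is a pointer in the AWS SDK's object type, a nil-safe helper along these lines (hypothetical, not in the PR) could centralize the dereference inside the s3 package:

// objectSizeBytes is a hypothetical helper, not part of this PR: the diff
// dereferences obj.Size inline at each call site. Size is a *int64 in the
// AWS SDK's S3 object type, so guarding against nil here would prevent a
// panic if a future call site runs before the nil-object check.
func objectSizeBytes(size *int64) float64 {
	if size == nil {
		return 0
	}
	return float64(*size)
}

A call such as s.metricsCollector.RecordObjectSkipped(metadata.bucket, "storage_class", objectSizeBytes(obj.Size)) would then be safe even for objects with no size set.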