diff --git a/pkg/sources/s3/metrics.go b/pkg/sources/s3/metrics.go
index 3eb8b7c7bc00..534241477266 100644
--- a/pkg/sources/s3/metrics.go
+++ b/pkg/sources/s3/metrics.go
@@ -11,8 +11,8 @@ import (
 
 type metricsCollector interface {
     // Object metrics.
-    RecordObjectScanned(bucket string)
-    RecordObjectSkipped(bucket, reason string)
+    RecordObjectScanned(bucket string, sizeBytes float64)
+    RecordObjectSkipped(bucket, reason string, sizeBytes float64)
     RecordObjectError(bucket string)
 
     // Role metrics.
@@ -22,8 +22,8 @@ type metricsCollector interface {
 }
 
 type collector struct {
-    objectsScanned *prometheus.CounterVec
-    objectsSkipped *prometheus.CounterVec
+    objectsScanned *prometheus.HistogramVec
+    objectsSkipped *prometheus.HistogramVec
     objectsErrors  *prometheus.CounterVec
     rolesScanned   *prometheus.GaugeVec
     bucketsPerRole *prometheus.GaugeVec
@@ -33,18 +33,22 @@ var metricsInstance metricsCollector
 
 func init() {
     metricsInstance = &collector{
-        objectsScanned: promauto.NewCounterVec(prometheus.CounterOpts{
+        objectsScanned: promauto.NewHistogramVec(prometheus.HistogramOpts{
             Namespace: common.MetricsNamespace,
             Subsystem: common.MetricsSubsystem,
-            Name:      "objects_scanned_total",
-            Help:      "Total number of S3 objects successfully scanned",
+            Name:      "objects_scanned_bytes",
+            Help:      "Size distribution of successfully scanned S3 objects in bytes",
+            // 64B, 512B, 4KB, 32KB, 256KB, 2MB, 16MB, 128MB, 1GB.
+            Buckets: prometheus.ExponentialBuckets(64, 8, 9),
         }, []string{"bucket"}),
-        objectsSkipped: promauto.NewCounterVec(prometheus.CounterOpts{
+        objectsSkipped: promauto.NewHistogramVec(prometheus.HistogramOpts{
             Namespace: common.MetricsNamespace,
             Subsystem: common.MetricsSubsystem,
-            Name:      "objects_skipped_total",
-            Help:      "Total number of S3 objects skipped during scan",
+            Name:      "objects_skipped_bytes",
+            Help:      "Size distribution of skipped S3 objects in bytes",
+            // 64B, 512B, 4KB, 32KB, 256KB, 2MB, 16MB, 128MB, 1GB.
+            Buckets: prometheus.ExponentialBuckets(64, 8, 9),
         }, []string{"bucket", "reason"}),
         objectsErrors: promauto.NewCounterVec(prometheus.CounterOpts{
@@ -70,12 +74,12 @@ func init() {
     }
 }
 
-func (c *collector) RecordObjectScanned(bucket string) {
-    c.objectsScanned.WithLabelValues(bucket).Inc()
+func (c *collector) RecordObjectScanned(bucket string, sizeBytes float64) {
+    c.objectsScanned.WithLabelValues(bucket).Observe(sizeBytes)
 }
 
-func (c *collector) RecordObjectSkipped(bucket, reason string) {
-    c.objectsSkipped.WithLabelValues(bucket, reason).Inc()
+func (c *collector) RecordObjectSkipped(bucket, reason string, sizeBytes float64) {
+    c.objectsSkipped.WithLabelValues(bucket, reason).Observe(sizeBytes)
 }
 
 func (c *collector) RecordObjectError(bucket string) {
diff --git a/pkg/sources/s3/s3.go b/pkg/sources/s3/s3.go
index 959a0bbfa1d4..bed0b9ddcc44 100644
--- a/pkg/sources/s3/s3.go
+++ b/pkg/sources/s3/s3.go
@@ -432,7 +432,7 @@ func (s *Source) pageChunker(
     for objIdx, obj := range metadata.page.Contents {
         if obj == nil {
-            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "nil_object")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "nil_object", 0)
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for nil object")
             }
@@ -448,7 +448,7 @@ func (s *Source) pageChunker(
         // Skip GLACIER and GLACIER_IR objects.
         if obj.StorageClass == nil || strings.Contains(*obj.StorageClass, "GLACIER") {
             ctx.Logger().V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass)
-            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "storage_class")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "storage_class", float64(*obj.Size))
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for glacier object")
             }
@@ -458,7 +458,7 @@ func (s *Source) pageChunker(
         // Ignore large files.
         if *obj.Size > s.maxObjectSize {
             ctx.Logger().V(5).Info("Skipping %d byte file (over maxObjectSize limit)")
-            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "size_limit")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "size_limit", float64(*obj.Size))
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for large file")
             }
@@ -468,7 +468,7 @@ func (s *Source) pageChunker(
         // File empty file.
         if *obj.Size == 0 {
             ctx.Logger().V(5).Info("Skipping empty file")
-            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "empty_file")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "empty_file", 0)
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for empty file")
             }
@@ -478,7 +478,7 @@ func (s *Source) pageChunker(
         // Skip incompatible extensions.
         if common.SkipFile(*obj.Key) {
             ctx.Logger().V(5).Info("Skipping file with incompatible extension")
-            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "incompatible_extension")
+            s.metricsCollector.RecordObjectSkipped(metadata.bucket, "incompatible_extension", float64(*obj.Size))
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for incompatible file")
             }
@@ -493,7 +493,7 @@ func (s *Source) pageChunker(
             if strings.HasSuffix(*obj.Key, "/") {
                 ctx.Logger().V(5).Info("Skipping directory")
-                s.metricsCollector.RecordObjectSkipped(metadata.bucket, "directory")
+                s.metricsCollector.RecordObjectSkipped(metadata.bucket, "directory", float64(*obj.Size))
                 return nil
             }
@@ -521,7 +521,7 @@ func (s *Source) pageChunker(
             if err != nil {
                 if strings.Contains(err.Error(), "AccessDenied") {
                     ctx.Logger().Error(err, "could not get S3 object; access denied")
-                    s.metricsCollector.RecordObjectSkipped(metadata.bucket, "access_denied")
+                    s.metricsCollector.RecordObjectSkipped(metadata.bucket, "access_denied", float64(*obj.Size))
                 } else {
                     ctx.Logger().Error(err, "could not get S3 object")
                     s.metricsCollector.RecordObjectError(metadata.bucket)
@@ -596,7 +596,7 @@ func (s *Source) pageChunker(
             if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
                 ctx.Logger().Error(err, "could not update progress for scanned object")
             }
-            s.metricsCollector.RecordObjectScanned(metadata.bucket)
+            s.metricsCollector.RecordObjectScanned(metadata.bucket, float64(*obj.Size))
             return nil
         })
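Note on the bucket layout in metrics.go: prometheus.ExponentialBuckets(start, factor, count) returns count upper bounds start, start*factor, ..., start*factor^(count-1), so (64, 8, 9) produces exactly the boundaries listed in the added comment. A minimal standalone sketch to verify this; it is illustrative only and not part of the patch:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    // For (64, 8, 9) this prints: 64, 512, 4096, 32768, 262144 (256KB),
    // 2097152 (2MB), 16777216 (16MB), 134217728 (128MB), 1073741824 (1GB).
    for _, upper := range prometheus.ExponentialBuckets(64, 8, 9) {
        fmt.Printf("%.0f\n", upper)
    }
}

Because every Prometheus histogram also exports a _count series alongside _sum and _bucket, the counts previously reported by objects_scanned_total and objects_skipped_total remain available as objects_scanned_bytes_count and objects_skipped_bytes_count; existing dashboards and alerts only need the metric rename.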