diff --git a/disperser/encoder/metrics.go b/disperser/encoder/metrics.go index a68071bb90..11ba438b54 100644 --- a/disperser/encoder/metrics.go +++ b/disperser/encoder/metrics.go @@ -6,7 +6,6 @@ import ( "net/http" "time" - "github.com/Layr-Labs/eigenda/disperser/common" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -107,9 +106,9 @@ func (m *Metrics) ObserveLatency(stage string, duration time.Duration) { m.Latency.WithLabelValues(stage).Observe(float64(duration.Milliseconds())) } -func (m *Metrics) ObserveQueue(queueStats map[int]int) { - for blobSize, num := range queueStats { - m.BlobQueue.With(prometheus.Labels{"size_bucket": common.BlobSizeBucket(blobSize)}).Set(float64(num)) +func (m *Metrics) ObserveQueue(queueStats map[string]int) { + for bucket, num := range queueStats { + m.BlobQueue.With(prometheus.Labels{"size_bucket": bucket}).Set(float64(num)) } } diff --git a/disperser/encoder/server.go b/disperser/encoder/server.go index b02dd358d7..18a7ad43ec 100644 --- a/disperser/encoder/server.go +++ b/disperser/encoder/server.go @@ -12,6 +12,7 @@ import ( "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/disperser" pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder" + "github.com/Layr-Labs/eigenda/disperser/common" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" "google.golang.org/grpc" @@ -30,7 +31,7 @@ type EncoderServer struct { runningRequests chan struct{} requestPool chan blobRequest - queueStats map[int]int + queueStats map[string]int queueLock sync.Mutex } @@ -47,7 +48,7 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin runningRequests: make(chan struct{}, config.MaxConcurrentRequests), requestPool: make(chan blobRequest, config.RequestPoolSize), - queueStats: make(map[int]int), + queueStats: make(map[string]int), } } @@ -92,16 +93,15 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques blobSize := len(req.GetData()) select { case s.requestPool <- blobRequest{blobSizeByte: blobSize}: + s.queueLock.Lock() + s.queueStats[common.BlobSizeBucket(blobSize)]++ + s.metrics.ObserveQueue(s.queueStats) + s.queueLock.Unlock() default: s.metrics.IncrementRateLimitedBlobRequestNum(len(req.GetData())) s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests) return nil, errors.New("too many requests") } - s.queueLock.Lock() - s.queueStats[blobSize]++ - s.metrics.ObserveQueue(s.queueStats) - s.queueLock.Unlock() - s.runningRequests <- struct{}{} defer s.popRequest() @@ -126,7 +126,7 @@ func (s *EncoderServer) popRequest() { blobRequest := <-s.requestPool <-s.runningRequests s.queueLock.Lock() - s.queueStats[blobRequest.blobSizeByte]-- + s.queueStats[common.BlobSizeBucket(blobRequest.blobSizeByte)]-- s.metrics.ObserveQueue(s.queueStats) s.queueLock.Unlock() }