Skip to content

Commit 626b2f3

Browse files
authored
Merge pull request #3089 from k8s-infra-cherrypick-robot/cherry-pick-3085-to-release-0.20
[release-0.20] 🐛 Priorityqueue: Yet another queue_depth metric fix
2 parents 791b6c9 + 64cb665 commit 626b2f3

File tree

3 files changed

+45
-3
lines changed

3 files changed

+45
-3
lines changed

pkg/controller/priorityqueue/metrics.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,11 @@ func (m *defaultQueueMetrics[T]) get(item T) {
8585
return
8686
}
8787

88+
m.depth.Dec()
89+
8890
m.mapLock.Lock()
8991
defer m.mapLock.Unlock()
9092

91-
m.depth.Dec()
92-
9393
m.processingStartTimes[item] = m.clock.Now()
9494
if startTime, exists := m.addTimes[item]; exists {
9595
m.latency.Observe(m.sinceInSeconds(startTime))

pkg/controller/priorityqueue/priorityqueue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
168168
}
169169

170170
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
171-
if readyAt == nil {
171+
if readyAt == nil && !w.becameReady.Has(key) {
172172
w.metrics.add(key)
173173
}
174174
item.ReadyAt = readyAt

pkg/controller/priorityqueue/priorityqueue_test.go

+42
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,48 @@ var _ = Describe("Controllerworkqueue", func() {
395395
Expect(q.Len()).To(Equal(1))
396396
metrics.mu.Lock()
397397
Expect(metrics.depth["test"]).To(Equal(1))
398+
metrics.mu.Unlock()
399+
400+
// Get the item to ensure the codepath in
401+
// `spin` for the metrics is passed by so
402+
// that this starts failing if it incorrectly
403+
// calls `metrics.add` again.
404+
item, _ := q.Get()
405+
Expect(item).To(Equal("foo"))
406+
Expect(q.Len()).To(Equal(0))
407+
metrics.mu.Lock()
408+
Expect(metrics.depth["test"]).To(Equal(0))
409+
metrics.mu.Unlock()
410+
})
411+
412+
It("Updates metrics correctly for an item whose requeueAfter expired that gets added again without requeueAfter", func() {
413+
q, metrics := newQueue()
414+
defer q.ShutDown()
415+
416+
q.AddWithOpts(AddOpts{After: 50 * time.Millisecond}, "foo")
417+
time.Sleep(100 * time.Millisecond)
418+
419+
Expect(q.Len()).To(Equal(1))
420+
metrics.mu.Lock()
421+
Expect(metrics.depth["test"]).To(Equal(1))
422+
metrics.mu.Unlock()
423+
424+
q.AddWithOpts(AddOpts{}, "foo")
425+
Expect(q.Len()).To(Equal(1))
426+
metrics.mu.Lock()
427+
Expect(metrics.depth["test"]).To(Equal(1))
428+
metrics.mu.Unlock()
429+
430+
// Get the item to ensure the codepath in
431+
// `spin` for the metrics is passed by so
432+
// that this starts failing if it incorrectly
433+
// calls `metrics.add` again.
434+
item, _ := q.Get()
435+
Expect(item).To(Equal("foo"))
436+
Expect(q.Len()).To(Equal(0))
437+
metrics.mu.Lock()
438+
Expect(metrics.depth["test"]).To(Equal(0))
439+
metrics.mu.Unlock()
398440
})
399441
})
400442

0 commit comments

Comments
 (0)