Skip to content

Commit e022b91

Browse files
authoredJan 23, 2025
Merge pull request #3087 from tareksha/typed_waitforsync
[release-0.19] 🐛fix(controller): support WaitForSync in custom TypedSyncingSource
2 parents d4df90f + b2c2171 commit e022b91

File tree

2 files changed

+77
-1
lines changed

2 files changed

+77
-1
lines changed
 

‎pkg/internal/controller/controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
183183
c.LogConstructor(nil).Info("Starting Controller")
184184

185185
for _, watch := range c.startWatches {
186-
syncingSource, ok := watch.(source.SyncingSource)
186+
syncingSource, ok := watch.(source.TypedSyncingSource[request])
187187
if !ok {
188188
continue
189189
}

‎pkg/internal/controller/controller_test.go

+76
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ import (
4646
"sigs.k8s.io/controller-runtime/pkg/source"
4747
)
4848

49+
type TestRequest struct {
50+
Key string
51+
}
52+
4953
var _ = Describe("controller", func() {
5054
var fakeReconcile *fakeReconciler
5155
var ctrl *Controller[reconcile.Request]
@@ -323,6 +327,41 @@ var _ = Describe("controller", func() {
323327
Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
324328
})
325329

330+
It("should check for correct TypedSyncingSource if custom types are used", func() {
331+
queue := &controllertest.TypedQueue[TestRequest]{
332+
TypedInterface: workqueue.NewTyped[TestRequest](),
333+
}
334+
ctrl := &Controller[TestRequest]{
335+
NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] {
336+
return queue
337+
},
338+
LogConstructor: func(*TestRequest) logr.Logger {
339+
return log.RuntimeLog.WithName("controller").WithName("test")
340+
},
341+
}
342+
ctrl.CacheSyncTimeout = time.Second
343+
src := &bisignallingSource[TestRequest]{
344+
startCall: make(chan workqueue.TypedRateLimitingInterface[TestRequest]),
345+
startDone: make(chan error, 1),
346+
waitCall: make(chan struct{}),
347+
waitDone: make(chan error, 1),
348+
}
349+
ctrl.startWatches = []source.TypedSource[TestRequest]{src}
350+
ctrl.Name = "foo"
351+
ctx, cancel := context.WithCancel(context.Background())
352+
defer cancel()
353+
startCh := make(chan error)
354+
go func() {
355+
defer GinkgoRecover()
356+
startCh <- ctrl.Start(ctx)
357+
}()
358+
Eventually(src.startCall).Should(Receive(Equal(queue)))
359+
src.startDone <- nil
360+
Eventually(src.waitCall).Should(BeClosed())
361+
src.waitDone <- nil
362+
cancel()
363+
Eventually(startCh).Should(Receive(Succeed()))
364+
})
326365
})
327366

328367
Describe("Processing queue items from a Controller", func() {
@@ -875,3 +914,40 @@ func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Conte
875914
<-ctx.Done()
876915
return nil, errors.New("GetInformer timed out")
877916
}
917+
918+
type bisignallingSource[T comparable] struct {
919+
// receives the queue that is passed to Start
920+
startCall chan workqueue.TypedRateLimitingInterface[T]
921+
// passes an error to return from Start
922+
startDone chan error
923+
// closed when WaitForSync is called
924+
waitCall chan struct{}
925+
// passes an error to return from WaitForSync
926+
waitDone chan error
927+
}
928+
929+
var _ source.TypedSyncingSource[int] = (*bisignallingSource[int])(nil)
930+
931+
func (t *bisignallingSource[T]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[T]) error {
932+
select {
933+
case t.startCall <- q:
934+
case <-ctx.Done():
935+
return ctx.Err()
936+
}
937+
select {
938+
case err := <-t.startDone:
939+
return err
940+
case <-ctx.Done():
941+
return ctx.Err()
942+
}
943+
}
944+
945+
func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error {
946+
close(t.waitCall)
947+
select {
948+
case err := <-t.waitDone:
949+
return err
950+
case <-ctx.Done():
951+
return ctx.Err()
952+
}
953+
}

0 commit comments

Comments
 (0)