Skip to content

WIP: Turn clusterName:string into a type parameter #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/kind/main.go
Original file line number Diff line number Diff line change
@@ -74,9 +74,9 @@ func main() {
For(&corev1.Pod{}).
Complete(mcreconcile.Func(
func(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) {
log := log.FromContext(ctx).WithValues("cluster", req.ClusterName)
log := log.FromContext(ctx).WithValues("cluster", req.Cluster)

cl, err := mgr.GetCluster(ctx, req.ClusterName)
cl, err := mgr.GetCluster(ctx, req.Cluster)
if err != nil {
return reconcile.Result{}, err
}
6 changes: 3 additions & 3 deletions examples/namespace/main.go
Original file line number Diff line number Diff line change
@@ -129,9 +129,9 @@ func main() {
For(&corev1.ConfigMap{}).
Complete(mcreconcile.Func(
func(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) {
log := log.FromContext(ctx).WithValues("cluster", req.ClusterName)
log := log.FromContext(ctx).WithValues("cluster", req.Cluster)

cl, err := mgr.GetCluster(ctx, req.ClusterName)
cl, err := mgr.GetCluster(ctx, req.Cluster)
if err != nil {
return reconcile.Result{}, err
}
@@ -142,7 +142,7 @@ func main() {
if err := client.Get(ctx, req.NamespacedName, cm); err != nil {
return reconcile.Result{}, err
}
log.Info("Reconciling configmap", "cluster", req.ClusterName, "ns", req.Namespace, "name", cm.Name, "uuid", cm.UID)
log.Info("Reconciling configmap", "cluster", req.Cluster, "ns", req.Namespace, "name", cm.Name, "uuid", cm.UID)

return ctrl.Result{}, nil
},
101 changes: 52 additions & 49 deletions pkg/builder/controller.go

Large diffs are not rendered by default.

42 changes: 21 additions & 21 deletions pkg/builder/multicluster_handler.go
Original file line number Diff line number Diff line change
@@ -23,81 +23,81 @@ import (
mcreconcile "github.com/multicluster-runtime/multicluster-runtime/pkg/reconcile"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
ctrlcluster "sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
)

// StaticHandler returns a handler constructor with a static value.
func StaticHandler[object client.Object, request comparable](h handler.TypedEventHandler[object, request]) func(cluster.Cluster) handler.TypedEventHandler[object, request] {
return func(cl cluster.Cluster) handler.TypedEventHandler[object, request] {
func StaticHandler[object client.Object, request comparable](h handler.TypedEventHandler[object, request]) func(ctrlcluster.Cluster) handler.TypedEventHandler[object, request] {
return func(cl ctrlcluster.Cluster) handler.TypedEventHandler[object, request] {
return h
}
}

// handlerWithCluster wraps a handler and injects the cluster name into the
// requests that are enqueued.
func handlerWithCluster[object any, request mcreconcile.ClusterAware[request]](name string, h handler.TypedEventHandler[object, request]) handler.TypedEventHandler[object, request] {
func handlerWithCluster[object any, cluster comparable, request mcreconcile.ClusterAware[cluster, request]](cl cluster, h handler.TypedEventHandler[object, request]) handler.TypedEventHandler[object, request] {
return handler.TypedFuncs[object, request]{
CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
h.Create(ctx, e, clusterAwareWorkqueue[request]{cluster: name, q: q})
h.Create(ctx, e, clusterAwareWorkqueue[cluster, request]{cluster: cl, q: q})
},
UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
h.Update(ctx, e, clusterAwareWorkqueue[request]{cluster: name, q: q})
h.Update(ctx, e, clusterAwareWorkqueue[cluster, request]{cluster: cl, q: q})
},
DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
h.Delete(ctx, e, clusterAwareWorkqueue[request]{cluster: name, q: q})
h.Delete(ctx, e, clusterAwareWorkqueue[cluster, request]{cluster: cl, q: q})
},
}
}

type clusterAwareWorkqueue[request mcreconcile.ClusterAware[request]] struct {
cluster string
type clusterAwareWorkqueue[cluster comparable, request mcreconcile.ClusterAware[cluster, request]] struct {
cluster cluster
q workqueue.TypedRateLimitingInterface[request]
}

var _ workqueue.TypedInterface[mcreconcile.Request] = &clusterAwareWorkqueue[mcreconcile.Request]{}
var _ workqueue.TypedInterface[mcreconcile.Request] = &clusterAwareWorkqueue[string, mcreconcile.Request]{}

func (q clusterAwareWorkqueue[request]) Add(item request) {
func (q clusterAwareWorkqueue[cluster, request]) Add(item request) {
q.q.Add(item.WithCluster(q.cluster))
}

func (q clusterAwareWorkqueue[request]) AddAfter(item request, duration time.Duration) {
func (q clusterAwareWorkqueue[cluster, request]) AddAfter(item request, duration time.Duration) {
q.q.AddAfter(item.WithCluster(q.cluster), duration)
}

func (q clusterAwareWorkqueue[request]) AddRateLimited(item request) {
func (q clusterAwareWorkqueue[cluster, request]) AddRateLimited(item request) {
q.q.AddRateLimited(item.WithCluster(q.cluster))
}

func (q clusterAwareWorkqueue[request]) Forget(item request) {
func (q clusterAwareWorkqueue[cluster, request]) Forget(item request) {
q.q.Forget(item.WithCluster(q.cluster))
}

func (q clusterAwareWorkqueue[request]) NumRequeues(item request) int {
func (q clusterAwareWorkqueue[cluster, request]) NumRequeues(item request) int {
return q.q.NumRequeues(item.WithCluster(q.cluster))
}

func (q clusterAwareWorkqueue[request]) Len() int {
func (q clusterAwareWorkqueue[cluster, request]) Len() int {
return q.q.Len()
}

func (q clusterAwareWorkqueue[request]) Get() (item request, shutdown bool) {
func (q clusterAwareWorkqueue[cluster, request]) Get() (item request, shutdown bool) {
return q.q.Get()
}

func (q clusterAwareWorkqueue[request]) Done(item request) {
func (q clusterAwareWorkqueue[cluster, request]) Done(item request) {
q.q.Done(item.WithCluster(q.cluster))
}

func (q clusterAwareWorkqueue[request]) ShutDown() {
func (q clusterAwareWorkqueue[cluster, request]) ShutDown() {
q.q.ShutDown()
}

func (q clusterAwareWorkqueue[request]) ShutDownWithDrain() {
func (q clusterAwareWorkqueue[cluster, request]) ShutDownWithDrain() {
q.q.ShutDownWithDrain()
}

func (q clusterAwareWorkqueue[request]) ShuttingDown() bool {
func (q clusterAwareWorkqueue[cluster, request]) ShuttingDown() bool {
return q.q.ShuttingDown()
}
4 changes: 2 additions & 2 deletions pkg/builder/multicluster_options.go
Original file line number Diff line number Diff line change
@@ -59,10 +59,10 @@ func (w EngageOptions) ApplyToWatches(opts untypedWatchesInput) {
}
}

func (w *WatchesInput[request]) setEngageWithLocalCluster(engage bool) {
func (w *WatchesInput[cluster, request]) setEngageWithLocalCluster(engage bool) {
w.engageWithLocalCluster = &engage
}

func (w *WatchesInput[request]) setEngageWithProviderClusters(engage bool) {
func (w *WatchesInput[cluster, request]) setEngageWithProviderClusters(engage bool) {
w.engageWithProviderClusters = &engage
}
12 changes: 6 additions & 6 deletions pkg/context/cluster.go
Original file line number Diff line number Diff line change
@@ -28,21 +28,21 @@ type clusterKeyType string
const clusterKey clusterKeyType = "cluster"

// WithCluster returns a new context with the given cluster.
func WithCluster(ctx context.Context, cluster string) context.Context {
return context.WithValue(ctx, clusterKey, cluster)
func WithCluster[cluster comparable](ctx context.Context, cl cluster) context.Context {
return context.WithValue(ctx, clusterKey, cl)
}

// ClusterFrom returns the cluster from the context.
func ClusterFrom(ctx context.Context) (string, bool) {
cluster, ok := ctx.Value(clusterKey).(string)
return cluster, ok
func ClusterFrom[cluster comparable](ctx context.Context) (cluster, bool) {
cl, ok := ctx.Value(clusterKey).(cluster)
return cl, ok
}

// ReconcilerWithClusterInContext returns a reconciler that sets the cluster name in the
// context.
func ReconcilerWithClusterInContext(r reconcile.Reconciler) mcreconcile.Reconciler {
return reconcile.TypedFunc[mcreconcile.Request](func(ctx context.Context, req mcreconcile.Request) (reconcile.Result, error) {
ctx = WithCluster(ctx, req.ClusterName)
ctx = WithCluster(ctx, req.Cluster)
return r.Reconcile(ctx, req.Request)
})
}
64 changes: 32 additions & 32 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ import (

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
ctrlcluster "sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/source"

@@ -37,33 +37,33 @@ import (
// from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item.
// Work typically is reads and writes Kubernetes objects to make the system state match the state specified
// in the object Spec.
type Controller = TypedController[mcreconcile.Request]
type Controller = TypedController[string, mcreconcile.Request]

// Options are the arguments for creating a new Controller.
type Options = controller.TypedOptions[mcreconcile.Request]

// TypedController implements an API.
type TypedController[request comparable] interface {
type TypedController[cluster, request comparable] interface {
controller.TypedController[request]
multicluster.Aware
multicluster.Aware[cluster]

// MultiClusterWatch watches the provided Source.
MultiClusterWatch(src mcsource.TypedSource[client.Object, request]) error
MultiClusterWatch(src mcsource.TypedSource[client.Object, cluster, request]) error
}

// New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have
// been synced before the Controller is Started.
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func New(name string, mgr mcmanager.Manager, options Options) (Controller, error) {
return NewTyped(name, mgr, options)
return NewTyped[string, mcreconcile.Request](name, mgr, options)
}

// NewTyped returns a new typed controller registered with the Manager,
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func NewTyped[request comparable](name string, mgr mcmanager.Manager, options controller.TypedOptions[request]) (TypedController[request], error) {
c, err := NewTypedUnmanaged(name, mgr, options)
func NewTyped[cluster, request comparable](name string, mgr mcmanager.TypedManager[cluster], options controller.TypedOptions[request]) (TypedController[cluster, request], error) {
c, err := NewTypedUnmanaged[cluster, request](name, mgr, options)
if err != nil {
return nil, err
}
@@ -77,85 +77,85 @@ func NewTyped[request comparable](name string, mgr mcmanager.Manager, options co
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func NewUnmanaged(name string, mgr mcmanager.Manager, options Options) (Controller, error) {
return NewTypedUnmanaged[mcreconcile.Request](name, mgr, options)
return NewTypedUnmanaged[string, mcreconcile.Request](name, mgr, options)
}

// NewTypedUnmanaged returns a new typed controller without adding it to the manager.
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func NewTypedUnmanaged[request comparable](name string, mgr mcmanager.Manager, options controller.TypedOptions[request]) (TypedController[request], error) {
func NewTypedUnmanaged[cluster, request comparable](name string, mgr mcmanager.TypedManager[cluster], options controller.TypedOptions[request]) (TypedController[cluster, request], error) {
c, err := controller.NewTypedUnmanaged[request](name, mgr.GetLocalManager(), options)
if err != nil {
return nil, err
}
return &mcController[request]{
return &mcController[cluster, request]{
TypedController: c,
clusters: make(map[string]engagedCluster),
clusters: make(map[cluster]engagedCluster[cluster]),
}, nil
}

var _ TypedController[mcreconcile.Request] = &mcController[mcreconcile.Request]{}
var _ TypedController[string, mcreconcile.Request] = &mcController[string, mcreconcile.Request]{}

type mcController[request comparable] struct {
type mcController[cluster, request comparable] struct {
controller.TypedController[request]

lock sync.Mutex
clusters map[string]engagedCluster
sources []mcsource.TypedSource[client.Object, request]
clusters map[cluster]engagedCluster[cluster]
sources []mcsource.TypedSource[client.Object, cluster, request]
}

type engagedCluster struct {
name string
cluster cluster.Cluster
type engagedCluster[cluster comparable] struct {
clRef cluster
cluster ctrlcluster.Cluster
}

func (c *mcController[request]) Engage(ctx context.Context, name string, cl cluster.Cluster) error {
func (c *mcController[cluster, request]) Engage(ctx context.Context, clRef cluster, cl ctrlcluster.Cluster) error {
c.lock.Lock()
defer c.lock.Unlock()

if old, ok := c.clusters[name]; ok && old.cluster == cl {
if old, ok := c.clusters[clRef]; ok && old.cluster == cl {
return nil
}

ctx, cancel := context.WithCancel(ctx)

// pass through in case the controller itself is cluster aware
if ctrl, ok := c.TypedController.(multicluster.Aware); ok {
if err := ctrl.Engage(ctx, name, cl); err != nil {
if ctrl, ok := c.TypedController.(multicluster.Aware[cluster]); ok {
if err := ctrl.Engage(ctx, clRef, cl); err != nil {
return err
}
}

// engage cluster aware instances
for _, aware := range c.sources {
src, err := aware.ForCluster(name, cl)
src, err := aware.ForCluster(clRef, cl)
if err != nil {
cancel()
return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
return fmt.Errorf("failed to engage for cluster %q: %w", clRef, err)
}
if err := c.TypedController.Watch(startWithinContext[request](ctx, src)); err != nil {
cancel()
return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
return fmt.Errorf("failed to watch for cluster %q: %w", clRef, err)
}
}

ec := engagedCluster{
name: name,
ec := engagedCluster[cluster]{
clRef: clRef,
cluster: cl,
}
c.clusters[name] = ec
c.clusters[clRef] = ec
go func() {
c.lock.Lock()
defer c.lock.Unlock()
if c.clusters[name] == ec {
delete(c.clusters, name)
if c.clusters[clRef] == ec {
delete(c.clusters, clRef)
}
}()

return nil
}

func (c *mcController[request]) MultiClusterWatch(src mcsource.TypedSource[client.Object, request]) error {
func (c *mcController[cluster, request]) MultiClusterWatch(src mcsource.TypedSource[client.Object, cluster, request]) error {
c.lock.Lock()
defer c.lock.Unlock()

Loading