diff --git a/docs/supplementary-resources.rst b/docs/supplementary-resources.rst index c1d837d48..207504813 100644 --- a/docs/supplementary-resources.rst +++ b/docs/supplementary-resources.rst @@ -1,3 +1,11 @@ +Pilots +------ + +Navigator creates one ``Pilot`` resource for every database node. +``Pilot`` resources have the same name and name space as the ``Pod`` for the corresponding database node. +The ``Pilot.Spec`` is read by the pilot process running inside a ``Pod`` and contains its desired configuration. +The ``Pilot.Status`` is updated by the pilot process and contains the discovered state of a single database node. + Other Supplementary Resources ----------------------------- diff --git a/pkg/controllers/cassandra/pilot/pilot.go b/pkg/controllers/cassandra/pilot/pilot.go index 502a6ebf1..418d4762f 100644 --- a/pkg/controllers/cassandra/pilot/pilot.go +++ b/pkg/controllers/cassandra/pilot/pilot.go @@ -1,10 +1,6 @@ package pilot import ( - "fmt" - "hash/fnv" - "reflect" - "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,7 +14,6 @@ import ( navlisters "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/util" - hashutil "github.com/jetstack/navigator/pkg/util/hash" ) const ( @@ -79,29 +74,20 @@ func (c *pilotControl) clusterPods(cluster *v1alpha1.CassandraCluster) ([]*v1.Po return clusterPods, nil } -func (c *pilotControl) createOrUpdatePilot(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) error { +func (c *pilotControl) createPilot(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) error { desiredPilot := PilotForCluster(cluster, pod) client := c.naviClient.NavigatorV1alpha1().Pilots(desiredPilot.GetNamespace()) lister := c.pilots.Pilots(desiredPilot.GetNamespace()) existingPilot, err := lister.Get(desiredPilot.GetName()) - if k8sErrors.IsNotFound(err) { - _, err = client.Create(desiredPilot) - return err + // Pilot already exists + if err == nil { + return util.OwnerCheck(existingPilot, cluster) } - if err != nil { + // The only error we expect is that the pilot does not exist. + if !k8sErrors.IsNotFound(err) { return err } - err = util.OwnerCheck(existingPilot, cluster) - if err != nil { - return err - } - existingPilot = existingPilot.DeepCopy() - existingPilot.Status = v1alpha1.PilotStatus{} - desiredPilot = existingPilot.DeepCopy() - desiredPilot = updatePilotForCluster(cluster, pod, desiredPilot) - if !reflect.DeepEqual(desiredPilot, existingPilot) { - _, err = client.Update(desiredPilot) - } + _, err = client.Create(desiredPilot) return err } @@ -111,7 +97,7 @@ func (c *pilotControl) syncPilots(cluster *v1alpha1.CassandraCluster) error { return err } for _, pod := range pods { - err = c.createOrUpdatePilot(cluster, pod) + err = c.createPilot(cluster, pod) if err != nil { return err } @@ -129,55 +115,12 @@ func (c *pilotControl) Sync(cluster *v1alpha1.CassandraCluster) error { } func PilotForCluster(cluster *v1alpha1.CassandraCluster, pod *v1.Pod) *v1alpha1.Pilot { - pilot := &v1alpha1.Pilot{} - pilot.SetOwnerReferences( - []metav1.OwnerReference{ - util.NewControllerRef(cluster), + return &v1alpha1.Pilot{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + Labels: util.ClusterLabels(cluster), + OwnerReferences: []metav1.OwnerReference{util.NewControllerRef(cluster)}, }, - ) - return updatePilotForCluster(cluster, pod, pilot) -} - -func updatePilotForCluster( - cluster *v1alpha1.CassandraCluster, - pod *v1.Pod, - pilot *v1alpha1.Pilot, -) *v1alpha1.Pilot { - pilot.SetName(pod.GetName()) - pilot.SetNamespace(cluster.GetNamespace()) - labels := pilot.GetLabels() - if labels == nil { - labels = map[string]string{} } - for key, val := range util.ClusterLabels(cluster) { - labels[key] = val - } - pilot.SetLabels(labels) - ComputeHashAndUpdateAnnotation(pilot) - return pilot -} - -func ComputeHash(p *v1alpha1.Pilot) uint32 { - hashVar := []interface{}{ - p.Spec, - p.ObjectMeta, - p.Labels, - } - hasher := fnv.New32() - hashutil.DeepHashObject(hasher, hashVar) - return hasher.Sum32() -} - -func UpdateHashAnnotation(p *v1alpha1.Pilot, hash uint32) { - annotations := p.GetAnnotations() - if annotations == nil { - annotations = map[string]string{} - } - annotations[HashAnnotationKey] = fmt.Sprintf("%d", hash) - p.SetAnnotations(annotations) -} - -func ComputeHashAndUpdateAnnotation(p *v1alpha1.Pilot) { - hash := ComputeHash(p) - UpdateHashAnnotation(p, hash) } diff --git a/pkg/controllers/cassandra/pilot/pilot_test.go b/pkg/controllers/cassandra/pilot/pilot_test.go index 880d69f71..1363e4ef5 100644 --- a/pkg/controllers/cassandra/pilot/pilot_test.go +++ b/pkg/controllers/cassandra/pilot/pilot_test.go @@ -6,8 +6,11 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "github.com/jetstack/navigator/internal/test/unit/framework" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/pilot" casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" "github.com/jetstack/navigator/pkg/controllers/cassandra/util" @@ -25,121 +28,114 @@ func clusterPod(cluster *v1alpha1.CassandraCluster, name string) *v1.Pod { return pod } -func nonClusterPod(cluster *v1alpha1.CassandraCluster, name string) *v1.Pod { - p := clusterPod(cluster, name) - p.SetOwnerReferences([]metav1.OwnerReference{}) - return p -} - func TestPilotSync(t *testing.T) { - t.Run( - "each cluster pod gets a pilot", - func(t *testing.T) { - f := casstesting.NewFixture(t) - f.AddObjectK(clusterPod(f.Cluster, "foo")) - f.AddObjectK(clusterPod(f.Cluster, "bar")) - f.Run() - f.AssertPilotsLength(2) - }, - ) - t.Run( - "non-cluster pods are ignored", - func(t *testing.T) { - f := casstesting.NewFixture(t) - f.AddObjectK(clusterPod(f.Cluster, "foo")) - f.AddObjectK(nonClusterPod(f.Cluster, "bar")) - f.Run() - f.AssertPilotsLength(1) - }, - ) - t.Run( - "pilot exists", - func(t *testing.T) { - f := casstesting.NewFixture(t) - pod := clusterPod(f.Cluster, "foo") - pilot := pilot.PilotForCluster(f.Cluster, pod) - f.AddObjectK(pod) - f.AddObjectN(pilot) - f.Run() - f.AssertPilotsLength(1) - }, - ) - t.Run( - "foreign owned pilot", - func(t *testing.T) { - f := casstesting.NewFixture(t) - pod := clusterPod(f.Cluster, "foo") - pilot := pilot.PilotForCluster(f.Cluster, pod) - pilot.SetOwnerReferences([]metav1.OwnerReference{}) - f.AddObjectK(pod) - f.AddObjectN(pilot) - f.RunExpectError() - f.AssertPilotsLength(1) + cluster1 := casstesting.ClusterForTest() + cluster1pod1 := clusterPod(cluster1, "c1p1") + cluster1pod2 := clusterPod(cluster1, "c1p2") + cluster1pilot1 := pilot.PilotForCluster(cluster1, cluster1pod1) + cluster1pilot1foreign := cluster1pilot1.DeepCopy() + cluster1pilot1foreign.SetOwnerReferences([]metav1.OwnerReference{}) + + cluster2 := casstesting.ClusterForTest() + cluster2.SetName("cluster2") + cluster2.SetUID("uid2") + cluster2pod1 := clusterPod(cluster2, "c2p1") + + type testT struct { + kubeObjects []runtime.Object + navObjects []runtime.Object + cluster *v1alpha1.CassandraCluster + assertions func(*testing.T, *controllers.State) + expectErr bool + } + + tests := map[string]testT{ + "each cluster pod gets a pilot": { + kubeObjects: []runtime.Object{ + cluster1pod1, + cluster1pod2, + cluster2pod1, + }, + cluster: cluster1, + assertions: func(t *testing.T, state *controllers.State) { + pilots, err := state.NavigatorClientset. + Navigator().Pilots(cluster1.Namespace).List(metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + expectedPilotCount := 2 + pilotCount := len(pilots.Items) + if pilotCount != expectedPilotCount { + t.Log(pilots.Items) + t.Errorf("Unexpected pilot count: %d != %d", expectedPilotCount, pilotCount) + } + }, }, - ) - t.Run( - "pilot sync when hash changes", - func(t *testing.T) { - f := casstesting.NewFixture(t) - pod := clusterPod(f.Cluster, "foo") - unsyncedPilot := pilot.PilotForCluster(f.Cluster, pod) - pilot.UpdateHashAnnotation(unsyncedPilot, 0) - f.AddObjectK(pod) - f.AddObjectN(unsyncedPilot) - f.Run() - f.AssertPilotsLength(1) - updatedPilot := f.Pilots().Items[0] - updatedPilotAnnotations := updatedPilot.GetAnnotations() - hash, ok := updatedPilotAnnotations[pilot.HashAnnotationKey] - if !ok { - t.Log(updatedPilotAnnotations) - t.Error("pilot hash annotation not found") - } - if hash == "0" { - t.Log(updatedPilot) - t.Error("Pilot was not updated") - } + "non-cluster pods are ignored": { + kubeObjects: []runtime.Object{ + cluster1pod1, + cluster2pod1, + }, + cluster: cluster1, + assertions: func(t *testing.T, state *controllers.State) { + pilots, err := state.NavigatorClientset. + Navigator().Pilots(cluster1.Namespace).List(metav1.ListOptions{}) + if err != nil { + t.Fatal(err) + } + expectedPilotCount := 1 + pilotCount := len(pilots.Items) + if pilotCount != expectedPilotCount { + t.Log(pilots.Items) + t.Errorf("Unexpected pilot count: %d != %d", expectedPilotCount, pilotCount) + } + }, }, - ) - t.Run( - "pilot no sync if hash matches", - func(t *testing.T) { - f := casstesting.NewFixture(t) - pod := clusterPod(f.Cluster, "foo") - // Remove the labels - unsyncedPilot := pilot.PilotForCluster(f.Cluster, pod) - unsyncedPilot.SetLabels(map[string]string{}) - pilot.ComputeHashAndUpdateAnnotation(unsyncedPilot) - f.AddObjectK(pod) - f.AddObjectN(unsyncedPilot) - f.Run() - f.AssertPilotsLength(1) - updatedPilot := f.Pilots().Items[0] - updatedLabels := updatedPilot.GetLabels() - if len(updatedLabels) == 0 { - t.Log(updatedPilot) - t.Error("pilot was not updated") - } + "no error if pilot exists": { + kubeObjects: []runtime.Object{cluster1pod1}, + navObjects: []runtime.Object{cluster1pilot1}, + cluster: cluster1, }, - ) - t.Run( - "don't clobber custom labels", - func(t *testing.T) { - f := casstesting.NewFixture(t) - pod := clusterPod(f.Cluster, "foo") - // Remove the labels - unsyncedPilot := pilot.PilotForCluster(f.Cluster, pod) - unsyncedPilot.Labels["foo"] = "bar" - f.AddObjectK(pod) - f.AddObjectN(unsyncedPilot) - f.Run() - f.AssertPilotsLength(1) - updatedPilot := f.Pilots().Items[0] - updatedLabels := updatedPilot.GetLabels() - if updatedLabels["foo"] != "bar" { - t.Log(updatedLabels) - t.Error("custom labels were altered") - } + "error if foreign owned": { + kubeObjects: []runtime.Object{cluster1pod1}, + navObjects: []runtime.Object{cluster1pilot1foreign}, + cluster: cluster1, + expectErr: true, }, - ) + } + for title, test := range tests { + t.Run( + title, + func(t *testing.T) { + fixture := &framework.StateFixture{ + T: t, + KubeObjects: test.kubeObjects, + NavigatorObjects: test.navObjects, + } + fixture.Start() + defer fixture.Stop() + state := fixture.State() + c := pilot.NewControl( + state.NavigatorClientset, + state.PilotLister, + state.PodLister, + state.StatefulSetLister, + state.Recorder, + ) + err := c.Sync(test.cluster) + if err != nil { + if !test.expectErr { + t.Errorf("Unexpected error: %s", err) + } + } else { + if test.expectErr { + t.Error("Missing error") + } + } + if test.assertions != nil { + test.assertions(t, state) + } + }, + ) + } }