Skip to content

Commit

Permalink
⚠️ Fakeclient: Fix status handling
Browse files Browse the repository at this point in the history
This change fixes the status handling of the fake status client by

* Adding a new `WithStatusSubresource` option and pre-poluating it with
  all in-tree resources that have a status subresource
* Making its `Update` and `Patch` not change the status of any such
  resource
* Making its status client `Update` and `Patch` only change the status
  for any such resources

Since this was completely broken before in that both non-status and
status Update/Patch would always update everything and the status
resources get pre-populated, this is a breaking change. Any test that
relied on the previous behavior would pass incorrectly though, as it
didn't match what the Kubernetes API does.
  • Loading branch information
alvaroaleman committed Apr 11, 2023
1 parent 5db1738 commit d569207
Show file tree
Hide file tree
Showing 2 changed files with 421 additions and 29 deletions.
236 changes: 207 additions & 29 deletions pkg/client/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package fake

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"runtime/debug"
"strconv"
"strings"
"sync"
Expand All @@ -35,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
Expand All @@ -48,13 +51,15 @@ import (

type versionedTracker struct {
testing.ObjectTracker
scheme *runtime.Scheme
scheme *runtime.Scheme
withStatusSubresource sets.Set[schema.GroupVersionKind]
}

type fakeClient struct {
tracker versionedTracker
scheme *runtime.Scheme
restMapper meta.RESTMapper
tracker versionedTracker
scheme *runtime.Scheme
restMapper meta.RESTMapper
withStatusSubresource sets.Set[schema.GroupVersionKind]

// indexes maps each GroupVersionKind (GVK) to the indexes registered for that GVK.
// The inner map maps from index name to IndexerFunc.
Expand Down Expand Up @@ -95,12 +100,13 @@ func NewClientBuilder() *ClientBuilder {

// ClientBuilder builds a fake client.
type ClientBuilder struct {
scheme *runtime.Scheme
restMapper meta.RESTMapper
initObject []client.Object
initLists []client.ObjectList
initRuntimeObjects []runtime.Object
objectTracker testing.ObjectTracker
scheme *runtime.Scheme
restMapper meta.RESTMapper
initObject []client.Object
initLists []client.ObjectList
initRuntimeObjects []runtime.Object
withStatusSubresource []client.Object
objectTracker testing.ObjectTracker

// indexes maps each GroupVersionKind (GVK) to the indexes registered for that GVK.
// The inner map maps from index name to IndexerFunc.
Expand Down Expand Up @@ -185,6 +191,13 @@ func (f *ClientBuilder) WithIndex(obj runtime.Object, field string, extractValue
return f
}

// WithStatusSubresource configures the passed object with a status subresource, which means
// calls to Update and Patch will not alters its status.
func (f *ClientBuilder) WithStatusSubresource(o ...client.Object) *ClientBuilder {
f.withStatusSubresource = append(f.withStatusSubresource, o...)
return f
}

// Build builds and returns a new fake client.
func (f *ClientBuilder) Build() client.WithWatch {
if f.scheme == nil {
Expand All @@ -196,10 +209,19 @@ func (f *ClientBuilder) Build() client.WithWatch {

var tracker versionedTracker

withStatusSubResource := sets.New(inTreeResourcesWithStatus()...)
for _, o := range f.withStatusSubresource {
gvk, err := apiutil.GVKForObject(o, f.scheme)
if err != nil {
panic(fmt.Errorf("failed to get gvk for object %T: %w", withStatusSubResource, err))
}
withStatusSubResource.Insert(gvk)
}

if f.objectTracker == nil {
tracker = versionedTracker{ObjectTracker: testing.NewObjectTracker(f.scheme, scheme.Codecs.UniversalDecoder()), scheme: f.scheme}
tracker = versionedTracker{ObjectTracker: testing.NewObjectTracker(f.scheme, scheme.Codecs.UniversalDecoder()), scheme: f.scheme, withStatusSubresource: withStatusSubResource}
} else {
tracker = versionedTracker{ObjectTracker: f.objectTracker, scheme: f.scheme}
tracker = versionedTracker{ObjectTracker: f.objectTracker, scheme: f.scheme, withStatusSubresource: withStatusSubResource}
}

for _, obj := range f.initObject {
Expand All @@ -217,11 +239,13 @@ func (f *ClientBuilder) Build() client.WithWatch {
panic(fmt.Errorf("failed to add runtime object %v to fake client: %w", obj, err))
}
}

return &fakeClient{
tracker: tracker,
scheme: f.scheme,
restMapper: f.restMapper,
indexes: f.indexes,
tracker: tracker,
scheme: f.scheme,
restMapper: f.restMapper,
indexes: f.indexes,
withStatusSubresource: withStatusSubResource,
}
}

Expand Down Expand Up @@ -318,6 +342,16 @@ func convertFromUnstructuredIfNecessary(s *runtime.Scheme, o runtime.Object) (ru
}

func (t versionedTracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error {
isStatus := false
// We apply patches using a client-go reaction that ends up calling the trackers Update. As we can't change
// that reaction, we use the callstack to figure out if this originated from the status client.
if bytes.Contains(debug.Stack(), []byte("sigs.k8s.io/controller-runtime/pkg/client/fake.(*fakeSubResourceClient).Patch")) {
isStatus = true
}
return t.update(gvr, obj, ns, isStatus)
}

func (t versionedTracker) update(gvr schema.GroupVersionResource, obj runtime.Object, ns string, isStatus bool) error {
accessor, err := meta.Accessor(obj)
if err != nil {
return fmt.Errorf("failed to get accessor for object: %w", err)
Expand Down Expand Up @@ -348,6 +382,20 @@ func (t versionedTracker) Update(gvr schema.GroupVersionResource, obj runtime.Ob
return err
}

if t.withStatusSubresource.Has(gvk) {
if isStatus { // copy everything but status and metadata.ResourceVersion from original object
if err := copyNonStatusFrom(oldObject, obj); err != nil {
return fmt.Errorf("failed to copy non-status field for object with status subresouce: %w", err)
}
} else { // copy status from original object
if err := copyStatusFrom(oldObject, obj); err != nil {
return fmt.Errorf("failed to copy the status for object with status subresource: %w", err)
}
}
} else if isStatus {
return apierrors.NewNotFound(gvr.GroupResource(), accessor.GetName())
}

oldAccessor, err := meta.Accessor(oldObject)
if err != nil {
return err
Expand Down Expand Up @@ -689,6 +737,10 @@ func (c *fakeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ..
}

func (c *fakeClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return c.update(obj, false, opts...)
}

func (c *fakeClient) update(obj client.Object, isStatus bool, opts ...client.UpdateOption) error {
updateOptions := &client.UpdateOptions{}
updateOptions.ApplyOptions(opts)

Expand All @@ -706,10 +758,14 @@ func (c *fakeClient) Update(ctx context.Context, obj client.Object, opts ...clie
if err != nil {
return err
}
return c.tracker.Update(gvr, obj, accessor.GetNamespace())
return c.tracker.update(gvr, obj, accessor.GetNamespace(), isStatus)
}

func (c *fakeClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
return c.patch(obj, patch, opts...)
}

func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
patchOptions := &client.PatchOptions{}
patchOptions.ApplyOptions(opts)

Expand All @@ -732,6 +788,11 @@ func (c *fakeClient) Patch(ctx context.Context, obj client.Object, patch client.
return err
}

gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return err
}

reaction := testing.ObjectReaction(c.tracker)
handled, o, err := reaction(testing.NewPatchAction(gvr, accessor.GetNamespace(), accessor.GetName(), patch.Type(), data))
if err != nil {
Expand All @@ -740,11 +801,6 @@ func (c *fakeClient) Patch(ctx context.Context, obj client.Object, patch client.
if !handled {
panic("tracker could not handle patch method")
}

gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return err
}
ta, err := meta.TypeAccessor(o)
if err != nil {
return err
Expand All @@ -762,6 +818,97 @@ func (c *fakeClient) Patch(ctx context.Context, obj client.Object, patch client.
return err
}

func copyNonStatusFrom(old, new runtime.Object) error {
newClientObject, ok := new.(client.Object)
if !ok {
return fmt.Errorf("%T is not a client.Object", new)
}
// The only thing other than status we have to retain
rv := newClientObject.GetResourceVersion()

oldMapStringAny, err := toMapStringAny(old)
if err != nil {
return fmt.Errorf("failed to convert old to *unstructured.Unstructured: %w", err)
}
newMapStringAny, err := toMapStringAny(new)
if err != nil {
return fmt.Errorf("failed to convert new to *unststructured.Unstructured: %w", err)
}

// delete everything other than status in case it has fields that were not present in
// the old object
for k := range newMapStringAny {
if k != "status" {
delete(newMapStringAny, k)
}
}
// copy everything other than status from the old object
for k := range oldMapStringAny {
if k != "status" {
newMapStringAny[k] = oldMapStringAny[k]
}
}

newClientObject.SetResourceVersion(rv)

if err := fromMapStringAny(newMapStringAny, new); err != nil {
return fmt.Errorf("failed to convert back from map[string]any: %w", err)
}
return nil
}

// copyStatusFrom copies the status from old into new
func copyStatusFrom(old, new runtime.Object) error {
oldMapStringAny, err := toMapStringAny(old)
if err != nil {
return fmt.Errorf("failed to convert old to *unstructured.Unstructured: %w", err)
}
newMapStringAny, err := toMapStringAny(new)
if err != nil {
return fmt.Errorf("failed to convert new to *unststructured.Unstructured: %w", err)
}

newMapStringAny["status"] = oldMapStringAny["status"]

if err := fromMapStringAny(newMapStringAny, new); err != nil {
return fmt.Errorf("failed to convert back from map[string]any: %w", err)
}

return nil
}

func toMapStringAny(obj runtime.Object) (map[string]any, error) {
if unstructured, isUnstructured := obj.(*unstructured.Unstructured); isUnstructured {
return unstructured.Object, nil
}

serialized, err := json.Marshal(obj)
if err != nil {
return nil, err
}

u := map[string]any{}
return u, json.Unmarshal(serialized, &u)
}

func fromMapStringAny(u map[string]any, target runtime.Object) error {
if targetUnstructured, isUnstructured := target.(*unstructured.Unstructured); isUnstructured {
targetUnstructured.Object = u
return nil
}

serialized, err := json.Marshal(u)
if err != nil {
return fmt.Errorf("failed to serialize: %w", err)
}

if err := json.Unmarshal(serialized, &target); err != nil {
return fmt.Errorf("failed to deserialize: %w", err)
}

return nil
}

func (c *fakeClient) Status() client.SubResourceWriter {
return c.SubResource("status")
}
Expand Down Expand Up @@ -809,22 +956,17 @@ func (sw *fakeSubResourceClient) Create(ctx context.Context, obj client.Object,
}

func (sw *fakeSubResourceClient) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
// TODO(droot): This results in full update of the obj (spec + subresources). Need
// a way to update subresource only.
updateOptions := client.SubResourceUpdateOptions{}
updateOptions.ApplyOptions(opts)

body := obj
if updateOptions.SubResourceBody != nil {
body = updateOptions.SubResourceBody
}
return sw.client.Update(ctx, body, &updateOptions.UpdateOptions)
return sw.client.update(body, true, &updateOptions.UpdateOptions)
}

func (sw *fakeSubResourceClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
// TODO(droot): This results in full update of the obj (spec + subresources). Need
// a way to update subresource only.

patchOptions := client.SubResourcePatchOptions{}
patchOptions.ApplyOptions(opts)

Expand All @@ -833,7 +975,7 @@ func (sw *fakeSubResourceClient) Patch(ctx context.Context, obj client.Object, p
body = patchOptions.SubResourceBody
}

return sw.client.Patch(ctx, body, patch, &patchOptions.PatchOptions)
return sw.client.patch(body, patch, &patchOptions.PatchOptions)
}

func allowsUnconditionalUpdate(gvk schema.GroupVersionKind) bool {
Expand Down Expand Up @@ -933,6 +1075,42 @@ func allowsCreateOnUpdate(gvk schema.GroupVersionKind) bool {
return false
}

func inTreeResourcesWithStatus() []schema.GroupVersionKind {
return []schema.GroupVersionKind{
{Version: "v1", Kind: "Namespace"},
{Version: "v1", Kind: "Node"},
{Version: "v1", Kind: "PersistentVolumeClaim"},
{Version: "v1", Kind: "PersistentVolume"},
{Version: "v1", Kind: "Pod"},
{Version: "v1", Kind: "ReplicationController"},
{Version: "v1", Kind: "Service"},

{Group: "apps", Version: "v1", Kind: "Deployment"},
{Group: "apps", Version: "v1", Kind: "DaemonSet"},
{Group: "apps", Version: "v1", Kind: "ReplicaSet"},
{Group: "apps", Version: "v1", Kind: "StatefulSet"},

{Group: "autoscaling", Version: "v1", Kind: "HorizontalPodAutoscaler"},

{Group: "batch", Version: "v1", Kind: "CronJob"},
{Group: "batch", Version: "v1", Kind: "Job"},

{Group: "certificates.k8s.io", Version: "v1", Kind: "CertificateSigningRequest"},

{Group: "networking.k8s.io", Version: "v1", Kind: "Ingress"},
{Group: "networking.k8s.io", Version: "v1", Kind: "NetworkPolicy"},

{Group: "policy", Version: "v1", Kind: "PodDisruptionBudget"},

{Group: "storage.k8s.io", Version: "v1", Kind: "VolumeAttachment"},

{Group: "apiextensions.k8s.io", Version: "v1", Kind: "CustomResourceDefinition"},

{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta2", Kind: "FlowSchema"},
{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta2", Kind: "PriorityLevelConfiguration"},
}
}

// zero zeros the value of a pointer.
func zero(x interface{}) {
if x == nil {
Expand Down
Loading

0 comments on commit d569207

Please # to comment.