Skip to content

Commit

Permalink
Replace ListOperations with GetOperation because it has higher limits…
Browse files Browse the repository at this point in the history
…. Rework the poller code to keep the OperationPolling simple with Submit - submitting operations, Poll - polling operations till complete or timeout, and Await - waiting for the all the submitted operation's result.
  • Loading branch information
runakash committed Nov 10, 2022
1 parent 0f47392 commit 9abe508
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 608 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ ifneq ($(shell test -d $(MOCKS_DESTINATION); echo $$?), 0)
$(MOCKGEN) --source pkg/cloudmap/client.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/client_mock.go --package cloudmap_mock
$(MOCKGEN) --source pkg/cloudmap/cache.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/cache_mock.go --package cloudmap_mock
$(MOCKGEN) --source pkg/cloudmap/operation_poller.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/operation_poller_mock.go --package cloudmap_mock
$(MOCKGEN) --source pkg/cloudmap/operation_collector.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/operation_collector_mock.go --package cloudmap_mock
$(MOCKGEN) --source pkg/cloudmap/api.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/api_mock.go --package cloudmap_mock
$(MOCKGEN) --source pkg/cloudmap/aws_facade.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/aws_facade_mock.go --package cloudmap_mock
$(MOCKGEN) --source integration/janitor/api.go --destination $(MOCKS_DESTINATION)/integration/janitor/api_mock.go --package janitor_mock
Expand Down
17 changes: 10 additions & 7 deletions integration/janitor/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (j *cloudMapJanitor) Cleanup(ctx context.Context, nsName string) {
opId, err := j.sdApi.DeleteNamespace(ctx, ns.Id)
if err == nil {
fmt.Println("namespace delete in progress")
_, err = j.sdApi.PollNamespaceOperation(ctx, opId)
_, err = cloudmap.NewOperationPoller(j.sdApi).Poll(ctx, opId)
}
j.checkOrFail(err, "clean up successful", "could not cleanup namespace")
}
Expand All @@ -87,17 +87,20 @@ func (j *cloudMapJanitor) deregisterInstances(ctx context.Context, nsName string
fmt.Sprintf("service has %d instances to clean", len(insts)),
"could not list instances to cleanup")

opColl := cloudmap.NewOperationCollector()
opPoller := cloudmap.NewOperationPoller(j.sdApi)
for _, inst := range insts {
instId := aws.ToString(inst.InstanceId)
fmt.Printf("found instance to clean: %s\n", instId)
opColl.Add(func() (opId string, err error) {
return j.sdApi.DeregisterInstance(ctx, svcId, instId)
})
opId, _err := j.sdApi.DeregisterInstance(ctx, svcId, instId)
if _err != nil {
fmt.Printf("DeregisterInstance failed for the service %s, %s", svcId, _err.Error())
j.fail()
}
opPoller.Submit(ctx, opId)
}

opErr := cloudmap.NewDeregisterInstancePoller(j.sdApi, svcId, opColl.Collect(), opColl.GetStartTime()).Poll(ctx)
j.checkOrFail(opErr, "instances de-registered", "could not cleanup instances")
err = opPoller.Await()
j.checkOrFail(err, "instances de-registered", "could not cleanup instances")
}

func (j *cloudMapJanitor) checkOrFail(err error, successMsg string, failMsg string) {
Expand Down
9 changes: 5 additions & 4 deletions integration/janitor/janitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ func TestCleanupHappyCase(t *testing.T) {

tj.mockApi.EXPECT().DeregisterInstance(context.TODO(), test.SvcId, test.EndptId1).
Return(test.OpId1, nil)
tj.mockApi.EXPECT().ListOperations(context.TODO(), gomock.Any()).
Return(map[string]types.OperationStatus{test.OpId1: types.OperationStatusSuccess}, nil)
tj.mockApi.EXPECT().GetOperation(context.TODO(), test.OpId1).
Return(&types.Operation{Status: types.OperationStatusSuccess}, nil)
tj.mockApi.EXPECT().DeleteService(context.TODO(), test.SvcId).
Return(nil)
tj.mockApi.EXPECT().DeleteNamespace(context.TODO(), test.HttpNsId).
Return(test.OpId2, nil)
tj.mockApi.EXPECT().PollNamespaceOperation(context.TODO(), test.OpId2).
Return(test.HttpNsId, nil)
tj.mockApi.EXPECT().GetOperation(context.TODO(), test.OpId2).
Return(&types.Operation{Status: types.OperationStatusSuccess,
Targets: map[string]string{string(types.OperationTargetTypeNamespace): test.HttpNsId}}, nil)

tj.janitor.Cleanup(context.TODO(), test.HttpNsName)
assert.False(t, *tj.failed)
Expand Down
59 changes: 0 additions & 59 deletions pkg/cloudmap/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package cloudmap

import (
"context"
"errors"
"fmt"
"time"

"golang.org/x/time/rate"
Expand All @@ -13,7 +11,6 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
sd "github.com/aws/aws-sdk-go-v2/service/servicediscovery"
"github.com/aws/aws-sdk-go-v2/service/servicediscovery/types"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
Expand All @@ -32,9 +29,6 @@ type ServiceDiscoveryApi interface {
// DiscoverInstances returns a list of service instances registered to a given service.
DiscoverInstances(ctx context.Context, nsName string, svcName string, queryParameters map[string]string) (insts []types.HttpInstanceSummary, err error)

// ListOperations returns a map of operations to their status matching a list of filters.
ListOperations(ctx context.Context, opFilters []types.OperationFilter) (operationStatusMap map[string]types.OperationStatus, err error)

// GetOperation returns an operation.
GetOperation(ctx context.Context, operationId string) (operation *types.Operation, err error)

Expand All @@ -49,9 +43,6 @@ type ServiceDiscoveryApi interface {

// DeregisterInstance de-registers a service instance in Cloud Map.
DeregisterInstance(ctx context.Context, serviceId string, instanceId string) (operationId string, err error)

// PollNamespaceOperation polls a namespace operation, and returns the namespace ID.
PollNamespaceOperation(ctx context.Context, operationId string) (namespaceId string, err error)
}

type serviceDiscoveryApi struct {
Expand Down Expand Up @@ -151,28 +142,6 @@ func (sdApi *serviceDiscoveryApi) DiscoverInstances(ctx context.Context, nsName
return out.Instances, nil
}

func (sdApi *serviceDiscoveryApi) ListOperations(ctx context.Context, opFilters []types.OperationFilter) (map[string]types.OperationStatus, error) {
opStatusMap := make(map[string]types.OperationStatus)

pages := sd.NewListOperationsPaginator(sdApi.awsFacade, &sd.ListOperationsInput{
Filters: opFilters,
})

for pages.HasMorePages() {
output, err := pages.NextPage(ctx)

if err != nil {
return opStatusMap, err
}

for _, sdOp := range output.Operations {
opStatusMap[aws.ToString(sdOp.Id)] = sdOp.Status
}
}

return opStatusMap, nil
}

func (sdApi *serviceDiscoveryApi) GetOperation(ctx context.Context, opId string) (operation *types.Operation, err error) {
err = sdApi.opRateLimiter.Wait(ctx)
if err != nil {
Expand Down Expand Up @@ -261,31 +230,3 @@ func (sdApi *serviceDiscoveryApi) DeregisterInstance(ctx context.Context, svcId

return aws.ToString(deregResp.OperationId), err
}

func (sdApi *serviceDiscoveryApi) PollNamespaceOperation(ctx context.Context, opId string) (nsId string, err error) {
err = wait.Poll(defaultOperationPollInterval, defaultOperationPollTimeout, func() (done bool, err error) {
sdApi.log.Info("polling operation", "opId", opId)
op, err := sdApi.GetOperation(ctx, opId)

if err != nil {
return true, err
}

if op.Status == types.OperationStatusFail {
return true, fmt.Errorf("failed to create namespace: %s", aws.ToString(op.ErrorMessage))
}

if op.Status == types.OperationStatusSuccess {
nsId = op.Targets[string(types.OperationTargetTypeNamespace)]
return true, nil
}

return false, nil
})

if err == wait.ErrWaitTimeout {
err = errors.New(operationPollTimoutErrorMessage)
}

return nsId, err
}
39 changes: 0 additions & 39 deletions pkg/cloudmap/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,26 +131,6 @@ func TestServiceDiscoveryApi_DiscoverInstances_HappyCase(t *testing.T) {
assert.Equal(t, test.EndptId2, *insts[1].InstanceId)
}

func TestServiceDiscoveryApi_ListOperations_HappyCase(t *testing.T) {
mockController := gomock.NewController(t)
defer mockController.Finish()

awsFacade := cloudmapMock.NewMockAwsFacade(mockController)
sdApi := getServiceDiscoveryApi(t, awsFacade)

filters := make([]types.OperationFilter, 0)
awsFacade.EXPECT().ListOperations(context.TODO(), &sd.ListOperationsInput{Filters: filters}).
Return(&sd.ListOperationsOutput{
Operations: []types.OperationSummary{
{Id: aws.String(test.OpId1), Status: types.OperationStatusSuccess},
}}, nil)

ops, err := sdApi.ListOperations(context.TODO(), filters)
assert.Nil(t, err, "No error for happy case")
assert.True(t, len(ops) == 1)
assert.Equal(t, ops[test.OpId1], types.OperationStatusSuccess)
}

func TestServiceDiscoveryApi_GetOperation_HappyCase(t *testing.T) {
mockController := gomock.NewController(t)
defer mockController.Finish()
Expand Down Expand Up @@ -316,25 +296,6 @@ func TestServiceDiscoveryApi_DeregisterInstance_Error(t *testing.T) {
assert.Equal(t, sdkErr, err)
}

func TestServiceDiscoveryApi_PollNamespaceOperation_HappyCase(t *testing.T) {
mockController := gomock.NewController(t)
defer mockController.Finish()

awsFacade := cloudmapMock.NewMockAwsFacade(mockController)
awsFacade.EXPECT().GetOperation(context.TODO(), &sd.GetOperationInput{OperationId: aws.String(test.OpId1)}).
Return(&sd.GetOperationOutput{Operation: &types.Operation{Status: types.OperationStatusPending}}, nil)

awsFacade.EXPECT().GetOperation(context.TODO(), &sd.GetOperationInput{OperationId: aws.String(test.OpId1)}).
Return(&sd.GetOperationOutput{Operation: &types.Operation{Status: types.OperationStatusSuccess,
Targets: map[string]string{string(types.OperationTargetTypeNamespace): test.HttpNsId}}}, nil)

sdApi := getServiceDiscoveryApi(t, awsFacade)

nsId, err := sdApi.PollNamespaceOperation(context.TODO(), test.OpId1)
assert.Nil(t, err)
assert.Equal(t, test.HttpNsId, nsId)
}

func getServiceDiscoveryApi(t *testing.T, awsFacade *cloudmapMock.MockAwsFacade) ServiceDiscoveryApi {
scheme := runtime.NewScheme()
scheme.AddKnownTypes(aboutv1alpha1.GroupVersion, &aboutv1alpha1.ClusterProperty{})
Expand Down
52 changes: 22 additions & 30 deletions pkg/cloudmap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/common"
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/model"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/servicediscovery/types"
)

// ServiceDiscoveryClient provides the service endpoint management functionality required by the AWS Cloud Map
Expand Down Expand Up @@ -149,27 +150,22 @@ func (sdc *serviceDiscoveryClient) RegisterEndpoints(ctx context.Context, nsName
return err
}

opCollector := NewOperationCollector()

operationPoller := NewOperationPoller(sdc.sdApi)
for _, endpt := range endpts {
endptId := endpt.Id
endptAttrs := endpt.GetCloudMapAttributes()
opCollector.Add(func() (opId string, err error) {
return sdc.sdApi.RegisterInstance(ctx, svcId, endptId, endptAttrs)
})
opId, endptErr := sdc.sdApi.RegisterInstance(ctx, svcId, endpt.Id, endptAttrs)
if endptErr != nil {
err = common.Wrap(err, endptErr)
}
operationPoller.Submit(ctx, opId)
}

err = NewRegisterInstancePoller(sdc.sdApi, svcId, opCollector.Collect(), opCollector.GetStartTime()).Poll(ctx)

// Evict cache entry so next list call reflects changes
sdc.cache.EvictEndpoints(nsName, svcName)

err = operationPoller.Await()
if err != nil {
return err
}

if !opCollector.IsAllOperationsCreated() {
return errors.New("failure while registering endpoints")
return common.Wrap(err, errors.New("failure while registering endpoints"))
}

return nil
Expand All @@ -188,29 +184,24 @@ func (sdc *serviceDiscoveryClient) DeleteEndpoints(ctx context.Context, nsName s
return err
}

opCollector := NewOperationCollector()

operationPoller := NewOperationPoller(sdc.sdApi)
for _, endpt := range endpts {
endptId := endpt.Id
// add operation to delete endpoint
opCollector.Add(func() (opId string, err error) {
return sdc.sdApi.DeregisterInstance(ctx, svcId, endptId)
})
opId, endptErr := sdc.sdApi.DeregisterInstance(ctx, svcId, endpt.Id)
if endptErr != nil {
err = common.Wrap(err, endptErr)
}
operationPoller.Submit(ctx, opId)
}

err = NewDeregisterInstancePoller(sdc.sdApi, svcId, opCollector.Collect(), opCollector.GetStartTime()).Poll(ctx)

// Evict cache entry so next list call reflects changes
sdc.cache.EvictEndpoints(nsName, svcName)
if err != nil {
return err
}

if !opCollector.IsAllOperationsCreated() {
return errors.New("failure while de-registering endpoints")
err = operationPoller.Await()
if err != nil {
return common.Wrap(err, errors.New("failure while de-registering endpoints"))
}

return nil
return err
}

func (sdc *serviceDiscoveryClient) getEndpoints(ctx context.Context, nsName string, svcName string) (endpts []*model.Endpoint, err error) {
Expand Down Expand Up @@ -315,12 +306,13 @@ func (sdc *serviceDiscoveryClient) createNamespace(ctx context.Context, nsName s
return nil, err
}

nsId, err := sdc.sdApi.PollNamespaceOperation(ctx, opId)
op, err := NewOperationPoller(sdc.sdApi).Poll(ctx, opId)
if err != nil {
return nil, err
}
nsId := op.Targets[string(types.OperationTargetTypeNamespace)]

sdc.log.Info("namespace created", "nsId", nsId)
sdc.log.Info("namespace created", "nsId", nsId, "namespace", nsName)

// Default namespace type HTTP
namespace = &model.Namespace{
Expand Down
Loading

0 comments on commit 9abe508

Please # to comment.