Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Rework operation poller #245

Merged
merged 2 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ ifneq ($(shell test -d $(MOCKS_DESTINATION); echo $$?), 0)
$(MOCKGEN) --source pkg/cloudmap/client.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/client_mock.go --package cloudmap_mock
$(MOCKGEN) --source pkg/cloudmap/cache.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/cache_mock.go --package cloudmap_mock
$(MOCKGEN) --source pkg/cloudmap/operation_poller.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/operation_poller_mock.go --package cloudmap_mock
$(MOCKGEN) --source pkg/cloudmap/operation_collector.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/operation_collector_mock.go --package cloudmap_mock
$(MOCKGEN) --source pkg/cloudmap/api.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/api_mock.go --package cloudmap_mock
$(MOCKGEN) --source pkg/cloudmap/aws_facade.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/aws_facade_mock.go --package cloudmap_mock
$(MOCKGEN) --source integration/janitor/api.go --destination $(MOCKS_DESTINATION)/integration/janitor/api_mock.go --package janitor_mock
Expand Down
10 changes: 5 additions & 5 deletions integration/janitor/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (j *cloudMapJanitor) Cleanup(ctx context.Context, nsName string) {
opId, err := j.sdApi.DeleteNamespace(ctx, ns.Id)
if err == nil {
fmt.Println("namespace delete in progress")
_, err = j.sdApi.PollNamespaceOperation(ctx, opId)
_, err = cloudmap.NewOperationPoller(j.sdApi).Poll(ctx, opId)
}
j.checkOrFail(err, "clean up successful", "could not cleanup namespace")
}
Expand All @@ -87,17 +87,17 @@ func (j *cloudMapJanitor) deregisterInstances(ctx context.Context, nsName string
fmt.Sprintf("service has %d instances to clean", len(insts)),
"could not list instances to cleanup")

opColl := cloudmap.NewOperationCollector()
opPoller := cloudmap.NewOperationPoller(j.sdApi)
for _, inst := range insts {
instId := aws.ToString(inst.InstanceId)
fmt.Printf("found instance to clean: %s\n", instId)
opColl.Add(func() (opId string, err error) {
opPoller.Submit(ctx, func() (opId string, err error) {
return j.sdApi.DeregisterInstance(ctx, svcId, instId)
})
}

opErr := cloudmap.NewDeregisterInstancePoller(j.sdApi, svcId, opColl.Collect(), opColl.GetStartTime()).Poll(ctx)
j.checkOrFail(opErr, "instances de-registered", "could not cleanup instances")
err = opPoller.Await()
j.checkOrFail(err, "instances de-registered", "could not cleanup instances")
}

func (j *cloudMapJanitor) checkOrFail(err error, successMsg string, failMsg string) {
Expand Down
9 changes: 5 additions & 4 deletions integration/janitor/janitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ func TestCleanupHappyCase(t *testing.T) {

tj.mockApi.EXPECT().DeregisterInstance(context.TODO(), test.SvcId, test.EndptId1).
Return(test.OpId1, nil)
tj.mockApi.EXPECT().ListOperations(context.TODO(), gomock.Any()).
Return(map[string]types.OperationStatus{test.OpId1: types.OperationStatusSuccess}, nil)
tj.mockApi.EXPECT().GetOperation(context.TODO(), test.OpId1).
Return(&types.Operation{Status: types.OperationStatusSuccess}, nil)
tj.mockApi.EXPECT().DeleteService(context.TODO(), test.SvcId).
Return(nil)
tj.mockApi.EXPECT().DeleteNamespace(context.TODO(), test.HttpNsId).
Return(test.OpId2, nil)
tj.mockApi.EXPECT().PollNamespaceOperation(context.TODO(), test.OpId2).
Return(test.HttpNsId, nil)
tj.mockApi.EXPECT().GetOperation(context.TODO(), test.OpId2).
Return(&types.Operation{Status: types.OperationStatusSuccess,
Targets: map[string]string{string(types.OperationTargetTypeNamespace): test.HttpNsId}}, nil)

tj.janitor.Cleanup(context.TODO(), test.HttpNsName)
assert.False(t, *tj.failed)
Expand Down
94 changes: 19 additions & 75 deletions pkg/cloudmap/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,12 @@ package cloudmap

import (
"context"
"errors"
"fmt"
"time"

"golang.org/x/time/rate"

"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/common"
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/model"
"github.com/aws/aws-sdk-go-v2/aws"
sd "github.com/aws/aws-sdk-go-v2/service/servicediscovery"
"github.com/aws/aws-sdk-go-v2/service/servicediscovery/types"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
Expand All @@ -32,9 +26,6 @@ type ServiceDiscoveryApi interface {
// DiscoverInstances returns a list of service instances registered to a given service.
DiscoverInstances(ctx context.Context, nsName string, svcName string, queryParameters map[string]string) (insts []types.HttpInstanceSummary, err error)

// ListOperations returns a map of operations to their status matching a list of filters.
ListOperations(ctx context.Context, opFilters []types.OperationFilter) (operationStatusMap map[string]types.OperationStatus, err error)

// GetOperation returns an operation.
GetOperation(ctx context.Context, operationId string) (operation *types.Operation, err error)

Expand All @@ -49,32 +40,25 @@ type ServiceDiscoveryApi interface {

// DeregisterInstance de-registers a service instance in Cloud Map.
DeregisterInstance(ctx context.Context, serviceId string, instanceId string) (operationId string, err error)

// PollNamespaceOperation polls a namespace operation, and returns the namespace ID.
PollNamespaceOperation(ctx context.Context, operationId string) (namespaceId string, err error)
}

type serviceDiscoveryApi struct {
log common.Logger
awsFacade AwsFacade
nsRateLimiter *rate.Limiter
svcRateLimiter *rate.Limiter
opRateLimiter *rate.Limiter
log common.Logger
awsFacade AwsFacade
rateLimiter common.RateLimiter
}

// NewServiceDiscoveryApiFromConfig creates a new AWS Cloud Map API connection manager from an AWS client config.
func NewServiceDiscoveryApiFromConfig(cfg *aws.Config) ServiceDiscoveryApi {
return &serviceDiscoveryApi{
log: common.NewLogger("cloudmap", "api"),
awsFacade: NewAwsFacadeFromConfig(cfg),
nsRateLimiter: rate.NewLimiter(rate.Every(1*time.Second), 5), // 1 per second
svcRateLimiter: rate.NewLimiter(rate.Every(2*time.Second), 10), // 2 per second
opRateLimiter: rate.NewLimiter(rate.Every(100*time.Second), 200), // 100 per second
log: common.NewLogger("cloudmap", "api"),
awsFacade: NewAwsFacadeFromConfig(cfg),
rateLimiter: common.NewDefaultRateLimiter(),
}
}

func (sdApi *serviceDiscoveryApi) GetNamespaceMap(ctx context.Context) (map[string]*model.Namespace, error) {
err := sdApi.nsRateLimiter.Wait(ctx)
err := sdApi.rateLimiter.Wait(ctx, common.ListNamespaces)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -105,7 +89,7 @@ func (sdApi *serviceDiscoveryApi) GetNamespaceMap(ctx context.Context) (map[stri
}

func (sdApi *serviceDiscoveryApi) GetServiceIdMap(ctx context.Context, nsId string) (map[string]string, error) {
err := sdApi.svcRateLimiter.Wait(ctx)
err := sdApi.rateLimiter.Wait(ctx, common.ListServices)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -151,30 +135,8 @@ func (sdApi *serviceDiscoveryApi) DiscoverInstances(ctx context.Context, nsName
return out.Instances, nil
}

func (sdApi *serviceDiscoveryApi) ListOperations(ctx context.Context, opFilters []types.OperationFilter) (map[string]types.OperationStatus, error) {
opStatusMap := make(map[string]types.OperationStatus)

pages := sd.NewListOperationsPaginator(sdApi.awsFacade, &sd.ListOperationsInput{
Filters: opFilters,
})

for pages.HasMorePages() {
output, err := pages.NextPage(ctx)

if err != nil {
return opStatusMap, err
}

for _, sdOp := range output.Operations {
opStatusMap[aws.ToString(sdOp.Id)] = sdOp.Status
}
}

return opStatusMap, nil
}

func (sdApi *serviceDiscoveryApi) GetOperation(ctx context.Context, opId string) (operation *types.Operation, err error) {
err = sdApi.opRateLimiter.Wait(ctx)
err = sdApi.rateLimiter.Wait(ctx, common.GetOperation)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -236,6 +198,11 @@ func (sdApi *serviceDiscoveryApi) getDnsConfig() types.DnsConfig {
}

func (sdApi *serviceDiscoveryApi) RegisterInstance(ctx context.Context, svcId string, instId string, instAttrs map[string]string) (opId string, err error) {
err = sdApi.rateLimiter.Wait(ctx, common.RegisterInstance)
if err != nil {
return "", err
}

regResp, err := sdApi.awsFacade.RegisterInstance(ctx, &sd.RegisterInstanceInput{
Attributes: instAttrs,
InstanceId: &instId,
Expand All @@ -250,6 +217,11 @@ func (sdApi *serviceDiscoveryApi) RegisterInstance(ctx context.Context, svcId st
}

func (sdApi *serviceDiscoveryApi) DeregisterInstance(ctx context.Context, svcId string, instId string) (opId string, err error) {
err = sdApi.rateLimiter.Wait(ctx, common.DeregisterInstance)
if err != nil {
return "", err
}

deregResp, err := sdApi.awsFacade.DeregisterInstance(ctx, &sd.DeregisterInstanceInput{
InstanceId: &instId,
ServiceId: &svcId,
Expand All @@ -261,31 +233,3 @@ func (sdApi *serviceDiscoveryApi) DeregisterInstance(ctx context.Context, svcId

return aws.ToString(deregResp.OperationId), err
}

func (sdApi *serviceDiscoveryApi) PollNamespaceOperation(ctx context.Context, opId string) (nsId string, err error) {
err = wait.Poll(defaultOperationPollInterval, defaultOperationPollTimeout, func() (done bool, err error) {
sdApi.log.Info("polling operation", "opId", opId)
op, err := sdApi.GetOperation(ctx, opId)

if err != nil {
return true, err
}

if op.Status == types.OperationStatusFail {
return true, fmt.Errorf("failed to create namespace: %s", aws.ToString(op.ErrorMessage))
}

if op.Status == types.OperationStatusSuccess {
nsId = op.Targets[string(types.OperationTargetTypeNamespace)]
return true, nil
}

return false, nil
})

if err == wait.ErrWaitTimeout {
err = errors.New(operationPollTimoutErrorMessage)
}

return nsId, err
}
50 changes: 3 additions & 47 deletions pkg/cloudmap/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"errors"
"fmt"
"testing"
"time"

"golang.org/x/time/rate"

aboutv1alpha1 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/apis/about/v1alpha1"

Expand Down Expand Up @@ -131,26 +128,6 @@ func TestServiceDiscoveryApi_DiscoverInstances_HappyCase(t *testing.T) {
assert.Equal(t, test.EndptId2, *insts[1].InstanceId)
}

func TestServiceDiscoveryApi_ListOperations_HappyCase(t *testing.T) {
mockController := gomock.NewController(t)
defer mockController.Finish()

awsFacade := cloudmapMock.NewMockAwsFacade(mockController)
sdApi := getServiceDiscoveryApi(t, awsFacade)

filters := make([]types.OperationFilter, 0)
awsFacade.EXPECT().ListOperations(context.TODO(), &sd.ListOperationsInput{Filters: filters}).
Return(&sd.ListOperationsOutput{
Operations: []types.OperationSummary{
{Id: aws.String(test.OpId1), Status: types.OperationStatusSuccess},
}}, nil)

ops, err := sdApi.ListOperations(context.TODO(), filters)
assert.Nil(t, err, "No error for happy case")
assert.True(t, len(ops) == 1)
assert.Equal(t, ops[test.OpId1], types.OperationStatusSuccess)
}

func TestServiceDiscoveryApi_GetOperation_HappyCase(t *testing.T) {
mockController := gomock.NewController(t)
defer mockController.Finish()
Expand Down Expand Up @@ -316,33 +293,12 @@ func TestServiceDiscoveryApi_DeregisterInstance_Error(t *testing.T) {
assert.Equal(t, sdkErr, err)
}

func TestServiceDiscoveryApi_PollNamespaceOperation_HappyCase(t *testing.T) {
mockController := gomock.NewController(t)
defer mockController.Finish()

awsFacade := cloudmapMock.NewMockAwsFacade(mockController)
awsFacade.EXPECT().GetOperation(context.TODO(), &sd.GetOperationInput{OperationId: aws.String(test.OpId1)}).
Return(&sd.GetOperationOutput{Operation: &types.Operation{Status: types.OperationStatusPending}}, nil)

awsFacade.EXPECT().GetOperation(context.TODO(), &sd.GetOperationInput{OperationId: aws.String(test.OpId1)}).
Return(&sd.GetOperationOutput{Operation: &types.Operation{Status: types.OperationStatusSuccess,
Targets: map[string]string{string(types.OperationTargetTypeNamespace): test.HttpNsId}}}, nil)

sdApi := getServiceDiscoveryApi(t, awsFacade)

nsId, err := sdApi.PollNamespaceOperation(context.TODO(), test.OpId1)
assert.Nil(t, err)
assert.Equal(t, test.HttpNsId, nsId)
}

func getServiceDiscoveryApi(t *testing.T, awsFacade *cloudmapMock.MockAwsFacade) ServiceDiscoveryApi {
scheme := runtime.NewScheme()
scheme.AddKnownTypes(aboutv1alpha1.GroupVersion, &aboutv1alpha1.ClusterProperty{})
return &serviceDiscoveryApi{
log: common.NewLoggerWithLogr(testr.New(t)),
awsFacade: awsFacade,
nsRateLimiter: rate.NewLimiter(rate.Every(1*time.Second), 2), // 1 per second
svcRateLimiter: rate.NewLimiter(rate.Every(2*time.Second), 4), // 2 per second
opRateLimiter: rate.NewLimiter(rate.Every(10*time.Second), 100), // 10 per second
log: common.NewLoggerWithLogr(testr.New(t)),
awsFacade: awsFacade,
rateLimiter: common.NewDefaultRateLimiter(),
}
}
38 changes: 14 additions & 24 deletions pkg/cloudmap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/common"
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/model"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/servicediscovery/types"
)

// ServiceDiscoveryClient provides the service endpoint management functionality required by the AWS Cloud Map
Expand Down Expand Up @@ -149,27 +150,21 @@ func (sdc *serviceDiscoveryClient) RegisterEndpoints(ctx context.Context, nsName
return err
}

opCollector := NewOperationCollector()

operationPoller := NewOperationPoller(sdc.sdApi)
for _, endpt := range endpts {
endptId := endpt.Id
endptAttrs := endpt.GetCloudMapAttributes()
opCollector.Add(func() (opId string, err error) {
operationPoller.Submit(ctx, func() (opId string, err error) {
return sdc.sdApi.RegisterInstance(ctx, svcId, endptId, endptAttrs)
})
}

err = NewRegisterInstancePoller(sdc.sdApi, svcId, opCollector.Collect(), opCollector.GetStartTime()).Poll(ctx)

// Evict cache entry so next list call reflects changes
sdc.cache.EvictEndpoints(nsName, svcName)

err = operationPoller.Await()
if err != nil {
return err
}

if !opCollector.IsAllOperationsCreated() {
return errors.New("failure while registering endpoints")
return common.Wrap(err, errors.New("failure while registering endpoints"))
}

return nil
Expand All @@ -188,29 +183,23 @@ func (sdc *serviceDiscoveryClient) DeleteEndpoints(ctx context.Context, nsName s
return err
}

opCollector := NewOperationCollector()

operationPoller := NewOperationPoller(sdc.sdApi)
for _, endpt := range endpts {
endptId := endpt.Id
// add operation to delete endpoint
opCollector.Add(func() (opId string, err error) {
operationPoller.Submit(ctx, func() (opId string, err error) {
return sdc.sdApi.DeregisterInstance(ctx, svcId, endptId)
})
}

err = NewDeregisterInstancePoller(sdc.sdApi, svcId, opCollector.Collect(), opCollector.GetStartTime()).Poll(ctx)

// Evict cache entry so next list call reflects changes
sdc.cache.EvictEndpoints(nsName, svcName)
if err != nil {
return err
}

if !opCollector.IsAllOperationsCreated() {
return errors.New("failure while de-registering endpoints")
err = operationPoller.Await()
if err != nil {
return common.Wrap(err, errors.New("failure while de-registering endpoints"))
}

return nil
return err
}

func (sdc *serviceDiscoveryClient) getEndpoints(ctx context.Context, nsName string, svcName string) (endpts []*model.Endpoint, err error) {
Expand Down Expand Up @@ -315,12 +304,13 @@ func (sdc *serviceDiscoveryClient) createNamespace(ctx context.Context, nsName s
return nil, err
}

nsId, err := sdc.sdApi.PollNamespaceOperation(ctx, opId)
op, err := NewOperationPoller(sdc.sdApi).Poll(ctx, opId)
if err != nil {
return nil, err
}
nsId := op.Targets[string(types.OperationTargetTypeNamespace)]

sdc.log.Info("namespace created", "nsId", nsId)
sdc.log.Info("namespace created", "nsId", nsId, "namespace", nsName)

// Default namespace type HTTP
namespace = &model.Namespace{
Expand Down
Loading