From 9e1dc8422f2ca0d17b629913321554601cb8e64f Mon Sep 17 00:00:00 2001 From: c-kruse Date: Fri, 1 Dec 2023 16:19:52 -0800 Subject: [PATCH 1/2] Update policy status on CRD creation/removal Issue #1291 Updates the flow controller to periodically poll for changes to the skupper policy CRD, and to send a flow record to update the site. Changes the collector to update network status when an update to a site is detected. Signed-off-by: c-kruse --- cmd/service-controller/controller.go | 4 +- pkg/domain/podman/controller/controller.go | 2 +- pkg/flow/controller.go | 175 ++++++++++++++++----- pkg/flow/controller_test.go | 155 +++++++++++++++++- pkg/flow/flow_mem_driver.go | 10 +- 5 files changed, 296 insertions(+), 50 deletions(-) diff --git a/cmd/service-controller/controller.go b/cmd/service-controller/controller.go index c5bd9215d..9b5d90b24 100644 --- a/cmd/service-controller/controller.go +++ b/cmd/service-controller/controller.go @@ -217,7 +217,9 @@ func NewController(cli *client.VanClient, origin string, tlsConfig *certs.TlsCon controller.serviceSync = service_sync.NewServiceSync(origin, ttl, version.Version, qdr.NewConnectionFactory("amqps://"+types.QualifiedServiceName(types.LocalTransportServiceName, cli.Namespace)+":5671", tlsConfig), handler, controller.eventHandler) - controller.flowController = flow.NewFlowController(origin, version.Version, siteCreationTime, qdr.NewConnectionFactory("amqps://"+types.QualifiedServiceName(types.LocalTransportServiceName, cli.Namespace)+":5671", tlsConfig)) + controller.flowController = flow.NewFlowController(origin, version.Version, siteCreationTime, + qdr.NewConnectionFactory("amqps://"+types.QualifiedServiceName(types.LocalTransportServiceName, cli.Namespace)+":5671", tlsConfig), + client.NewClusterPolicyValidator(cli)) ipHandler := func(deleted bool, name string, process *flow.ProcessRecord) error { return flow.UpdateProcess(controller.flowController, deleted, name, process) } diff --git a/pkg/domain/podman/controller/controller.go b/pkg/domain/podman/controller/controller.go index 00c61dcb8..1c73cb892 100644 --- a/pkg/domain/podman/controller/controller.go +++ b/pkg/domain/podman/controller/controller.go @@ -76,7 +76,7 @@ func (c *ControllerPodman) Run(stopCh <-chan struct{}) error { c.site = sitePodman siteCreationTime := uint64(time.Now().UnixNano()) / uint64(time.Microsecond) - flowController := flow.NewFlowController(c.origin, version.Version, siteCreationTime, qdr.NewConnectionFactory("amqps://"+types.LocalTransportServiceName+":5671", c.tlsConfig)) + flowController := flow.NewFlowController(c.origin, version.Version, siteCreationTime, qdr.NewConnectionFactory("amqps://"+types.LocalTransportServiceName+":5671", c.tlsConfig), flow.WithPolicyDisabled) flowController.Start(stopCh) log.Println("Started flow-controller") diff --git a/pkg/flow/controller.go b/pkg/flow/controller.go index f7dde8f3f..663b66a2f 100644 --- a/pkg/flow/controller.go +++ b/pkg/flow/controller.go @@ -4,10 +4,10 @@ import ( "log" "os" "strings" + "sync" "time" "github.com/skupperproject/skupper/api/types" - "github.com/skupperproject/skupper/client" "github.com/skupperproject/skupper/pkg/config" "github.com/skupperproject/skupper/pkg/messaging" ) @@ -18,8 +18,6 @@ const ( type FlowController struct { origin string - creationTime uint64 - version string connectionFactory messaging.ConnectionFactory beaconOutgoing chan interface{} heartbeatOutgoing chan interface{} @@ -29,14 +27,19 @@ type FlowController struct { processRecords map[string]*ProcessRecord hostRecords map[string]*HostRecord hostOutgoing chan *HostRecord + siteController siteController startTime int64 } -func NewFlowController(origin string, version string, creationTime uint64, connectionFactory messaging.ConnectionFactory) *FlowController { +type PolicyEvaluator interface { + Enabled() bool +} + +const WithPolicyDisabled = policyEnabledConst(false) + +func NewFlowController(origin string, version string, creationTime uint64, connectionFactory messaging.ConnectionFactory, policyEvaluator PolicyEvaluator) *FlowController { fc := &FlowController{ origin: origin, - creationTime: creationTime, - version: version, connectionFactory: connectionFactory, beaconOutgoing: make(chan interface{}, 10), heartbeatOutgoing: make(chan interface{}, 10), @@ -46,6 +49,7 @@ func NewFlowController(origin string, version string, creationTime uint64, conne processRecords: make(map[string]*ProcessRecord), hostRecords: make(map[string]*HostRecord), hostOutgoing: make(chan *HostRecord, 10), + siteController: newSiteController(creationTime, version, policyEvaluator), startTime: time.Now().Unix(), } return fc @@ -159,46 +163,18 @@ func (c *FlowController) updateHeartbeats(stopCh <-chan struct{}) { } } -func (c *FlowController) updateRecords(stopCh <-chan struct{}) { +func (c *FlowController) updateRecords(stopCh <-chan struct{}, siteRecordsIncoming <-chan *SiteRecord) { tickerAge := time.NewTicker(10 * time.Minute) defer tickerAge.Stop() - - name := os.Getenv("SKUPPER_SITE_NAME") - nameSpace := os.Getenv("SKUPPER_NAMESPACE") - policy := Disabled - var platformStr string - platform := config.GetPlatform() - if platform == "" || platform == types.PlatformKubernetes { - platformStr = string(types.PlatformKubernetes) - cli, err := client.NewClient(nameSpace, "", "") - if err == nil { - cpv := client.NewClusterPolicyValidator(cli) - if cpv.Enabled() { - policy = Enabled - } - } - } else if platform == types.PlatformPodman { - platformStr = string(types.PlatformPodman) - } - site := &SiteRecord{ - Base: Base{ - RecType: recordNames[Site], - Identity: os.Getenv("SKUPPER_SITE_ID"), - StartTime: c.creationTime, - }, - Name: &name, - NameSpace: &nameSpace, - Platform: &platformStr, - Version: &c.version, - Policy: &policy, - } - - c.recordOutgoing <- site - for { select { case process := <-c.processOutgoing: c.recordOutgoing <- process + case site, ok := <-siteRecordsIncoming: + if !ok { + continue + } + c.recordOutgoing <- site case host := <-c.hostOutgoing: c.recordOutgoing <- host case flushUpdates := <-c.flushIncoming: @@ -208,7 +184,7 @@ func (c *FlowController) updateRecords(stopCh <-chan struct{}) { log.Println("Unable to convert interface to flush") } } - c.recordOutgoing <- site + c.recordOutgoing <- c.siteController.Record() for _, process := range c.processRecords { c.recordOutgoing <- process } @@ -240,7 +216,7 @@ func (c *FlowController) run(stopCh <-chan struct{}) { go c.updateBeacon(stopCh) go c.updateHeartbeats(stopCh) - go c.updateRecords(stopCh) + go c.updateRecords(stopCh, c.siteController.Start(stopCh)) <-stopCh beaconSender.stop() @@ -248,3 +224,118 @@ func (c *FlowController) run(stopCh <-chan struct{}) { recordSender.stop() flushReceiver.stop() } + +type siteController struct { + mu sync.Mutex + Identity string + CreatedAt uint64 + Version string + Name string + Namespace string + Platform string + PolicyEnabled bool + + policyEvaluator PolicyEvaluator + pollInterval time.Duration +} + +func newSiteController(createdAt uint64, version string, policyEvaluator PolicyEvaluator) siteController { + var policy bool + var platformStr string + platform := config.GetPlatform() + if platform == "" || platform == types.PlatformKubernetes { + platformStr = string(types.PlatformKubernetes) + policy = policyEvaluator.Enabled() + } else if platform == types.PlatformPodman { + platformStr = string(types.PlatformPodman) + } + return siteController{ + Identity: os.Getenv("SKUPPER_SITE_ID"), + CreatedAt: createdAt, + Version: version, + Name: os.Getenv("SKUPPER_SITE_NAME"), + Namespace: os.Getenv("SKUPPER_NAMESPACE"), + Platform: platformStr, + PolicyEnabled: policy, + + policyEvaluator: policyEvaluator, + } +} + +func (c *siteController) Start(stopCh <-chan struct{}) <-chan *SiteRecord { + updates := make(chan *SiteRecord, 1) + + go func() { + defer close(updates) + updates <- c.Record() + + if c.Platform != string(types.PlatformKubernetes) { + return + } + // watch for changes to policy enabled + pollInterval := c.pollInterval + if pollInterval <= 0 { + pollInterval = time.Second * 30 + } + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + for { + select { + case <-stopCh: + return + case <-ticker.C: + if update := c.updatePolicy(); update != nil { + updates <- update + } + } + } + }() + return updates +} + +func (c *siteController) updatePolicy() *SiteRecord { + c.mu.Lock() + defer c.mu.Unlock() + enabled := c.policyEvaluator.Enabled() + if enabled == c.PolicyEnabled { + return nil + } + c.PolicyEnabled = enabled + policy := Disabled + if enabled { + policy = Enabled + } + return &SiteRecord{ + Base: Base{ + RecType: recordNames[Site], + Identity: c.Identity, + }, + Policy: &policy, + } + +} + +func (c *siteController) Record() *SiteRecord { + c.mu.Lock() + defer c.mu.Unlock() + policy := Disabled + if c.PolicyEnabled { + policy = Enabled + } + return &SiteRecord{ + Base: Base{ + RecType: recordNames[Site], + Identity: c.Identity, + StartTime: c.CreatedAt, + }, + Name: &c.Name, + NameSpace: &c.Namespace, + Platform: &c.Platform, + Version: &c.Version, + Policy: &policy, + } +} + +type policyEnabledConst bool + +func (c policyEnabledConst) Enabled() bool { return bool(c) } diff --git a/pkg/flow/controller_test.go b/pkg/flow/controller_test.go index 19dd9c887..c5308d04b 100644 --- a/pkg/flow/controller_test.go +++ b/pkg/flow/controller_test.go @@ -5,11 +5,12 @@ import ( "testing" "time" + "github.com/skupperproject/skupper/api/types" "gotest.tools/assert" ) func TestUpdateProcess(t *testing.T) { - fc := NewFlowController("mysite", "X.Y.Z", uint64(time.Now().UnixNano())/uint64(time.Microsecond), nil) + fc := NewFlowController("mysite", "X.Y.Z", uint64(time.Now().UnixNano())/uint64(time.Microsecond), nil, WithPolicyDisabled) assert.Assert(t, fc != nil) procName := "tcp-go-echo" sourceIP := "10.0.0.1" @@ -37,7 +38,7 @@ func TestUpdateProcess(t *testing.T) { } func TestUpdateHost(t *testing.T) { - fc := NewFlowController("mysite", "X.Y.Z", uint64(time.Now().UnixNano())/uint64(time.Microsecond), nil) + fc := NewFlowController("mysite", "X.Y.Z", uint64(time.Now().UnixNano())/uint64(time.Microsecond), nil, WithPolicyDisabled) assert.Assert(t, fc != nil) hostName := "bastion-server" host := &HostRecord{ @@ -61,7 +62,7 @@ func TestUpdateHost(t *testing.T) { func TestUpdateBeaconAndHeartbeats(t *testing.T) { _ = os.Setenv("SKUPPER_SITE_ID", "mySite") - fc := NewFlowController("mySite", "X.Y.Z", uint64(time.Now().UnixNano())/uint64(time.Microsecond), nil) + fc := NewFlowController("mySite", "X.Y.Z", uint64(time.Now().UnixNano())/uint64(time.Microsecond), nil, WithPolicyDisabled) assert.Assert(t, fc != nil) stopCh := make(chan struct{}) go fc.updateBeacon(stopCh) @@ -90,10 +91,11 @@ func TestUpdateBeaconAndHeartbeats(t *testing.T) { func TestUpdateRecords(t *testing.T) { _ = os.Setenv("SKUPPER_SITE_ID", "mySite") _ = os.Setenv("SKUPPER_NAMESPACE", "myNamespace") - fc := NewFlowController("mySite", "X.Y.Z", uint64(time.Now().UnixNano())/uint64(time.Microsecond), nil) + fc := NewFlowController("mySite", "X.Y.Z", uint64(time.Now().UnixNano())/uint64(time.Microsecond), nil, WithPolicyDisabled) assert.Assert(t, fc != nil) stopCh := make(chan struct{}) - go fc.updateRecords(stopCh) + + go fc.updateRecords(stopCh, fc.siteController.Start(stopCh)) recordUpdate := <-fc.recordOutgoing assert.Assert(t, recordUpdate != nil) @@ -148,3 +150,146 @@ func TestUpdateRecords(t *testing.T) { close(stopCh) } + +func TestSiteController(t *testing.T) { + alwaysOnEval := policyEnabledConst(true) + flipFlopEval := stubPolicyEvaluator{ + Next: make(chan bool, 8), + } + testCases := []struct { + controller *siteController + expectedOut []func(t *testing.T, record *SiteRecord, ok bool) + waitTimeout time.Duration + expectWaitTimeout bool + }{ + { + controller: &siteController{ + Identity: "basic-podman", + Name: "basic", + Platform: string(types.PlatformPodman), + CreatedAt: 1_111_111, + policyEvaluator: WithPolicyDisabled, + }, + expectedOut: []func(*testing.T, *SiteRecord, bool){ + func(t *testing.T, rec *SiteRecord, ok bool) { + assert.Equal(t, rec.Identity, "basic-podman") + assert.Equal(t, *rec.Name, "basic") + assert.Equal(t, *rec.Platform, string(types.PlatformPodman)) + assert.Equal(t, rec.StartTime, uint64(1_111_111)) + assert.Equal(t, *rec.Policy, Disabled) + }, + func(t *testing.T, rec *SiteRecord, ok bool) { + assert.Equal(t, ok, false) // expect closed + }, + }, + waitTimeout: time.Millisecond * 500, + }, + { + controller: &siteController{ + Identity: "basic-kube", + Name: "basic", + Platform: string(types.PlatformKubernetes), + CreatedAt: 1_111_111, + PolicyEnabled: true, + policyEvaluator: alwaysOnEval, + pollInterval: time.Millisecond * 5, + }, + expectedOut: []func(*testing.T, *SiteRecord, bool){ + func(t *testing.T, rec *SiteRecord, ok bool) { + assert.Equal(t, rec.Identity, "basic-kube") + assert.Equal(t, *rec.Name, "basic") + assert.Equal(t, *rec.Platform, string(types.PlatformKubernetes)) + assert.Equal(t, rec.StartTime, uint64(1_111_111)) + assert.Equal(t, *rec.Policy, Enabled) + }, + }, + waitTimeout: time.Millisecond * 250, + expectWaitTimeout: true, + }, + { + controller: &siteController{ + Identity: "policy-updates-kube", + Name: "basic", + Platform: string(types.PlatformKubernetes), + CreatedAt: 1_111_111, + policyEvaluator: flipFlopEval, + pollInterval: time.Millisecond * 5, + }, + expectedOut: []func(*testing.T, *SiteRecord, bool){ + func(t *testing.T, rec *SiteRecord, ok bool) { + assert.Equal(t, rec.Identity, "policy-updates-kube") + assert.Equal(t, *rec.Name, "basic") + assert.Equal(t, *rec.Platform, string(types.PlatformKubernetes)) + assert.Equal(t, rec.StartTime, uint64(1_111_111)) + assert.Equal(t, *rec.Policy, Disabled) + // queue two updates to policy enabled + flipFlopEval.Next <- false + flipFlopEval.Next <- true + flipFlopEval.Next <- false + }, + func(t *testing.T, rec *SiteRecord, ok bool) { // closed + assert.DeepEqual(t, *rec, SiteRecord{ + Base: Base{ + RecType: recordNames[Site], + Identity: "policy-updates-kube", + }, + Policy: &Enabled, + }) + }, + func(t *testing.T, rec *SiteRecord, ok bool) { // closed + assert.DeepEqual(t, rec, &SiteRecord{ + Base: Base{ + RecType: recordNames[Site], + Identity: "policy-updates-kube", + }, + Policy: &Disabled, + }) + }, + }, + waitTimeout: time.Millisecond * 500, + }, + } + + for _, testCase := range testCases { + tc := testCase + t.Run(tc.controller.Identity, func(t *testing.T) { + t.Parallel() + end := make(chan struct{}) + defer close(end) + + out := tc.controller.Start(end) + for i, expect := range tc.expectedOut { + select { + case actual, ok := <-out: + expect(t, actual, ok) + case <-time.After(tc.waitTimeout): + t.Fatalf("test case timed out waiting for output %d", i) + } + } + if tc.expectWaitTimeout { + select { + case actual := <-out: + t.Errorf("unexpected update at end of test: %v", actual) + case <-time.After(tc.waitTimeout): + // okay + } + } + }) + } +} + +type stubPolicyEvaluator struct { + IsEnabled bool + Next chan bool +} + +func (s stubPolicyEvaluator) Enabled() bool { + select { + case next, ok := <-s.Next: + if ok { + s.IsEnabled = next + } + default: + } + return s.IsEnabled +} diff --git a/pkg/flow/flow_mem_driver.go b/pkg/flow/flow_mem_driver.go index 76fed1671..d7f45d99f 100644 --- a/pkg/flow/flow_mem_driver.go +++ b/pkg/flow/flow_mem_driver.go @@ -690,6 +690,7 @@ func (fc *FlowCollector) updateLastHeard(source string) error { } func (fc *FlowCollector) updateRecord(record interface{}) error { + var updatesNetworkStatus bool switch record.(type) { case HeartbeatRecord: if heartbeat, ok := record.(HeartbeatRecord); ok { @@ -721,7 +722,10 @@ func (fc *FlowCollector) updateRecord(record interface{}) error { } fc.deleteRecord(current) } else { - *current = site + updatesNetworkStatus = true + if site.Policy != nil { + current.Policy = site.Policy + } } } fc.updateLastHeard(site.Source) @@ -1182,6 +1186,10 @@ func (fc *FlowCollector) updateRecord(record interface{}) error { default: return fmt.Errorf("Unrecognized record type %T", record) } + + if updatesNetworkStatus && fc.mode == RecordStatus { + fc.updateNetworkStatus() + } return nil } From 7f42a5c5c677fd840efa92adb37ebf637b0eaae2 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Tue, 5 Dec 2023 08:01:53 -0800 Subject: [PATCH 2/2] apply suggestions from review Signed-off-by: Christian Kruse --- pkg/flow/controller.go | 79 ++++++++++++++++++------------------- pkg/flow/controller_test.go | 10 ++--- 2 files changed, 44 insertions(+), 45 deletions(-) diff --git a/pkg/flow/controller.go b/pkg/flow/controller.go index 663b66a2f..1d70ffe35 100644 --- a/pkg/flow/controller.go +++ b/pkg/flow/controller.go @@ -17,18 +17,18 @@ const ( ) type FlowController struct { - origin string - connectionFactory messaging.ConnectionFactory - beaconOutgoing chan interface{} - heartbeatOutgoing chan interface{} - recordOutgoing chan interface{} - flushIncoming chan []interface{} - processOutgoing chan *ProcessRecord - processRecords map[string]*ProcessRecord - hostRecords map[string]*HostRecord - hostOutgoing chan *HostRecord - siteController siteController - startTime int64 + origin string + connectionFactory messaging.ConnectionFactory + beaconOutgoing chan interface{} + heartbeatOutgoing chan interface{} + recordOutgoing chan interface{} + flushIncoming chan []interface{} + processOutgoing chan *ProcessRecord + processRecords map[string]*ProcessRecord + hostRecords map[string]*HostRecord + hostOutgoing chan *HostRecord + siteRecordController siteRecordController + startTime int64 } type PolicyEvaluator interface { @@ -39,18 +39,18 @@ const WithPolicyDisabled = policyEnabledConst(false) func NewFlowController(origin string, version string, creationTime uint64, connectionFactory messaging.ConnectionFactory, policyEvaluator PolicyEvaluator) *FlowController { fc := &FlowController{ - origin: origin, - connectionFactory: connectionFactory, - beaconOutgoing: make(chan interface{}, 10), - heartbeatOutgoing: make(chan interface{}, 10), - recordOutgoing: make(chan interface{}, 10), - flushIncoming: make(chan []interface{}, 10), - processOutgoing: make(chan *ProcessRecord, 10), - processRecords: make(map[string]*ProcessRecord), - hostRecords: make(map[string]*HostRecord), - hostOutgoing: make(chan *HostRecord, 10), - siteController: newSiteController(creationTime, version, policyEvaluator), - startTime: time.Now().Unix(), + origin: origin, + connectionFactory: connectionFactory, + beaconOutgoing: make(chan interface{}, 10), + heartbeatOutgoing: make(chan interface{}, 10), + recordOutgoing: make(chan interface{}, 10), + flushIncoming: make(chan []interface{}, 10), + processOutgoing: make(chan *ProcessRecord, 10), + processRecords: make(map[string]*ProcessRecord), + hostRecords: make(map[string]*HostRecord), + hostOutgoing: make(chan *HostRecord, 10), + siteRecordController: newSiteRecordController(creationTime, version, policyEvaluator), + startTime: time.Now().Unix(), } return fc } @@ -184,7 +184,7 @@ func (c *FlowController) updateRecords(stopCh <-chan struct{}, siteRecordsIncomi log.Println("Unable to convert interface to flush") } } - c.recordOutgoing <- c.siteController.Record() + c.recordOutgoing <- c.siteRecordController.Record() for _, process := range c.processRecords { c.recordOutgoing <- process } @@ -216,7 +216,7 @@ func (c *FlowController) run(stopCh <-chan struct{}) { go c.updateBeacon(stopCh) go c.updateHeartbeats(stopCh) - go c.updateRecords(stopCh, c.siteController.Start(stopCh)) + go c.updateRecords(stopCh, c.siteRecordController.Start(stopCh)) <-stopCh beaconSender.stop() @@ -225,7 +225,7 @@ func (c *FlowController) run(stopCh <-chan struct{}) { flushReceiver.stop() } -type siteController struct { +type siteRecordController struct { mu sync.Mutex Identity string CreatedAt uint64 @@ -239,7 +239,7 @@ type siteController struct { pollInterval time.Duration } -func newSiteController(createdAt uint64, version string, policyEvaluator PolicyEvaluator) siteController { +func newSiteRecordController(createdAt uint64, version string, policyEvaluator PolicyEvaluator) siteRecordController { var policy bool var platformStr string platform := config.GetPlatform() @@ -249,20 +249,19 @@ func newSiteController(createdAt uint64, version string, policyEvaluator PolicyE } else if platform == types.PlatformPodman { platformStr = string(types.PlatformPodman) } - return siteController{ - Identity: os.Getenv("SKUPPER_SITE_ID"), - CreatedAt: createdAt, - Version: version, - Name: os.Getenv("SKUPPER_SITE_NAME"), - Namespace: os.Getenv("SKUPPER_NAMESPACE"), - Platform: platformStr, - PolicyEnabled: policy, - + return siteRecordController{ + Identity: os.Getenv("SKUPPER_SITE_ID"), + CreatedAt: createdAt, + Version: version, + Name: os.Getenv("SKUPPER_SITE_NAME"), + Namespace: os.Getenv("SKUPPER_NAMESPACE"), + Platform: platformStr, + PolicyEnabled: policy, policyEvaluator: policyEvaluator, } } -func (c *siteController) Start(stopCh <-chan struct{}) <-chan *SiteRecord { +func (c *siteRecordController) Start(stopCh <-chan struct{}) <-chan *SiteRecord { updates := make(chan *SiteRecord, 1) go func() { @@ -293,7 +292,7 @@ func (c *siteController) Start(stopCh <-chan struct{}) <-chan *SiteRecord { return updates } -func (c *siteController) updatePolicy() *SiteRecord { +func (c *siteRecordController) updatePolicy() *SiteRecord { c.mu.Lock() defer c.mu.Unlock() enabled := c.policyEvaluator.Enabled() @@ -315,7 +314,7 @@ func (c *siteController) updatePolicy() *SiteRecord { } -func (c *siteController) Record() *SiteRecord { +func (c *siteRecordController) Record() *SiteRecord { c.mu.Lock() defer c.mu.Unlock() policy := Disabled diff --git a/pkg/flow/controller_test.go b/pkg/flow/controller_test.go index c5308d04b..9b3856c8c 100644 --- a/pkg/flow/controller_test.go +++ b/pkg/flow/controller_test.go @@ -95,7 +95,7 @@ func TestUpdateRecords(t *testing.T) { assert.Assert(t, fc != nil) stopCh := make(chan struct{}) - go fc.updateRecords(stopCh, fc.siteController.Start(stopCh)) + go fc.updateRecords(stopCh, fc.siteRecordController.Start(stopCh)) recordUpdate := <-fc.recordOutgoing assert.Assert(t, recordUpdate != nil) @@ -157,13 +157,13 @@ func TestSiteController(t *testing.T) { Next: make(chan bool, 8), } testCases := []struct { - controller *siteController + controller *siteRecordController expectedOut []func(t *testing.T, record *SiteRecord, ok bool) waitTimeout time.Duration expectWaitTimeout bool }{ { - controller: &siteController{ + controller: &siteRecordController{ Identity: "basic-podman", Name: "basic", Platform: string(types.PlatformPodman), @@ -185,7 +185,7 @@ func TestSiteController(t *testing.T) { waitTimeout: time.Millisecond * 500, }, { - controller: &siteController{ + controller: &siteRecordController{ Identity: "basic-kube", Name: "basic", Platform: string(types.PlatformKubernetes), @@ -207,7 +207,7 @@ func TestSiteController(t *testing.T) { expectWaitTimeout: true, }, { - controller: &siteController{ + controller: &siteRecordController{ Identity: "policy-updates-kube", Name: "basic", Platform: string(types.PlatformKubernetes),