Skip to content

Commit

Permalink
Update policy status on CRD creation/removal
Browse files Browse the repository at this point in the history
Issue skupperproject#1291

Updates the flow controller to periodically poll for changes to the
skupper policy CRD, and to send a flow record to update the site.

Changes the collector to update network status when an update to a site
is detected.

Signed-off-by: c-kruse <ctkruse99@gmail.com>
  • Loading branch information
c-kruse committed Dec 4, 2023
1 parent e3bd1d4 commit 9e1dc84
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 50 deletions.
4 changes: 3 additions & 1 deletion cmd/service-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ func NewController(cli *client.VanClient, origin string, tlsConfig *certs.TlsCon

controller.serviceSync = service_sync.NewServiceSync(origin, ttl, version.Version, qdr.NewConnectionFactory("amqps://"+types.QualifiedServiceName(types.LocalTransportServiceName, cli.Namespace)+":5671", tlsConfig), handler, controller.eventHandler)

controller.flowController = flow.NewFlowController(origin, version.Version, siteCreationTime, qdr.NewConnectionFactory("amqps://"+types.QualifiedServiceName(types.LocalTransportServiceName, cli.Namespace)+":5671", tlsConfig))
controller.flowController = flow.NewFlowController(origin, version.Version, siteCreationTime,
qdr.NewConnectionFactory("amqps://"+types.QualifiedServiceName(types.LocalTransportServiceName, cli.Namespace)+":5671", tlsConfig),
client.NewClusterPolicyValidator(cli))
ipHandler := func(deleted bool, name string, process *flow.ProcessRecord) error {
return flow.UpdateProcess(controller.flowController, deleted, name, process)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/podman/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *ControllerPodman) Run(stopCh <-chan struct{}) error {
c.site = sitePodman

siteCreationTime := uint64(time.Now().UnixNano()) / uint64(time.Microsecond)
flowController := flow.NewFlowController(c.origin, version.Version, siteCreationTime, qdr.NewConnectionFactory("amqps://"+types.LocalTransportServiceName+":5671", c.tlsConfig))
flowController := flow.NewFlowController(c.origin, version.Version, siteCreationTime, qdr.NewConnectionFactory("amqps://"+types.LocalTransportServiceName+":5671", c.tlsConfig), flow.WithPolicyDisabled)
flowController.Start(stopCh)
log.Println("Started flow-controller")

Expand Down
175 changes: 133 additions & 42 deletions pkg/flow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"log"
"os"
"strings"
"sync"
"time"

"github.com/skupperproject/skupper/api/types"
"github.com/skupperproject/skupper/client"
"github.com/skupperproject/skupper/pkg/config"
"github.com/skupperproject/skupper/pkg/messaging"
)
Expand All @@ -18,8 +18,6 @@ const (

type FlowController struct {
origin string
creationTime uint64
version string
connectionFactory messaging.ConnectionFactory
beaconOutgoing chan interface{}
heartbeatOutgoing chan interface{}
Expand All @@ -29,14 +27,19 @@ type FlowController struct {
processRecords map[string]*ProcessRecord
hostRecords map[string]*HostRecord
hostOutgoing chan *HostRecord
siteController siteController
startTime int64
}

func NewFlowController(origin string, version string, creationTime uint64, connectionFactory messaging.ConnectionFactory) *FlowController {
type PolicyEvaluator interface {
Enabled() bool
}

const WithPolicyDisabled = policyEnabledConst(false)

func NewFlowController(origin string, version string, creationTime uint64, connectionFactory messaging.ConnectionFactory, policyEvaluator PolicyEvaluator) *FlowController {
fc := &FlowController{
origin: origin,
creationTime: creationTime,
version: version,
connectionFactory: connectionFactory,
beaconOutgoing: make(chan interface{}, 10),
heartbeatOutgoing: make(chan interface{}, 10),
Expand All @@ -46,6 +49,7 @@ func NewFlowController(origin string, version string, creationTime uint64, conne
processRecords: make(map[string]*ProcessRecord),
hostRecords: make(map[string]*HostRecord),
hostOutgoing: make(chan *HostRecord, 10),
siteController: newSiteController(creationTime, version, policyEvaluator),
startTime: time.Now().Unix(),
}
return fc
Expand Down Expand Up @@ -159,46 +163,18 @@ func (c *FlowController) updateHeartbeats(stopCh <-chan struct{}) {
}
}

func (c *FlowController) updateRecords(stopCh <-chan struct{}) {
func (c *FlowController) updateRecords(stopCh <-chan struct{}, siteRecordsIncoming <-chan *SiteRecord) {
tickerAge := time.NewTicker(10 * time.Minute)
defer tickerAge.Stop()

name := os.Getenv("SKUPPER_SITE_NAME")
nameSpace := os.Getenv("SKUPPER_NAMESPACE")
policy := Disabled
var platformStr string
platform := config.GetPlatform()
if platform == "" || platform == types.PlatformKubernetes {
platformStr = string(types.PlatformKubernetes)
cli, err := client.NewClient(nameSpace, "", "")
if err == nil {
cpv := client.NewClusterPolicyValidator(cli)
if cpv.Enabled() {
policy = Enabled
}
}
} else if platform == types.PlatformPodman {
platformStr = string(types.PlatformPodman)
}
site := &SiteRecord{
Base: Base{
RecType: recordNames[Site],
Identity: os.Getenv("SKUPPER_SITE_ID"),
StartTime: c.creationTime,
},
Name: &name,
NameSpace: &nameSpace,
Platform: &platformStr,
Version: &c.version,
Policy: &policy,
}

c.recordOutgoing <- site

for {
select {
case process := <-c.processOutgoing:
c.recordOutgoing <- process
case site, ok := <-siteRecordsIncoming:
if !ok {
continue
}
c.recordOutgoing <- site
case host := <-c.hostOutgoing:
c.recordOutgoing <- host
case flushUpdates := <-c.flushIncoming:
Expand All @@ -208,7 +184,7 @@ func (c *FlowController) updateRecords(stopCh <-chan struct{}) {
log.Println("Unable to convert interface to flush")
}
}
c.recordOutgoing <- site
c.recordOutgoing <- c.siteController.Record()
for _, process := range c.processRecords {
c.recordOutgoing <- process
}
Expand Down Expand Up @@ -240,11 +216,126 @@ func (c *FlowController) run(stopCh <-chan struct{}) {

go c.updateBeacon(stopCh)
go c.updateHeartbeats(stopCh)
go c.updateRecords(stopCh)
go c.updateRecords(stopCh, c.siteController.Start(stopCh))
<-stopCh

beaconSender.stop()
heartbeatSender.stop()
recordSender.stop()
flushReceiver.stop()
}

type siteController struct {
mu sync.Mutex
Identity string
CreatedAt uint64
Version string
Name string
Namespace string
Platform string
PolicyEnabled bool

policyEvaluator PolicyEvaluator
pollInterval time.Duration
}

func newSiteController(createdAt uint64, version string, policyEvaluator PolicyEvaluator) siteController {
var policy bool
var platformStr string
platform := config.GetPlatform()
if platform == "" || platform == types.PlatformKubernetes {
platformStr = string(types.PlatformKubernetes)
policy = policyEvaluator.Enabled()
} else if platform == types.PlatformPodman {
platformStr = string(types.PlatformPodman)
}
return siteController{
Identity: os.Getenv("SKUPPER_SITE_ID"),
CreatedAt: createdAt,
Version: version,
Name: os.Getenv("SKUPPER_SITE_NAME"),
Namespace: os.Getenv("SKUPPER_NAMESPACE"),
Platform: platformStr,
PolicyEnabled: policy,

policyEvaluator: policyEvaluator,
}
}

func (c *siteController) Start(stopCh <-chan struct{}) <-chan *SiteRecord {
updates := make(chan *SiteRecord, 1)

go func() {
defer close(updates)
updates <- c.Record()

if c.Platform != string(types.PlatformKubernetes) {
return
}
// watch for changes to policy enabled
pollInterval := c.pollInterval
if pollInterval <= 0 {
pollInterval = time.Second * 30
}
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-stopCh:
return
case <-ticker.C:
if update := c.updatePolicy(); update != nil {
updates <- update
}
}
}
}()
return updates
}

func (c *siteController) updatePolicy() *SiteRecord {
c.mu.Lock()
defer c.mu.Unlock()
enabled := c.policyEvaluator.Enabled()
if enabled == c.PolicyEnabled {
return nil
}
c.PolicyEnabled = enabled
policy := Disabled
if enabled {
policy = Enabled
}
return &SiteRecord{
Base: Base{
RecType: recordNames[Site],
Identity: c.Identity,
},
Policy: &policy,
}

}

func (c *siteController) Record() *SiteRecord {
c.mu.Lock()
defer c.mu.Unlock()
policy := Disabled
if c.PolicyEnabled {
policy = Enabled
}
return &SiteRecord{
Base: Base{
RecType: recordNames[Site],
Identity: c.Identity,
StartTime: c.CreatedAt,
},
Name: &c.Name,
NameSpace: &c.Namespace,
Platform: &c.Platform,
Version: &c.Version,
Policy: &policy,
}
}

type policyEnabledConst bool

func (c policyEnabledConst) Enabled() bool { return bool(c) }
Loading

0 comments on commit 9e1dc84

Please # to comment.