Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Update policy status on CRD creation/removal #1304

Merged
merged 3 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/service-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ func NewController(cli *client.VanClient, origin string, tlsConfig *certs.TlsCon

controller.serviceSync = service_sync.NewServiceSync(origin, ttl, version.Version, qdr.NewConnectionFactory("amqps://"+types.QualifiedServiceName(types.LocalTransportServiceName, cli.Namespace)+":5671", tlsConfig), handler, controller.eventHandler)

controller.flowController = flow.NewFlowController(origin, version.Version, siteCreationTime, qdr.NewConnectionFactory("amqps://"+types.QualifiedServiceName(types.LocalTransportServiceName, cli.Namespace)+":5671", tlsConfig))
controller.flowController = flow.NewFlowController(origin, version.Version, siteCreationTime,
qdr.NewConnectionFactory("amqps://"+types.QualifiedServiceName(types.LocalTransportServiceName, cli.Namespace)+":5671", tlsConfig),
client.NewClusterPolicyValidator(cli))
ipHandler := func(deleted bool, name string, process *flow.ProcessRecord) error {
return flow.UpdateProcess(controller.flowController, deleted, name, process)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/podman/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *ControllerPodman) Run(stopCh <-chan struct{}) error {
c.site = sitePodman

siteCreationTime := uint64(time.Now().UnixNano()) / uint64(time.Microsecond)
flowController := flow.NewFlowController(c.origin, version.Version, siteCreationTime, qdr.NewConnectionFactory("amqps://"+types.LocalTransportServiceName+":5671", c.tlsConfig))
flowController := flow.NewFlowController(c.origin, version.Version, siteCreationTime, qdr.NewConnectionFactory("amqps://"+types.LocalTransportServiceName+":5671", c.tlsConfig), flow.WithPolicyDisabled)
flowController.Start(stopCh)
log.Println("Started flow-controller")

Expand Down
218 changes: 154 additions & 64 deletions pkg/flow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"log"
"os"
"strings"
"sync"
"time"

"github.com/skupperproject/skupper/api/types"
"github.com/skupperproject/skupper/client"
"github.com/skupperproject/skupper/pkg/config"
"github.com/skupperproject/skupper/pkg/messaging"
)
Expand All @@ -17,36 +17,40 @@ const (
)

type FlowController struct {
origin string
creationTime uint64
version string
connectionFactory messaging.ConnectionFactory
beaconOutgoing chan interface{}
heartbeatOutgoing chan interface{}
recordOutgoing chan interface{}
flushIncoming chan []interface{}
processOutgoing chan *ProcessRecord
processRecords map[string]*ProcessRecord
hostRecords map[string]*HostRecord
hostOutgoing chan *HostRecord
startTime int64
origin string
connectionFactory messaging.ConnectionFactory
beaconOutgoing chan interface{}
heartbeatOutgoing chan interface{}
recordOutgoing chan interface{}
flushIncoming chan []interface{}
processOutgoing chan *ProcessRecord
processRecords map[string]*ProcessRecord
hostRecords map[string]*HostRecord
hostOutgoing chan *HostRecord
siteRecordController siteRecordController
startTime int64
}

func NewFlowController(origin string, version string, creationTime uint64, connectionFactory messaging.ConnectionFactory) *FlowController {
type PolicyEvaluator interface {
Enabled() bool
}

const WithPolicyDisabled = policyEnabledConst(false)

func NewFlowController(origin string, version string, creationTime uint64, connectionFactory messaging.ConnectionFactory, policyEvaluator PolicyEvaluator) *FlowController {
fc := &FlowController{
origin: origin,
creationTime: creationTime,
version: version,
connectionFactory: connectionFactory,
beaconOutgoing: make(chan interface{}, 10),
heartbeatOutgoing: make(chan interface{}, 10),
recordOutgoing: make(chan interface{}, 10),
flushIncoming: make(chan []interface{}, 10),
processOutgoing: make(chan *ProcessRecord, 10),
processRecords: make(map[string]*ProcessRecord),
hostRecords: make(map[string]*HostRecord),
hostOutgoing: make(chan *HostRecord, 10),
startTime: time.Now().Unix(),
origin: origin,
connectionFactory: connectionFactory,
beaconOutgoing: make(chan interface{}, 10),
heartbeatOutgoing: make(chan interface{}, 10),
recordOutgoing: make(chan interface{}, 10),
flushIncoming: make(chan []interface{}, 10),
processOutgoing: make(chan *ProcessRecord, 10),
processRecords: make(map[string]*ProcessRecord),
hostRecords: make(map[string]*HostRecord),
hostOutgoing: make(chan *HostRecord, 10),
siteRecordController: newSiteRecordController(creationTime, version, policyEvaluator),
startTime: time.Now().Unix(),
}
return fc
}
Expand Down Expand Up @@ -159,46 +163,18 @@ func (c *FlowController) updateHeartbeats(stopCh <-chan struct{}) {
}
}

func (c *FlowController) updateRecords(stopCh <-chan struct{}) {
func (c *FlowController) updateRecords(stopCh <-chan struct{}, siteRecordsIncoming <-chan *SiteRecord) {
tickerAge := time.NewTicker(10 * time.Minute)
defer tickerAge.Stop()

name := os.Getenv("SKUPPER_SITE_NAME")
nameSpace := os.Getenv("SKUPPER_NAMESPACE")
policy := Disabled
var platformStr string
platform := config.GetPlatform()
if platform == "" || platform == types.PlatformKubernetes {
platformStr = string(types.PlatformKubernetes)
cli, err := client.NewClient(nameSpace, "", "")
if err == nil {
cpv := client.NewClusterPolicyValidator(cli)
if cpv.Enabled() {
policy = Enabled
}
}
} else if platform == types.PlatformPodman {
platformStr = string(types.PlatformPodman)
}
site := &SiteRecord{
Base: Base{
RecType: recordNames[Site],
Identity: os.Getenv("SKUPPER_SITE_ID"),
StartTime: c.creationTime,
},
Name: &name,
NameSpace: &nameSpace,
Platform: &platformStr,
Version: &c.version,
Policy: &policy,
}

c.recordOutgoing <- site

for {
select {
case process := <-c.processOutgoing:
c.recordOutgoing <- process
case site, ok := <-siteRecordsIncoming:
if !ok {
continue
}
c.recordOutgoing <- site
case host := <-c.hostOutgoing:
c.recordOutgoing <- host
case flushUpdates := <-c.flushIncoming:
Expand All @@ -208,7 +184,7 @@ func (c *FlowController) updateRecords(stopCh <-chan struct{}) {
log.Println("Unable to convert interface to flush")
}
}
c.recordOutgoing <- site
c.recordOutgoing <- c.siteRecordController.Record()
for _, process := range c.processRecords {
c.recordOutgoing <- process
}
Expand Down Expand Up @@ -240,11 +216,125 @@ func (c *FlowController) run(stopCh <-chan struct{}) {

go c.updateBeacon(stopCh)
go c.updateHeartbeats(stopCh)
go c.updateRecords(stopCh)
go c.updateRecords(stopCh, c.siteRecordController.Start(stopCh))
<-stopCh

beaconSender.stop()
heartbeatSender.stop()
recordSender.stop()
flushReceiver.stop()
}

type siteRecordController struct {
mu sync.Mutex
Identity string
CreatedAt uint64
Version string
Name string
Namespace string
Platform string
PolicyEnabled bool

policyEvaluator PolicyEvaluator
pollInterval time.Duration
}

func newSiteRecordController(createdAt uint64, version string, policyEvaluator PolicyEvaluator) siteRecordController {
var policy bool
var platformStr string
platform := config.GetPlatform()
if platform == "" || platform == types.PlatformKubernetes {
platformStr = string(types.PlatformKubernetes)
policy = policyEvaluator.Enabled()
} else if platform == types.PlatformPodman {
platformStr = string(types.PlatformPodman)
}
return siteRecordController{
Identity: os.Getenv("SKUPPER_SITE_ID"),
CreatedAt: createdAt,
Version: version,
Name: os.Getenv("SKUPPER_SITE_NAME"),
Namespace: os.Getenv("SKUPPER_NAMESPACE"),
Platform: platformStr,
PolicyEnabled: policy,
policyEvaluator: policyEvaluator,
}
}

func (c *siteRecordController) Start(stopCh <-chan struct{}) <-chan *SiteRecord {
updates := make(chan *SiteRecord, 1)

go func() {
defer close(updates)
updates <- c.Record()

if c.Platform != string(types.PlatformKubernetes) {
return
}
// watch for changes to policy enabled
pollInterval := c.pollInterval
if pollInterval <= 0 {
pollInterval = time.Second * 30
}
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-stopCh:
return
case <-ticker.C:
if update := c.updatePolicy(); update != nil {
updates <- update
}
}
}
}()
return updates
}

func (c *siteRecordController) updatePolicy() *SiteRecord {
c.mu.Lock()
defer c.mu.Unlock()
enabled := c.policyEvaluator.Enabled()
if enabled == c.PolicyEnabled {
return nil
}
c.PolicyEnabled = enabled
policy := Disabled
if enabled {
policy = Enabled
}
return &SiteRecord{
Base: Base{
RecType: recordNames[Site],
Identity: c.Identity,
},
Policy: &policy,
}

}

func (c *siteRecordController) Record() *SiteRecord {
c.mu.Lock()
defer c.mu.Unlock()
policy := Disabled
if c.PolicyEnabled {
policy = Enabled
}
return &SiteRecord{
Base: Base{
RecType: recordNames[Site],
Identity: c.Identity,
StartTime: c.CreatedAt,
},
Name: &c.Name,
NameSpace: &c.Namespace,
Platform: &c.Platform,
Version: &c.Version,
Policy: &policy,
}
}

type policyEnabledConst bool

func (c policyEnabledConst) Enabled() bool { return bool(c) }
Loading