
Commit d16939b

Support for restart controller on ConfigMap change
This change adds support for restarting the main controller
when a config change has been detected on disk, resulting from
a change to the ConfigMap.

The implementation adds a ConfigWatcher goroutine which
periodically checks whether the config has been updated, by
loading it from disk and comparing it with the config currently
in use. If a change is detected, it signals the controller to
restart with the updated config. This improves the deployment
and rollout story for the provisioner, as updates to the
ConfigMap are picked up automatically without needing to
explicitly restart the pod itself.

Signed-off-by: Yibo Zhuang <yibzhuang@gmail.com>
yibozhuang committed Sep 29, 2021
1 parent 1cc3374 commit d16939b
Showing 4 changed files with 117 additions and 20 deletions.
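
At a high level, the change wires a polling ConfigWatcher to a supervising restart loop through an unbuffered channel. The sketch below condenses the diffs that follow into one place for orientation; every identifier comes from this commit, with the surrounding setup (client, node, flag parsing) elided as in main.go:

	// Watcher side: poll the mounted ConfigMap for changes and publish
	// updated configurations on the channel.
	restartController := make(chan common.ProvisionerConfiguration)
	configWatcher := config_watcher.NewConfigWatcher(common.ProvisionerConfigPath, configSyncPeriod, provisionerConfig)
	go configWatcher.Run(restartController)

	// Supervisor side: run the controller, stopping and restarting it
	// whenever an updated configuration arrives.
	go controller.RunLocalController(configSyncPeriod, restartController, client, procTable,
		discoveryPeriod, node, namespace, jobImage, provisionerConfig)
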
23 changes: 10 additions & 13 deletions cmd/local-volume-provisioner/main.go
@@ -29,6 +29,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/klog/v2"
"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common"
"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/config_watcher"
"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/controller"
"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/deleter"
"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/metrics"
@@ -45,6 +46,7 @@
optListenAddress string
optMetricsPath string
discoveryPeriod time.Duration
+ configSyncPeriod time.Duration
)

func main() {
@@ -53,6 +55,7 @@ func main() {
flag.StringVar(&optListenAddress, "listen-address", ":8080", "address on which to expose metrics and readiness status")
flag.StringVar(&optMetricsPath, "metrics-path", "/metrics", "path under which to expose metrics")
flag.DurationVar(&discoveryPeriod, "discovery-period", 10*time.Second, "the period for local volume discovery")
+ flag.DurationVar(&configSyncPeriod, "config-sync-period", 5*time.Second, "the period to check if there has been any config changes")
flag.Parse()
flag.Set("logtostderr", "true")

@@ -85,21 +88,15 @@
client := common.SetupClient()
node := getNode(client, nodeName)

+ restartController := make(chan common.ProvisionerConfiguration)
+
+ configWatcher := config_watcher.NewConfigWatcher(common.ProvisionerConfigPath, configSyncPeriod, provisionerConfig)
+ klog.Info("Starting config watcher\n")
+ go configWatcher.Run(restartController)

klog.Info("Starting controller\n")
procTable := deleter.NewProcTable()
- go controller.StartLocalController(client, procTable, discoveryPeriod, &common.UserConfig{
- Node: node,
- DiscoveryMap: provisionerConfig.StorageClassConfig,
- NodeLabelsForPV: provisionerConfig.NodeLabelsForPV,
- UseAlphaAPI: provisionerConfig.UseAlphaAPI,
- UseJobForCleaning: provisionerConfig.UseJobForCleaning,
- MinResyncPeriod: provisionerConfig.MinResyncPeriod,
- UseNodeNameOnly: provisionerConfig.UseNodeNameOnly,
- Namespace: namespace,
- JobContainerImage: jobImage,
- LabelsForPV: provisionerConfig.LabelsForPV,
- SetPVOwnerRef: provisionerConfig.SetPVOwnerRef,
- })
+ go controller.RunLocalController(configSyncPeriod, restartController, client, procTable, discoveryPeriod, node, namespace, jobImage, provisionerConfig)

klog.Infof("Starting metrics server at %s\n", optListenAddress)
prometheus.MustRegister([]prometheus.Collector{
19 changes: 18 additions & 1 deletion pkg/common/common.go
@@ -342,7 +342,7 @@ func ConfigMapDataToVolumeConfig(data map[string]string, provisionerConfig *Prov
}

provisionerConfig.StorageClassConfig[class] = config
- klog.Infof("StorageClass %q configured with MountDir %q, HostDir %q, VolumeMode %q, FsType %q, BlockCleanerCommand %q, NamePattern %q",
+ klog.V(5).Infof("StorageClass %q configured with MountDir %q, HostDir %q, VolumeMode %q, FsType %q, BlockCleanerCommand %q, NamePattern %q",
class,
config.MountDir,
config.HostDir,
@@ -364,6 +364,23 @@ func insertSpaces(original string) string {
return spaced
}

+ // UserConfigFromProvisionerConfig creates a UserConfig from the provided ProvisionerConfiguration struct
+ func UserConfigFromProvisionerConfig(node *v1.Node, namespace, jobImage string, config ProvisionerConfiguration) *UserConfig {
+ return &UserConfig{
+ Node: node,
+ DiscoveryMap: config.StorageClassConfig,
+ NodeLabelsForPV: config.NodeLabelsForPV,
+ UseAlphaAPI: config.UseAlphaAPI,
+ UseJobForCleaning: config.UseJobForCleaning,
+ MinResyncPeriod: config.MinResyncPeriod,
+ UseNodeNameOnly: config.UseNodeNameOnly,
+ Namespace: namespace,
+ JobContainerImage: jobImage,
+ LabelsForPV: config.LabelsForPV,
+ SetPVOwnerRef: config.SetPVOwnerRef,
+ }
+ }

// LoadProvisionerConfigs loads all configuration into a string and unmarshal it into ProvisionerConfiguration struct.
// The configuration is stored in the configmap which is mounted as a volume.
func LoadProvisionerConfigs(configPath string, provisionerConfig *ProvisionerConfiguration) error {
48 changes: 48 additions & 0 deletions pkg/config_watcher/config_watcher.go
@@ -0,0 +1,48 @@
package config_watcher

import (
"reflect"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common"
)

type ConfigWatcher struct {
configPath string
resyncPeriod time.Duration
lastAppliedConfig common.ProvisionerConfiguration
}

func NewConfigWatcher(configPath string, resyncPeriod time.Duration, config common.ProvisionerConfiguration) *ConfigWatcher {
return &ConfigWatcher{
configPath: configPath,
resyncPeriod: resyncPeriod,
lastAppliedConfig: config,
}
}

func (cw *ConfigWatcher) Run(restartController chan<- common.ProvisionerConfiguration) {
for {
select {
case <-time.After(cw.resyncPeriod):
provisionerConfig := common.ProvisionerConfiguration{
StorageClassConfig: make(map[string]common.MountConfig),
MinResyncPeriod: metav1.Duration{Duration: 5 * time.Minute},
}
if err := common.LoadProvisionerConfigs(cw.configPath, &provisionerConfig); err != nil {
klog.Fatalf("Error parsing Provisioner's configuration: %#v. Exiting...\n", err)
}

if !reflect.DeepEqual(cw.lastAppliedConfig, provisionerConfig) {
klog.Infof("Loaded and detected updated configuration: %+v", provisionerConfig)
klog.Infof("Signalling controller to restart to pick up updated configuration...")

restartController <- provisionerConfig
cw.lastAppliedConfig = provisionerConfig
}
}
}
}
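
Two behaviors of Run are worth noting: restartController is unbuffered, so the send blocks until RunLocalController (in controller.go below) receives it, and a config that fails to parse on a reload is fatal via klog.Fatalf rather than being skipped in favor of the last good config. The single-case select is also close in shape to a ticker loop; a rough equivalent for comparison (illustrative only, not part of the commit — note a Ticker keeps ticking while a send is blocked, whereas time.After restarts the timer only after each iteration completes):

	ticker := time.NewTicker(cw.resyncPeriod)
	defer ticker.Stop()
	for range ticker.C {
		// Reload the config from cw.configPath, compare it with
		// cw.lastAppliedConfig, and send on the restart channel when
		// they differ, exactly as in Run above.
	}
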
47 changes: 41 additions & 6 deletions pkg/controller/controller.go
@@ -42,10 +42,36 @@ import (
"k8s.io/utils/mount"
)

+ func RunLocalController(resync time.Duration, restartController <-chan common.ProvisionerConfiguration, client *kubernetes.Clientset, ptable deleter.ProcTable, discoveryPeriod time.Duration, node *v1.Node, namespace, jobImage string, config common.ProvisionerConfiguration) {
+ stopped := make(chan struct{})
+ stopChan := make(chan struct{})
+
+ go StartLocalController(stopChan, stopped, client, ptable, discoveryPeriod, common.UserConfigFromProvisionerConfig(node, namespace, jobImage, config))
+
+ for {
+ select {
+ case newConfig := <-restartController:
+ stopChan <- struct{}{}
+ <-stopped
+ go StartLocalController(stopChan, stopped, client, ptable, discoveryPeriod, common.UserConfigFromProvisionerConfig(node, namespace, jobImage, newConfig))
+ default:
+ time.Sleep(resync)
+ }
+ }
+ }

// StartLocalController starts the sync loop for the local PV discovery and deleter
- func StartLocalController(client *kubernetes.Clientset, ptable deleter.ProcTable, discoveryPeriod time.Duration, config *common.UserConfig) {
+ func StartLocalController(stopChan <-chan struct{}, controllerStopped chan<- struct{}, client *kubernetes.Clientset, ptable deleter.ProcTable, discoveryPeriod time.Duration, config *common.UserConfig) {
+ defer func() {
+ controllerStopped <- struct{}{}
+ klog.Info("Controller stopped\n")
+ }()

klog.Info("Initializing volume cache\n")

+ informerStopChan := make(chan struct{})
+ jobControllerStopChan := make(chan struct{})

var provisionerName string
if config.UseNodeNameOnly {
provisionerName = fmt.Sprintf("local-volume-provisioner-%v", config.Node.Name)
@@ -97,7 +123,7 @@ func StartLocalController(client *kubernetes.Clientset, ptable deleter.ProcTable
deleter := deleter.NewDeleter(runtimeConfig, cleanupTracker)

// Start informers after all event listeners are registered.
- runtimeConfig.InformerFactory.Start(wait.NeverStop)
+ runtimeConfig.InformerFactory.Start(informerStopChan)
// Wait for all started informers' cache were synced.
for v, synced := range runtimeConfig.InformerFactory.WaitForCacheSync(wait.NeverStop) {
if !synced {
@@ -106,12 +132,21 @@ func StartLocalController(client *kubernetes.Clientset, ptable deleter.ProcTable
}
// Run controller logic.
if jobController != nil {
- go jobController.Run(wait.NeverStop)
+ go jobController.Run(jobControllerStopChan)
}
klog.Info("Controller started\n")
for {
- deleter.DeletePVs()
- discoverer.DiscoverLocalVolumes()
- time.Sleep(discoveryPeriod)
+ select {
+ case <-stopChan:
+ close(informerStopChan)
+ if jobController != nil {
+ close(jobControllerStopChan)
+ }
+ return
+ default:
+ deleter.DeletePVs()
+ discoverer.DiscoverLocalVolumes()
+ time.Sleep(discoveryPeriod)
+ }
}
}
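
The restart handshake is the subtle part of this diff: RunLocalController sends on stopChan, then blocks on the stopped channel until the old controller's deferred send confirms it has fully exited, and only then launches the replacement. A minimal, runnable illustration of that handshake in isolation (names here are illustrative, not from the commit):

	package main

	import (
		"fmt"
		"time"
	)

	// worker stands in for StartLocalController: it loops until told to
	// stop, then confirms shutdown to its supervisor.
	func worker(stop <-chan struct{}, stopped chan<- struct{}) {
		defer func() { stopped <- struct{}{} }() // confirm full shutdown
		for {
			select {
			case <-stop:
				return
			default:
				time.Sleep(100 * time.Millisecond) // one iteration of work
			}
		}
	}

	func main() {
		stop := make(chan struct{})
		stopped := make(chan struct{})
		go worker(stop, stopped)

		stop <- struct{}{}       // request a stop...
		<-stopped                // ...and wait until the old instance has exited
		go worker(stop, stopped) // only now is it safe to start a replacement
		fmt.Println("restarted")
	}

In the commit, the same stopChan/stopped pair is reused across restarts, which is safe because exactly one controller instance is alive at a time.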
