From dd7ddb1e53cc6430a1a278ee332ce19ad6e03873 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Tue, 26 Jan 2016 14:34:59 +0100 Subject: [PATCH] Add option to provide initial cluster config Add an `--initial-cluster-config` option to the sentinel to specify a file containing the clusterconfig to use at cluster initialization. This option, will be ignored on an already initialized cluster. This is useful to define a clusterconfig without using `stolonctl config set/patch` after the cluster is initialized. It's also useful when there's the need to define the `InitWithMultipleKeepers` cluster config option at first cluster initialization. --- cmd/sentinel/sentinel.go | 46 +++++++++++++++++------- tests/integration/init_test.go | 64 ++++++++++++++++++++++++++++++++++ tests/integration/utils.go | 21 ++++++++++- 3 files changed, 117 insertions(+), 14 deletions(-) diff --git a/cmd/sentinel/sentinel.go b/cmd/sentinel/sentinel.go index 533cbb8b7..e190b0a1c 100644 --- a/cmd/sentinel/sentinel.go +++ b/cmd/sentinel/sentinel.go @@ -58,6 +58,7 @@ type config struct { port string keeperPort string keeperKubeLabelSelector string + initialClusterConfig string debug bool } @@ -71,6 +72,7 @@ func init() { cmdSentinel.PersistentFlags().StringVar(&cfg.port, "port", "6431", "sentinel listening port") cmdSentinel.PersistentFlags().StringVar(&cfg.keeperKubeLabelSelector, "keeper-kube-label-selector", "", "label selector for discoverying stolon-keeper(s) under kubernetes") cmdSentinel.PersistentFlags().StringVar(&cfg.keeperPort, "keeper-port", "5431", "stolon-keeper(s) listening port (used by kubernetes discovery)") + cmdSentinel.PersistentFlags().StringVar(&cfg.initialClusterConfig, "initial-cluster-config", "", "a file providing the initial cluster config, used only at cluster initialization, ignored if cluster is already initialized") cmdSentinel.PersistentFlags().BoolVar(&cfg.debug, "debug", false, "enable debug logging") } @@ -621,15 +623,27 @@ type Sentinel struct { listenAddress string port string - clusterConfig *cluster.Config - updateMutex sync.Mutex - leader bool - leaderMutex sync.Mutex + clusterConfig *cluster.Config + initialClusterNilConfig *cluster.NilConfig + + updateMutex sync.Mutex + leader bool + leaderMutex sync.Mutex } func NewSentinel(id string, cfg config, stop chan bool, end chan bool) (*Sentinel, error) { - storePath := filepath.Join(common.StoreBasePath, cfg.clusterName) + var initialClusterNilConfig *cluster.NilConfig + if cfg.initialClusterConfig != "" { + configData, err := ioutil.ReadFile(cfg.initialClusterConfig) + if err != nil { + return nil, fmt.Errorf("cannot read provided initial cluster config file: %v", err) + } + if err := json.Unmarshal(configData, &initialClusterNilConfig); err != nil { + return nil, fmt.Errorf("cannot parse provided initial cluster config: %v", err) + } + } + storePath := filepath.Join(common.StoreBasePath, cfg.clusterName) kvstore, err := store.NewStore(store.Backend(cfg.storeBackend), cfg.storeEndpoints) if err != nil { return nil, fmt.Errorf("cannot create store: %v", err) @@ -639,14 +653,15 @@ func NewSentinel(id string, cfg config, stop chan bool, end chan bool) (*Sentine candidate := leadership.NewCandidate(kvstore, filepath.Join(storePath, common.SentinelLeaderKey), id, 15*time.Second) return &Sentinel{ - id: id, - e: e, - listenAddress: cfg.listenAddress, - port: cfg.port, - candidate: candidate, - leader: false, - stop: stop, - end: end}, nil + id: id, + e: e, + listenAddress: cfg.listenAddress, + port: cfg.port, + candidate: candidate, + leader: false, + initialClusterNilConfig: initialClusterNilConfig, + stop: stop, + end: end}, nil } func (s *Sentinel) Start() { @@ -759,9 +774,14 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) { } if cv.Version == 0 { + log.Infof("Initializing cluster") // Cluster first initialization newcv := cluster.NewClusterView() newcv.Version = 1 + if s.initialClusterNilConfig != nil { + newcv.Config = s.initialClusterNilConfig + } + log.Debugf(spew.Sprintf("new clusterView: %#v", newcv)) if _, err = e.SetClusterData(nil, newcv, nil); err != nil { log.Errorf("error saving clusterdata: %v", err) } diff --git a/tests/integration/init_test.go b/tests/integration/init_test.go index 879cd7d69..44fdbc3e8 100644 --- a/tests/integration/init_test.go +++ b/tests/integration/init_test.go @@ -18,9 +18,13 @@ import ( "fmt" "io/ioutil" "os" + "path/filepath" "testing" "time" + "github.com/sorintlab/stolon/common" + "github.com/sorintlab/stolon/pkg/store" + "github.com/sorintlab/stolon/Godeps/_workspace/src/github.com/satori/go.uuid" ) @@ -71,6 +75,66 @@ func TestInit(t *testing.T) { } +func TestInitialClusterConfig(t *testing.T) { + dir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + tstore, err := NewTestStore(dir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tstore.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := tstore.WaitUp(10 * time.Second); err != nil { + t.Fatalf("error waiting on store up: %v", err) + } + defer tstore.Stop() + + clusterName := uuid.NewV4().String() + + storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) + storePath := filepath.Join(common.StoreBasePath, clusterName) + + kvstore, err := store.NewStore(tstore.storeBackend, storeEndpoints) + if err != nil { + t.Fatalf("cannot create store: %v", err) + } + + e := store.NewStoreManager(kvstore, storePath) + + tmpFile, err := ioutil.TempFile(dir, "initial-cluster-config.json") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer tmpFile.Close() + tmpFile.WriteString(`{ "synchronous_replication": true }`) + + ts, err := NewTestSentinel(dir, clusterName, tstore.storeBackend, storeEndpoints, fmt.Sprintf("--initial-cluster-config=%s", tmpFile.Name())) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := ts.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer ts.Stop() + + if err := WaitClusterInitialized(e, 30*time.Second); err != nil { + t.Fatal("expected cluster initialized") + } + + cv, _, err := e.GetClusterView() + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !*cv.Config.SynchronousReplication { + t.Fatal("expected cluster config with InitWithMultipleKeepers enabled") + } +} + func TestExclusiveLock(t *testing.T) { dir, err := ioutil.TempDir("", "") if err != nil { diff --git a/tests/integration/utils.go b/tests/integration/utils.go index dad13b680..3efad2390 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -413,7 +413,7 @@ type TestSentinel struct { port string } -func NewTestSentinel(dir string, clusterName string, storeBackend store.Backend, storeEndpoints string) (*TestSentinel, error) { +func NewTestSentinel(dir string, clusterName string, storeBackend store.Backend, storeEndpoints string, a ...string) (*TestSentinel, error) { u := uuid.NewV4() id := fmt.Sprintf("%x", u[:4]) @@ -433,6 +433,7 @@ func NewTestSentinel(dir string, clusterName string, storeBackend store.Backend, args = append(args, fmt.Sprintf("--store-backend=%s", storeBackend)) args = append(args, fmt.Sprintf("--store-endpoints=%s", storeEndpoints)) args = append(args, "--debug") + args = append(args, a...) bin := os.Getenv("STSENTINEL_BIN") if bin == "" { @@ -731,6 +732,24 @@ func WaitClusterViewMaster(master string, e *store.StoreManager, timeout time.Du } +func WaitClusterInitialized(e *store.StoreManager, timeout time.Duration) error { + start := time.Now() + for time.Now().Add(-timeout).Before(start) { + cv, _, err := e.GetClusterView() + if err != nil { + goto end + } + if cv != nil { + if cv.Version > 0 { + return nil + } + } + end: + time.Sleep(2 * time.Second) + } + return fmt.Errorf("timeout") +} + // Hack to find a free tcp port func getFreeTCPPort() (string, string, error) { ln, err := net.Listen("tcp", "localhost:0")