Skip to content

Commit

Permalink
Add option to provide initial cluster config
Browse files Browse the repository at this point in the history
Add an `--initial-cluster-config` option to the sentinel to specify a file
containing the clusterconfig to use at cluster initialization.
This option, will be ignored on an already initialized cluster.

This is useful to define a clusterconfig without using `stolonctl config
set/patch` after the cluster is initialized.

It's also useful when there's the need to define the `InitWithMultipleKeepers`
cluster config option at first cluster initialization.
  • Loading branch information
sgotti committed Jan 27, 2016
1 parent c4c748a commit dd7ddb1
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 14 deletions.
46 changes: 33 additions & 13 deletions cmd/sentinel/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type config struct {
port string
keeperPort string
keeperKubeLabelSelector string
initialClusterConfig string
debug bool
}

Expand All @@ -71,6 +72,7 @@ func init() {
cmdSentinel.PersistentFlags().StringVar(&cfg.port, "port", "6431", "sentinel listening port")
cmdSentinel.PersistentFlags().StringVar(&cfg.keeperKubeLabelSelector, "keeper-kube-label-selector", "", "label selector for discoverying stolon-keeper(s) under kubernetes")
cmdSentinel.PersistentFlags().StringVar(&cfg.keeperPort, "keeper-port", "5431", "stolon-keeper(s) listening port (used by kubernetes discovery)")
cmdSentinel.PersistentFlags().StringVar(&cfg.initialClusterConfig, "initial-cluster-config", "", "a file providing the initial cluster config, used only at cluster initialization, ignored if cluster is already initialized")
cmdSentinel.PersistentFlags().BoolVar(&cfg.debug, "debug", false, "enable debug logging")
}

Expand Down Expand Up @@ -621,15 +623,27 @@ type Sentinel struct {
listenAddress string
port string

clusterConfig *cluster.Config
updateMutex sync.Mutex
leader bool
leaderMutex sync.Mutex
clusterConfig *cluster.Config
initialClusterNilConfig *cluster.NilConfig

updateMutex sync.Mutex
leader bool
leaderMutex sync.Mutex
}

func NewSentinel(id string, cfg config, stop chan bool, end chan bool) (*Sentinel, error) {
storePath := filepath.Join(common.StoreBasePath, cfg.clusterName)
var initialClusterNilConfig *cluster.NilConfig
if cfg.initialClusterConfig != "" {
configData, err := ioutil.ReadFile(cfg.initialClusterConfig)
if err != nil {
return nil, fmt.Errorf("cannot read provided initial cluster config file: %v", err)
}
if err := json.Unmarshal(configData, &initialClusterNilConfig); err != nil {
return nil, fmt.Errorf("cannot parse provided initial cluster config: %v", err)
}
}

storePath := filepath.Join(common.StoreBasePath, cfg.clusterName)
kvstore, err := store.NewStore(store.Backend(cfg.storeBackend), cfg.storeEndpoints)
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
Expand All @@ -639,14 +653,15 @@ func NewSentinel(id string, cfg config, stop chan bool, end chan bool) (*Sentine
candidate := leadership.NewCandidate(kvstore, filepath.Join(storePath, common.SentinelLeaderKey), id, 15*time.Second)

return &Sentinel{
id: id,
e: e,
listenAddress: cfg.listenAddress,
port: cfg.port,
candidate: candidate,
leader: false,
stop: stop,
end: end}, nil
id: id,
e: e,
listenAddress: cfg.listenAddress,
port: cfg.port,
candidate: candidate,
leader: false,
initialClusterNilConfig: initialClusterNilConfig,
stop: stop,
end: end}, nil
}

func (s *Sentinel) Start() {
Expand Down Expand Up @@ -759,9 +774,14 @@ func (s *Sentinel) clusterSentinelCheck(pctx context.Context) {
}

if cv.Version == 0 {
log.Infof("Initializing cluster")
// Cluster first initialization
newcv := cluster.NewClusterView()
newcv.Version = 1
if s.initialClusterNilConfig != nil {
newcv.Config = s.initialClusterNilConfig
}
log.Debugf(spew.Sprintf("new clusterView: %#v", newcv))
if _, err = e.SetClusterData(nil, newcv, nil); err != nil {
log.Errorf("error saving clusterdata: %v", err)
}
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/sorintlab/stolon/common"
"github.com/sorintlab/stolon/pkg/store"

"github.com/sorintlab/stolon/Godeps/_workspace/src/github.com/satori/go.uuid"
)

Expand Down Expand Up @@ -71,6 +75,66 @@ func TestInit(t *testing.T) {

}

func TestInitialClusterConfig(t *testing.T) {
dir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)

tstore, err := NewTestStore(dir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := tstore.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := tstore.WaitUp(10 * time.Second); err != nil {
t.Fatalf("error waiting on store up: %v", err)
}
defer tstore.Stop()

clusterName := uuid.NewV4().String()

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)

kvstore, err := store.NewStore(tstore.storeBackend, storeEndpoints)
if err != nil {
t.Fatalf("cannot create store: %v", err)
}

e := store.NewStoreManager(kvstore, storePath)

tmpFile, err := ioutil.TempFile(dir, "initial-cluster-config.json")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer tmpFile.Close()
tmpFile.WriteString(`{ "synchronous_replication": true }`)

ts, err := NewTestSentinel(dir, clusterName, tstore.storeBackend, storeEndpoints, fmt.Sprintf("--initial-cluster-config=%s", tmpFile.Name()))
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := ts.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer ts.Stop()

if err := WaitClusterInitialized(e, 30*time.Second); err != nil {
t.Fatal("expected cluster initialized")
}

cv, _, err := e.GetClusterView()
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if !*cv.Config.SynchronousReplication {
t.Fatal("expected cluster config with InitWithMultipleKeepers enabled")
}
}

func TestExclusiveLock(t *testing.T) {
dir, err := ioutil.TempDir("", "")
if err != nil {
Expand Down
21 changes: 20 additions & 1 deletion tests/integration/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ type TestSentinel struct {
port string
}

func NewTestSentinel(dir string, clusterName string, storeBackend store.Backend, storeEndpoints string) (*TestSentinel, error) {
func NewTestSentinel(dir string, clusterName string, storeBackend store.Backend, storeEndpoints string, a ...string) (*TestSentinel, error) {
u := uuid.NewV4()
id := fmt.Sprintf("%x", u[:4])

Expand All @@ -433,6 +433,7 @@ func NewTestSentinel(dir string, clusterName string, storeBackend store.Backend,
args = append(args, fmt.Sprintf("--store-backend=%s", storeBackend))
args = append(args, fmt.Sprintf("--store-endpoints=%s", storeEndpoints))
args = append(args, "--debug")
args = append(args, a...)

bin := os.Getenv("STSENTINEL_BIN")
if bin == "" {
Expand Down Expand Up @@ -731,6 +732,24 @@ func WaitClusterViewMaster(master string, e *store.StoreManager, timeout time.Du

}

func WaitClusterInitialized(e *store.StoreManager, timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
cv, _, err := e.GetClusterView()
if err != nil {
goto end
}
if cv != nil {
if cv.Version > 0 {
return nil
}
}
end:
time.Sleep(2 * time.Second)
}
return fmt.Errorf("timeout")
}

// Hack to find a free tcp port
func getFreeTCPPort() (string, string, error) {
ln, err := net.Listen("tcp", "localhost:0")
Expand Down

0 comments on commit dd7ddb1

Please # to comment.