Skip to content

Commit

Permalink
Merge pull request #307 from ipfs/feat/auto-migration
Browse files Browse the repository at this point in the history
Three small state features
  • Loading branch information
hsanjuan authored Feb 8, 2018
2 parents 800ebe2 + 0784802 commit a371fa4
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 10 deletions.
25 changes: 23 additions & 2 deletions ipfs-cluster-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,28 @@ configuration.
},
},
{
Name: "daemon",
Usage: "run the IPFS Cluster peer (default)",
Name: "daemon",
Usage: "run the IPFS Cluster peer (default)",
Flags: []cli.Flag{
cli.BoolFlag{
Name: "upgrade, u",
Usage: "run necessary state migrations before starting cluster service",
},
},
Action: daemon,
},
{
Name: "state",
Usage: "Manage ipfs-cluster-state",
Subcommands: []cli.Command{
{
Name: "version",
Usage: "display the shared state format version",
Action: func(c *cli.Context) error {
fmt.Printf("%d\n", mapstate.Version)
return nil
},
},
{
Name: "upgrade",
Usage: "upgrade the IPFS Cluster state to the current version",
Expand Down Expand Up @@ -424,6 +438,13 @@ func daemon(c *cli.Context) error {

// Load all the configurations
cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, diskInfCfg, numpinInfCfg := makeConfigs()

// Run any migrations
if c.Bool("upgrade") {
err := upgrade()
checkErr("upgrading state", err)
}

// Execution lock
err := locker.lock()
checkErr("acquiring execution lock", err)
Expand Down
39 changes: 31 additions & 8 deletions ipfs-cluster-service/state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"encoding/json"
"errors"
"io"
Expand All @@ -15,11 +16,16 @@ import (
var errNoSnapshot = errors.New("no snapshot found")

func upgrade() error {
newState, err := restoreStateFromDisk()
newState, current, err := restoreStateFromDisk()
if err != nil {
return err
}

if current {
logger.Warning("Skipping migration of up-to-date state")
return nil
}

cfg, clusterCfg, _, _, consensusCfg, _, _, _, _ := makeConfigs()

err = cfg.LoadJSONFromFile(configPath)
Expand All @@ -32,38 +38,55 @@ func upgrade() error {
}

func export(w io.Writer) error {
stateToExport, err := restoreStateFromDisk()
stateToExport, _, err := restoreStateFromDisk()
if err != nil {
return err
}

return exportState(stateToExport, w)
}

func restoreStateFromDisk() (*mapstate.MapState, error) {
// restoreStateFromDisk returns a mapstate containing the latest
// snapshot, a flag set to true when the state format has the
// current version and an error
func restoreStateFromDisk() (*mapstate.MapState, bool, error) {
cfg, _, _, _, consensusCfg, _, _, _, _ := makeConfigs()

err := cfg.LoadJSONFromFile(configPath)
if err != nil {
return nil, err
return nil, false, err
}

r, snapExists, err := raft.LastStateRaw(consensusCfg)
if !snapExists {
err = errNoSnapshot
}
if err != nil {
return nil, err
return nil, false, err
}

stateFromSnap := mapstate.NewMapState()
err = stateFromSnap.Migrate(r)
// duplicate reader to both check version and migrate
var buf bytes.Buffer
r2 := io.TeeReader(r, &buf)
raw, err := ioutil.ReadAll(r2)
if err != nil {
return nil, false, err
}
err = stateFromSnap.Unmarshal(raw)
if err != nil {
return nil, err
return nil, false, err
}
if stateFromSnap.GetVersion() == mapstate.Version {
return stateFromSnap, true, nil
}

return stateFromSnap, nil
err = stateFromSnap.Migrate(&buf)
if err != nil {
return nil, false, err
}

return stateFromSnap, false, nil
}

func stateImport(r io.Reader) error {
Expand Down
7 changes: 7 additions & 0 deletions state/mapstate/map_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package mapstate

import (
"bytes"
"errors"
"io"
"io/ioutil"
"sync"
Expand Down Expand Up @@ -99,6 +100,9 @@ func (st *MapState) Migrate(r io.Reader) error {
return err
}
err = st.Unmarshal(bs)
if err != nil {
return err
}
if st.Version == Version { // Unmarshal restored for us
return nil
}
Expand Down Expand Up @@ -142,6 +146,9 @@ func (st *MapState) Marshal() ([]byte, error) {
func (st *MapState) Unmarshal(bs []byte) error {
// Check version byte
// logger.Debugf("The incoming bytes to unmarshal: %x", bs)
if len(bs) < 1 {
return errors.New("cannot unmarshal from empty bytes")
}
v := int(bs[0])
logger.Debugf("The interpreted version: %d", v)
if v != Version { // snapshot is out of date
Expand Down

0 comments on commit a371fa4

Please # to comment.