Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

R4R: Solve the bug that snapshot is sometimes unsuccessful #2118

Merged
merged 8 commits into from
Dec 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/iris/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func main() {
tendermintCmd,
server.ResetCmd(ctx, cdc, resetAppState),
server.ExportCmd(ctx, cdc, exportAppStateAndTMValidators),
server.SnapshotCmd(cdc),
server.SnapshotCmd(ctx, cdc, resetAppState),
client.LineBreak,
)

Expand Down
143 changes: 89 additions & 54 deletions server/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"

"github.com/irisnet/irishub/codec"

"github.com/spf13/cobra"
"github.com/spf13/viper"
bc "github.com/tendermint/tendermint/blockchain"
Expand All @@ -23,96 +22,101 @@ const flagTmpDir = "tmp-dir"
const pathSeparator = string(os.PathSeparator)

// SnapshotCmd delete historical block data and index data
func SnapshotCmd(cdc *codec.Codec) *cobra.Command {
func SnapshotCmd(ctx *Context, cdc *codec.Codec, appReset AppReset) *cobra.Command {
cmd := &cobra.Command{
Use: "snapshot",
Short: "snapshot the latest information and drop the others",
RunE: func(cmd *cobra.Command, args []string) error {
defer func() {
if r := recover(); r != nil {
err := r.(error)
ctx.Logger.Error("snapshot file is created failed", "err", err.Error())
}
}()

home := viper.GetString(tmcli.HomeFlag)
emptyState, err := isEmptyState(home)
if err != nil || emptyState {
fmt.Println("WARNING: State is not initialized.")
ctx.Logger.Error("State is not initialized")
return nil
}
srcDir := filepath.Join(home, "data")

targetDir := viper.GetString(flagTmpDir)
if len(targetDir) == 0 {
targetDir = filepath.Join(home, "data.bak")
}
dataDir := filepath.Join(home, "data")

if err = dumpData(srcDir, targetDir, cdc); err != nil {
if err = snapshot(ctx, cdc, dataDir, targetDir, appReset); err != nil {
_ = os.RemoveAll(targetDir)
ctx.Logger.Error("snapshot file is created failed")
return err
}
fmt.Println(fmt.Sprintf("snapshot file is stored in [%s]", targetDir))
ctx.Logger.Info("snapshot file is created successful", "location", targetDir)
return nil
},
}
cmd.Flags().String(flagTmpDir, "", "snapshot file storage directory")
cmd.Flags().String(flagTmpDir, "", "Snapshot file storage directory")
return cmd
}

func loadDb(name, path string) *dbm.GoLevelDB {
db, err := dbm.NewGoLevelDB(name, path)
if err != nil {
panic("load db failed")
panic(err)
}
return db
}

func dumpData(home, targetDir string, cdc *codec.Codec) error {
//save local current block and flush disk
lastHeight := snapshotBlock(home, targetDir)
func snapshot(ctx *Context, cdc *codec.Codec, dataDir, targetDir string, appReset AppReset) error {
blockDB := loadDb("blockstore", dataDir)
blockStore := bc.NewBlockStore(blockDB)

//save local current block height consensus data
_ = snapshotCsWAL(home, targetDir, lastHeight)
stateDB := loadDb("state", dataDir)
state := tmsm.LoadState(stateDB)

//save local current block height state
if err := snapshotState(home, targetDir, lastHeight, cdc); err != nil {
return err
defer func() {
blockDB.Close()
stateDB.Close()
}()
if blockStore.Height() != state.LastBlockHeight {
if err := reset(ctx, appReset, state.LastBlockHeight); err != nil {
return err
}
}

//save local current block and flush disk
snapshotBlock(blockStore, targetDir, state.LastBlockHeight)
//save local current block height state
snapshotState(cdc, stateDB, targetDir)
//save local current block height consensus data
snapshotCsWAL(ctx, dataDir, targetDir, state.LastBlockHeight)

//copy application
appDir := filepath.Join(home, "application.db")
appDir := filepath.Join(dataDir, "application.db")
appTargetDir := filepath.Join(targetDir, "application.db")
if err := copyDir(appDir, appTargetDir); err != nil {
return err
}

//copy evidence.db
evidenceDir := filepath.Join(home, "evidence.db")
evidenceDir := filepath.Join(dataDir, "evidence.db")
evidenceTargetDir := filepath.Join(targetDir, "evidence.db")
return copyDir(evidenceDir, evidenceTargetDir)
}

func snapshotState(home, targetDir string, height int64, cdc *codec.Codec) error {
originDb := loadDb("state", home)
defer originDb.Close()

func snapshotState(cdc *codec.Codec, tmDB *dbm.GoLevelDB, targetDir string) {
targetDb := loadDb("state", targetDir)
defer targetDb.Close()

state := tmsm.LoadState(originDb)

if height != state.LastBlockHeight {
return fmt.Errorf("wrong block height,should: %d, but got %d. please restart the node, then try to snapshot again", height, state.LastBlockHeight)
}
state := tmsm.LoadState(tmDB)

saveValidatorsInfo(cdc, originDb, targetDb, height)
saveConsensusParamsInfo(cdc, originDb, targetDb, height)
saveValidatorsInfo(cdc, tmDB, targetDb, state.LastBlockHeight)
saveConsensusParamsInfo(cdc, tmDB, targetDb, state.LastBlockHeight)
tmsm.SaveState(targetDb, state)

return nil
}

func snapshotBlock(home, targetDir string) int64 {
originDb := loadDb("blockstore", home)
defer originDb.Close()

originStore := bc.NewBlockStore(originDb)
height := originStore.Height()

func snapshotBlock(originStore *bc.BlockStore, targetDir string, height int64) int64 {
targetDb := loadDb("blockstore", targetDir)
defer targetDb.Close()

Expand All @@ -127,42 +131,54 @@ func snapshotBlock(home, targetDir string) int64 {
return height
}

func snapshotCsWAL(home, targetDir string, height int64) error {
func snapshotCsWAL(ctx *Context, home, targetDir string, height int64) {
walTargetDir := filepath.Join(targetDir, "cs.wal", "wal")
targetWAL, err := consensus.NewWAL(walTargetDir)

walSourceDir := filepath.Join(home, "cs.wal", "wal")
sourceWAL, err := consensus.NewWAL(walSourceDir)
if err != nil {
return fmt.Errorf("failed to open WAL for consensus state")
ctx.Logger.Info("failed to open WAL for consensus state", "err", err.Error())
return
}

gr, found, err := sourceWAL.SearchForEndHeight(height, &consensus.WALSearchOptions{IgnoreDataCorruptionErrors: true})
if gr != nil {
defer gr.Close()
}
if err != nil {
return err
}
if !found {
return fmt.Errorf("not found cs.wal file in %s", walSourceDir)

if err != nil || !found {
ctx.Logger.Info(fmt.Sprintf("cannot replay height %d. WAL does not contain #ENDHEIGHT for %d", height, height-1))
return
}

defer func() {
if err = gr.Close(); err != nil {
ctx.Logger.Info("resource release failed", "err", err.Error())
return
}
}()

var msg *consensus.TimedWALMessage
dec := consensus.NewWALDecoder(gr)
for {
msg, err = dec.Decode()
if err == io.EOF {
break
} else if consensus.IsDataCorruptionError(err) {
return fmt.Errorf("data has been corrupted in last height %d of consensus WAL", height)
ctx.Logger.Info("data has been corrupted in last height %d of consensus WAL", height)
return
} else if err != nil {
return err
ctx.Logger.Info("decode WALMessage failed", "err", err.Error())
return
}
if err := targetWAL.Write(msg.Msg); err != nil {
ctx.Logger.Info("write data to file failed", "err", err.Error())
return
}
_ = targetWAL.Write(msg.Msg)
}
_ = targetWAL.WriteSync(consensus.EndHeightMessage{Height: height})
return nil
err = targetWAL.WriteSync(consensus.EndHeightMessage{Height: height})
if err != nil {
ctx.Logger.Info("write data to file failed", "err", err.Error())
return
}
}

func copyDir(srcPath string, destPath string) error {
Expand Down Expand Up @@ -207,10 +223,10 @@ func copyFile(src, dest string) (w int64, err error) {
}
}
dstFile, err := os.Create(dest)
defer dstFile.Close()
if err != nil {
return
}
defer dstFile.Close()
return io.Copy(dstFile, srcFile)
}

Expand Down Expand Up @@ -282,3 +298,22 @@ func calcValidatorsKey(height int64) []byte {
func calcConsensusParamsKey(height int64) []byte {
return []byte(fmt.Sprintf("consensusParamsKey:%v", height))
}

func reset(ctx *Context, appReset AppReset, height int64) error {
cfg := ctx.Config
home := cfg.RootDir
traceWriterFile := viper.GetString(flagTraceStore)

db, err := openDB(home)
if err != nil {
return err
}
traceWriter, err := openTraceWriter(traceWriterFile)
if err != nil {
return err
}
if err := appReset(ctx, ctx.Logger, db, traceWriter, height); err != nil {
return err
}
return nil
}