Skip to content

Commit

Permalink
More dead code removed and cleanup master info
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepone committed Feb 20, 2020
1 parent 15c4102 commit a948a00
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 203 deletions.
15 changes: 8 additions & 7 deletions river/cron_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"sync"

"github.com/robfig/cron"
"github.com/sandeepone/mysql-manticore/util"

"github.com/robfig/cron"
"gopkg.in/birkirb/loggers.v1"
)

Expand Down Expand Up @@ -39,12 +40,12 @@ func (s *CronService) Serve() {
s.riverInstance.checkAllIndexesForOptimize()
})
}
if cfg.RebuildSchedule != "" {
s.cron.AddFunc(cfg.RebuildSchedule, func() {
s.log.Info("starting rebuild job")
s.riverInstance.rebuildAll(s.ctx, "scheduled rebuild")
})
}
// if cfg.RebuildSchedule != "" {
// s.cron.AddFunc(cfg.RebuildSchedule, func() {
// s.log.Info("starting rebuild job")
// s.riverInstance.rebuildAll(s.ctx, "scheduled rebuild")
// })
// }
s.cron.Run()
}

Expand Down
121 changes: 3 additions & 118 deletions river/master.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
package river

import (
"bytes"
"os"
"path"
"sync"
"time"

"github.com/sandeepone/mysql-manticore/sphinx"

"github.com/BurntSushi/toml"
"github.com/davecgh/go-spew/spew"
"github.com/juju/errors"

"github.com/sandeepone/mysql-manticore/sphinx"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go/ioutil2"

"gopkg.in/birkirb/loggers.v1/log"
)

type masterState struct {
sync.RWMutex
dataDir string
filePath string
flavor string
gtid *mysql.GTIDSet
pos *mysql.Position
Expand All @@ -47,114 +37,10 @@ type MysqlPositionProvider interface {

// newMasterState master state constructor
func newMasterState(c *Config) *masterState {
var m = &masterState{useGTID: c.UseGTID, flavor: c.Flavor, skipFileSyncState: c.SkipFileSyncState}
if c.DataDir != "" {
m.dataDir = c.DataDir
m.filePath = path.Join(c.DataDir, "master.info")
}
var m = &masterState{useGTID: c.UseGTID, flavor: c.Flavor, skipFileSyncState: true}
return m
}

func (m *masterState) load() (err error) {
var p positionData

if m.dataDir == "" {
log.Info("skipped loading master info: no data dir specified")
return nil
}

if m.skipFileSyncState {
// log.Info("use skip_file_sync_state option, skipped loading synchronization state from file")
return nil
}

m.Lock()
defer m.Unlock()

m.lastSaveTime = time.Now()

if err := os.MkdirAll(m.dataDir, 0755); err != nil {
return errors.Trace(err)
}

f, err := os.Open(m.filePath)
if err != nil && !os.IsNotExist(errors.Cause(err)) {
return errors.Trace(err)
} else if os.IsNotExist(errors.Cause(err)) {
log.Infof("skipped loading master info: %s file not found", m.filePath)
return nil
}
defer f.Close()

_, err = toml.DecodeReader(f, &p)
if err != nil {
return errors.Trace(err)
}

m.needPositionReset = p.Reset
m.pos = &mysql.Position{
Name: p.Name,
Pos: p.Pos,
}
if !m.useGTID {
return nil
}
gtid, err := mysql.ParseGTIDSet(m.flavor, p.GTID)
if err != nil {
return errors.Trace(err)
}
m.gtid = &gtid
log.Infof("loaded GTID: %v", *m.gtid)
return nil
}

func (m *masterState) save() error {
m.Lock()
defer m.Unlock()

if m.skipFileSyncState {
// log.Info("use skip_file_sync_state option, skipped saving synchronization state from file")
return nil
}

if len(m.dataDir) == 0 {
log.Infof("skipped saving position (no data dir specified)")
return nil
}

var pos mysql.Position
var gtidStr string
if m.pos != nil {
pos = *m.pos
}
if m.gtid == nil {
gtidStr = ""
} else {
gtidStr = (*m.gtid).String()
}
p := positionData{
Name: pos.Name,
Pos: pos.Pos,
GTID: gtidStr,
Reset: m.needPositionReset,
}

m.lastSaveTime = time.Now()
var buf bytes.Buffer
e := toml.NewEncoder(&buf)

e.Encode(p)

var err error
if err = ioutil2.WriteFileAtomic(m.filePath, buf.Bytes(), 0644); err != nil {
log.Errorf("could not save master info to file '%s': %v", m.filePath, err)
} else {
log.Debugf("saved master info to file '%s': %v", m.filePath, p)
}

return nil
}

func (m *masterState) updatePosition(posEvent positionEvent) bool {
m.Lock()
defer m.Unlock()
Expand Down Expand Up @@ -239,8 +125,7 @@ func (m *masterState) syncState() sphinx.SyncState {

func (m *masterState) String() string {
return spew.Sprintf(
"filePath=%v gtid=%v pos=%v useGTID=%v needPositionReset=%v",
m.filePath,
"gtid=%v pos=%v useGTID=%v needPositionReset=%v",
m.gtid,
m.pos,
m.useGTID,
Expand Down
96 changes: 21 additions & 75 deletions river/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package river

import (
"context"
"fmt"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -185,15 +184,11 @@ func (r *River) run() error {
r.sphinxToken = r.sup.Add(r.sphinxService)
r.sphinxService.WaitUntilStarted()

err = r.initMasterState()
err = r.sphinxService.LoadSyncState(r.master.syncState())
if err != nil {
return errors.Trace(err)
}

if err = r.sphinxService.LoadSyncState(r.master.syncState()); err != nil {
err = r.rebuildAll(nil, fmt.Sprintf("one or more sphinx backends are not up to date: %v", err))
r.l.Errorf("one or more manticore backends are not up to date: %v", err)
} else {
err = r.rebuildIfNotReady(nil)
err = r.checkAllIndexesReady()
}

if err != nil {
Expand All @@ -212,17 +207,6 @@ func (r *River) run() error {
return nil
}

// // RebuildInProgress list of indexes that are being rebuilt right now
// func (r *River) RebuildInProgress() []string {
// p := r.rebuildInProgress.ToSlice()

// indexList := make([]string, len(p))
// for i, index := range p {
// indexList[i] = index.(string)
// }
// return indexList
// }

// Stop stops the River service
func (r *River) Stop() {
r.m.Lock()
Expand Down Expand Up @@ -264,25 +248,6 @@ func (r *River) IsRunning() bool {
return r.isRunning
}

func (r *River) initMasterState() (err error) {
m := r.master
err = m.load()

if err != nil {
return errors.Trace(err)
}

if !m.skipFileSyncState {
r.l.Infof("master state: %s", m.String())

if m.needPositionReset || (m.useGTID && m.gtid == nil) || m.pos == nil {
r.l.Infof("resetting master state to the current upstream position")
err = m.resetToCurrent(r.canal)
}
}
return
}

// SaveState saves current state to file and to sphinx backends
func (r *River) SaveState() {
err := r.sphinxService.SaveSyncState()
Expand Down Expand Up @@ -327,53 +292,34 @@ func (r *River) stopSyncRoutine() {
}
}

func (r *River) enableBuildMode() error {
r.syncM.Lock()
defer r.syncM.Unlock()

if r.syncToken == nil {
r.l.Infof("did not enable build mode since river sync thread is not running")
return nil
}
return errors.Trace(r.syncService.SwitchBuildMode(true, switchBuildModeTimeout))
}

func (r *River) disableBuildMode() error {
r.syncM.Lock()
defer r.syncM.Unlock()

if r.syncToken == nil {
r.l.Infof("did not disable build mode since river sync thread is not running")
return nil
}
return errors.Trace(r.syncService.SwitchBuildMode(false, switchBuildModeTimeout))
}

func (r *River) checkAllIndexesForOptimize() {
for index, indexConfig := range r.c.DataSource {
err := r.sphinxService.CheckIndexForOptimize(index, indexConfig.Parts)
for index, cfg := range r.c.DataSource {
err := r.sphinxService.CheckIndexForOptimize(index, cfg.Parts)
if err != nil {
log.Warnf("periodic optimize error: %s", errors.ErrorStack(err))
}
}
}

// rebuildAll rebuilds all configured indexes
func (r *River) rebuildAll(ctx context.Context, reason string) error {
if r.c.SkipRebuild {
r.l.Infof("use skip_rebuild option, skipped rebuildAll indexes: [%s]", reason)
return nil
}
func (r *River) checkAllIndexesReady() error {
indexes := []string{}
for index, cfg := range r.c.DataSource {
ok, err := r.sphinxService.IndexIsReady(index, cfg.Parts)
if err != nil {
r.l.Errorf("Index got error for [%s] waiting %v", index, err)
return errors.Trace(err)
}

return nil
}
if !ok {
indexes = append(indexes, index)
}
}

func (r *River) rebuildIfNotReady(ctx context.Context) error {
if len(indexes) == 0 {
r.l.Infof("All indexes are ready to listen for events")
return nil
}

// isReady := func(index string, cfg *SourceConfig) (bool, error) {
// return r.sphinxService.IndexIsReady(index, cfg.Parts)
// }
// return r.rebuildIfNot(ctx, "index is not ready", isReady)
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions river/sphinx_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,7 @@ func (s *SphinxService) SaveSyncState() (err error) {
s.sphm.Lock()
defer s.sphm.Unlock()
var master = s.riverInstance.master
if err = master.save(); err != nil {
return errors.Trace(err)
}

if s.sph == nil {
return errors.Trace(errSphinxDisconnected)
}
Expand Down

0 comments on commit a948a00

Please # to comment.