From a948a00a828a0a11d7d87107c4b45eb06e137889 Mon Sep 17 00:00:00 2001 From: Gleez Technologies Date: Thu, 20 Feb 2020 04:03:36 +0000 Subject: [PATCH] More dead code removed and cleanup master info --- river/cron_service.go | 15 ++--- river/master.go | 121 +--------------------------------------- river/river.go | 96 +++++++------------------------ river/sphinx_service.go | 4 +- 4 files changed, 33 insertions(+), 203 deletions(-) diff --git a/river/cron_service.go b/river/cron_service.go index 3b1d481..32f143d 100644 --- a/river/cron_service.go +++ b/river/cron_service.go @@ -4,8 +4,9 @@ import ( "context" "sync" - "github.com/robfig/cron" "github.com/sandeepone/mysql-manticore/util" + + "github.com/robfig/cron" "gopkg.in/birkirb/loggers.v1" ) @@ -39,12 +40,12 @@ func (s *CronService) Serve() { s.riverInstance.checkAllIndexesForOptimize() }) } - if cfg.RebuildSchedule != "" { - s.cron.AddFunc(cfg.RebuildSchedule, func() { - s.log.Info("starting rebuild job") - s.riverInstance.rebuildAll(s.ctx, "scheduled rebuild") - }) - } + // if cfg.RebuildSchedule != "" { + // s.cron.AddFunc(cfg.RebuildSchedule, func() { + // s.log.Info("starting rebuild job") + // s.riverInstance.rebuildAll(s.ctx, "scheduled rebuild") + // }) + // } s.cron.Run() } diff --git a/river/master.go b/river/master.go index b736db9..74602c7 100644 --- a/river/master.go +++ b/river/master.go @@ -1,28 +1,18 @@ package river import ( - "bytes" - "os" - "path" "sync" "time" - "github.com/sandeepone/mysql-manticore/sphinx" - - "github.com/BurntSushi/toml" "github.com/davecgh/go-spew/spew" "github.com/juju/errors" + "github.com/sandeepone/mysql-manticore/sphinx" "github.com/siddontang/go-mysql/mysql" - "github.com/siddontang/go/ioutil2" - - "gopkg.in/birkirb/loggers.v1/log" ) type masterState struct { sync.RWMutex - dataDir string - filePath string flavor string gtid *mysql.GTIDSet pos *mysql.Position @@ -47,114 +37,10 @@ type MysqlPositionProvider interface { // newMasterState master state constructor func newMasterState(c *Config) *masterState { - var m = &masterState{useGTID: c.UseGTID, flavor: c.Flavor, skipFileSyncState: c.SkipFileSyncState} - if c.DataDir != "" { - m.dataDir = c.DataDir - m.filePath = path.Join(c.DataDir, "master.info") - } + var m = &masterState{useGTID: c.UseGTID, flavor: c.Flavor, skipFileSyncState: true} return m } -func (m *masterState) load() (err error) { - var p positionData - - if m.dataDir == "" { - log.Info("skipped loading master info: no data dir specified") - return nil - } - - if m.skipFileSyncState { - // log.Info("use skip_file_sync_state option, skipped loading synchronization state from file") - return nil - } - - m.Lock() - defer m.Unlock() - - m.lastSaveTime = time.Now() - - if err := os.MkdirAll(m.dataDir, 0755); err != nil { - return errors.Trace(err) - } - - f, err := os.Open(m.filePath) - if err != nil && !os.IsNotExist(errors.Cause(err)) { - return errors.Trace(err) - } else if os.IsNotExist(errors.Cause(err)) { - log.Infof("skipped loading master info: %s file not found", m.filePath) - return nil - } - defer f.Close() - - _, err = toml.DecodeReader(f, &p) - if err != nil { - return errors.Trace(err) - } - - m.needPositionReset = p.Reset - m.pos = &mysql.Position{ - Name: p.Name, - Pos: p.Pos, - } - if !m.useGTID { - return nil - } - gtid, err := mysql.ParseGTIDSet(m.flavor, p.GTID) - if err != nil { - return errors.Trace(err) - } - m.gtid = >id - log.Infof("loaded GTID: %v", *m.gtid) - return nil -} - -func (m *masterState) save() error { - m.Lock() - defer m.Unlock() - - if m.skipFileSyncState { - // log.Info("use skip_file_sync_state option, skipped saving synchronization state from file") - return nil - } - - if len(m.dataDir) == 0 { - log.Infof("skipped saving position (no data dir specified)") - return nil - } - - var pos mysql.Position - var gtidStr string - if m.pos != nil { - pos = *m.pos - } - if m.gtid == nil { - gtidStr = "" - } else { - gtidStr = (*m.gtid).String() - } - p := positionData{ - Name: pos.Name, - Pos: pos.Pos, - GTID: gtidStr, - Reset: m.needPositionReset, - } - - m.lastSaveTime = time.Now() - var buf bytes.Buffer - e := toml.NewEncoder(&buf) - - e.Encode(p) - - var err error - if err = ioutil2.WriteFileAtomic(m.filePath, buf.Bytes(), 0644); err != nil { - log.Errorf("could not save master info to file '%s': %v", m.filePath, err) - } else { - log.Debugf("saved master info to file '%s': %v", m.filePath, p) - } - - return nil -} - func (m *masterState) updatePosition(posEvent positionEvent) bool { m.Lock() defer m.Unlock() @@ -239,8 +125,7 @@ func (m *masterState) syncState() sphinx.SyncState { func (m *masterState) String() string { return spew.Sprintf( - "filePath=%v gtid=%v pos=%v useGTID=%v needPositionReset=%v", - m.filePath, + "gtid=%v pos=%v useGTID=%v needPositionReset=%v", m.gtid, m.pos, m.useGTID, diff --git a/river/river.go b/river/river.go index 2a1535e..82fab09 100644 --- a/river/river.go +++ b/river/river.go @@ -2,7 +2,6 @@ package river import ( "context" - "fmt" "regexp" "strings" "sync" @@ -185,15 +184,11 @@ func (r *River) run() error { r.sphinxToken = r.sup.Add(r.sphinxService) r.sphinxService.WaitUntilStarted() - err = r.initMasterState() + err = r.sphinxService.LoadSyncState(r.master.syncState()) if err != nil { - return errors.Trace(err) - } - - if err = r.sphinxService.LoadSyncState(r.master.syncState()); err != nil { - err = r.rebuildAll(nil, fmt.Sprintf("one or more sphinx backends are not up to date: %v", err)) + r.l.Errorf("one or more manticore backends are not up to date: %v", err) } else { - err = r.rebuildIfNotReady(nil) + err = r.checkAllIndexesReady() } if err != nil { @@ -212,17 +207,6 @@ func (r *River) run() error { return nil } -// // RebuildInProgress list of indexes that are being rebuilt right now -// func (r *River) RebuildInProgress() []string { -// p := r.rebuildInProgress.ToSlice() - -// indexList := make([]string, len(p)) -// for i, index := range p { -// indexList[i] = index.(string) -// } -// return indexList -// } - // Stop stops the River service func (r *River) Stop() { r.m.Lock() @@ -264,25 +248,6 @@ func (r *River) IsRunning() bool { return r.isRunning } -func (r *River) initMasterState() (err error) { - m := r.master - err = m.load() - - if err != nil { - return errors.Trace(err) - } - - if !m.skipFileSyncState { - r.l.Infof("master state: %s", m.String()) - - if m.needPositionReset || (m.useGTID && m.gtid == nil) || m.pos == nil { - r.l.Infof("resetting master state to the current upstream position") - err = m.resetToCurrent(r.canal) - } - } - return -} - // SaveState saves current state to file and to sphinx backends func (r *River) SaveState() { err := r.sphinxService.SaveSyncState() @@ -327,53 +292,34 @@ func (r *River) stopSyncRoutine() { } } -func (r *River) enableBuildMode() error { - r.syncM.Lock() - defer r.syncM.Unlock() - - if r.syncToken == nil { - r.l.Infof("did not enable build mode since river sync thread is not running") - return nil - } - return errors.Trace(r.syncService.SwitchBuildMode(true, switchBuildModeTimeout)) -} - -func (r *River) disableBuildMode() error { - r.syncM.Lock() - defer r.syncM.Unlock() - - if r.syncToken == nil { - r.l.Infof("did not disable build mode since river sync thread is not running") - return nil - } - return errors.Trace(r.syncService.SwitchBuildMode(false, switchBuildModeTimeout)) -} - func (r *River) checkAllIndexesForOptimize() { - for index, indexConfig := range r.c.DataSource { - err := r.sphinxService.CheckIndexForOptimize(index, indexConfig.Parts) + for index, cfg := range r.c.DataSource { + err := r.sphinxService.CheckIndexForOptimize(index, cfg.Parts) if err != nil { log.Warnf("periodic optimize error: %s", errors.ErrorStack(err)) } } } -// rebuildAll rebuilds all configured indexes -func (r *River) rebuildAll(ctx context.Context, reason string) error { - if r.c.SkipRebuild { - r.l.Infof("use skip_rebuild option, skipped rebuildAll indexes: [%s]", reason) - return nil - } +func (r *River) checkAllIndexesReady() error { + indexes := []string{} + for index, cfg := range r.c.DataSource { + ok, err := r.sphinxService.IndexIsReady(index, cfg.Parts) + if err != nil { + r.l.Errorf("Index got error for [%s] waiting %v", index, err) + return errors.Trace(err) + } - return nil -} + if !ok { + indexes = append(indexes, index) + } + } -func (r *River) rebuildIfNotReady(ctx context.Context) error { + if len(indexes) == 0 { + r.l.Infof("All indexes are ready to listen for events") + return nil + } - // isReady := func(index string, cfg *SourceConfig) (bool, error) { - // return r.sphinxService.IndexIsReady(index, cfg.Parts) - // } - // return r.rebuildIfNot(ctx, "index is not ready", isReady) return nil } diff --git a/river/sphinx_service.go b/river/sphinx_service.go index 5954067..69bc219 100644 --- a/river/sphinx_service.go +++ b/river/sphinx_service.go @@ -119,9 +119,7 @@ func (s *SphinxService) SaveSyncState() (err error) { s.sphm.Lock() defer s.sphm.Unlock() var master = s.riverInstance.master - if err = master.save(); err != nil { - return errors.Trace(err) - } + if s.sph == nil { return errors.Trace(errSphinxDisconnected) }