Skip to content

Commit

Permalink
instance:
Browse files Browse the repository at this point in the history
- fix to CanReplicateFrom(): can now replicate from master even if it
does not have log_slave_updates (this is expected behavior)
web:
- similar to the above, cluster.js supports dropping over a master
without log_slave_updates

instance_topology:
- better mutex release on MoveUpSlaves()
- better safety in RepointTo() (reemoving target slave from list)
- more audit, debug in RegroupSlaves()

binlog servers:
- instance_topology:
  - sortedSlaves() supports includeBinlogServerSubSlaves
  - added GetCandidateSlaveOfBinlogServerTopology()
  - getMostUpToDateActiveBinlogServer() returns also list of binlog
servers
  - thorough rewrite of
RegroupSlavesIncludingSubSlavesOfBinlogServers(): previous algorithm was
incorrect
  - safer RegroupSlavesBinlogServers()
- recovery:
  - added recoverDeadMasterInBinlogServerTopology(), binlgo server
topologies now covered and working

purge-binary-logs:
- added PurgeBinaryLogsTo(), PurgeBinaryLogsToCurrent() in instance_dao
- added CLI command
flush-binary-logs:
- returns updated instance

instance_dao:
- added SkipToNextBinaryLog()

web:
- hiding detach-slave, reattach-slave on master node modal
  • Loading branch information
shlomi-noach committed Sep 11, 2015
1 parent 06dea78 commit 215c242
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 87 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#
set -e

RELEASE_VERSION="1.4.386"
RELEASE_VERSION="1.4.392"
TOPDIR=/tmp/orchestrator-release
export RELEASE_VERSION TOPDIR

Expand Down
21 changes: 20 additions & 1 deletion go/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
}
var err error
if *config.RuntimeCLIFlags.BinlogFile == "" {
err = inst.FlushBinaryLogs(instanceKey, 1)
_, err = inst.FlushBinaryLogs(instanceKey, 1)
} else {
_, err = inst.FlushBinaryLogsTo(instanceKey, *config.RuntimeCLIFlags.BinlogFile)
}
Expand All @@ -335,6 +335,25 @@ func Cli(command string, strict bool, instance string, destination string, owner
}
fmt.Println(instanceKey.DisplayString())
}
case cliCommand("purge-binary-logs"):
{
if instanceKey == nil {
instanceKey = thisInstanceKey
}
if instanceKey == nil {
log.Fatal("Cannot deduce instance:", instance)
}
var err error
if *config.RuntimeCLIFlags.BinlogFile == "" {
log.Fatal("expecting --binlog value")
}

_, err = inst.PurgeBinaryLogsTo(instanceKey, *config.RuntimeCLIFlags.BinlogFile)
if err != nil {
log.Fatale(err)
}
fmt.Println(instanceKey.DisplayString())
}
case cliCommand("last-pseudo-gtid"):
{
if instanceKey == nil {
Expand Down
8 changes: 6 additions & 2 deletions go/inst/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,12 @@ func (this *Instance) CanReplicateFrom(other *Instance) (bool, error) {
if !other.LogBinEnabled {
return false, fmt.Errorf("instance does not have binary logs enabled: %+v", other.Key)
}
if !other.LogSlaveUpdatesEnabled {
return false, fmt.Errorf("instance does not have log_slave_updates enabled: %+v", other.Key)
if other.IsSlave() {
if !other.LogSlaveUpdatesEnabled {
return false, fmt.Errorf("instance does not have log_slave_updates enabled: %+v", other.Key)
}
// OK for a master to not have log_slave_updates
// Not OK for a slave, for it has to relay the logs.
}
if this.IsSmallerMajorVersion(other) && !this.IsBinlogServer() {
return false, fmt.Errorf("instance %+v has version %s, which is lower than %s on %+v ", this.Key, this.Version, other.Version, other.Key)
Expand Down
57 changes: 51 additions & 6 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var instanceReadChan = make(chan bool, backendDBConcurrency)
var instanceWriteChan = make(chan bool, backendDBConcurrency)

// Max concurrency for bulk topology operations
const topologyConcurrency = 100
const topologyConcurrency = 128

var topologyConcurrencyChan = make(chan bool, topologyConcurrency)

Expand Down Expand Up @@ -1645,18 +1645,18 @@ func RefreshInstanceSlaveHosts(instanceKey *InstanceKey) (*Instance, error) {
}

// FlushBinaryLogs attempts a 'FLUSH BINARY LOGS' statement on the given instance.
func FlushBinaryLogs(instanceKey *InstanceKey, count int) error {
func FlushBinaryLogs(instanceKey *InstanceKey, count int) (*Instance, error) {
for i := 0; i < count; i++ {
_, err := ExecInstance(instanceKey, `flush binary logs`)
if err != nil {
return log.Errore(err)
return nil, log.Errore(err)
}
}

log.Infof("flush-binary-logs count=%+v on %+v", count, *instanceKey)
AuditOperation("flush-binary-logs", instanceKey, "success")

return nil
return ReadTopologyInstance(instanceKey)
}

// FlushBinaryLogsTo attempts to 'FLUSH BINARY LOGS' until given binary log is reached
Expand All @@ -1670,8 +1670,29 @@ func FlushBinaryLogsTo(instanceKey *InstanceKey, logFile string) (*Instance, err
if distance < 0 {
return nil, log.Errorf("FlushBinaryLogsTo: target log file %+v is smaller than current log file %+v", logFile, instance.SelfBinlogCoordinates.LogFile)
}
err = FlushBinaryLogs(instanceKey, distance)
return instance, err
return FlushBinaryLogs(instanceKey, distance)
}

// FlushBinaryLogsTo attempts to 'PURGE BINARY LOGS' until given binary log is reached
func PurgeBinaryLogsTo(instanceKey *InstanceKey, logFile string) (*Instance, error) {
_, err := ExecInstanceNoPrepare(instanceKey, fmt.Sprintf("purge binary logs to '%s'", logFile))
if err != nil {
return nil, log.Errore(err)
}

log.Infof("purge-binary-logs to=%+v on %+v", logFile, *instanceKey)
AuditOperation("purge-binary-logs", instanceKey, "success")

return ReadTopologyInstance(instanceKey)
}

// FlushBinaryLogsTo attempts to 'PURGE BINARY LOGS' until given binary log is reached
func PurgeBinaryLogsToCurrent(instanceKey *InstanceKey) (*Instance, error) {
instance, err := ReadTopologyInstance(instanceKey)
if err != nil {
return instance, log.Errore(err)
}
return PurgeBinaryLogsTo(instanceKey, instance.SelfBinlogCoordinates.LogFile)
}

// StopSlaveNicely stops a slave such that SQL_thread and IO_thread are aligned (i.e.
Expand Down Expand Up @@ -1965,6 +1986,30 @@ func ChangeMasterTo(instanceKey *InstanceKey, masterKey *InstanceKey, masterBinl
return instance, err
}

// SkipToNextBinaryLog changes master position to beginning of next binlog
// USE WITH CARE!
// Use case is binlog servers where the master was gone & replaced by another.
func SkipToNextBinaryLog(instanceKey *InstanceKey) (*Instance, error) {
instance, err := ReadTopologyInstance(instanceKey)
if err != nil {
return instance, log.Errore(err)
}

nextFileCoordinates, err := instance.ExecBinlogCoordinates.NextFileCoordinates()
if err != nil {
return instance, log.Errore(err)
}
nextFileCoordinates.LogPos = 4
log.Debugf("Will skip replication on %+v to next binary log: %+v", instance.Key, nextFileCoordinates.LogFile)

instance, err = ChangeMasterTo(&instance.Key, &instance.MasterKey, &nextFileCoordinates, false, GTIDHintNeutral)
if err != nil {
return instance, log.Errore(err)
}
AuditOperation("skip-binlog", instanceKey, fmt.Sprintf("Skipped replication to next binary log: %+v", nextFileCoordinates.LogFile))
return StartSlave(instanceKey)
}

// ResetSlave resets a slave, breaking the replication
func ResetSlave(instanceKey *InstanceKey) (*Instance, error) {
instance, err := ReadTopologyInstance(instanceKey)
Expand Down
3 changes: 3 additions & 0 deletions go/inst/instance_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ func (this *InstanceKey) Formalize() *InstanceKey {

// Equals tests equality between this key and another key
func (this *InstanceKey) Equals(other *InstanceKey) bool {
if other == nil {
return false
}
return this.Hostname == other.Hostname && this.Port == other.Port
}

Expand Down
Loading

0 comments on commit 215c242

Please # to comment.