Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#8824
Browse files Browse the repository at this point in the history
close tikv#8823

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
rleungx authored and ti-chi-bot committed Nov 20, 2024
1 parent 57d5bdc commit 0a1eda9
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
keepaliveTime = 10 * time.Second
keepaliveTimeout = 3 * time.Second
msgSize = 8 * units.MiB
retryInterval = time.Second
)

// StopSyncWithLeader stop to sync the region with leader.
Expand Down Expand Up @@ -150,7 +151,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
}
}
log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err))
time.Sleep(time.Second)
select {
case <-ctx.Done():
log.Info("stop synchronizing with leader due to context canceled")
return
case <-time.After(retryInterval):
}
continue
}
log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex()))
Expand All @@ -162,7 +168,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
if err = stream.CloseSend(); err != nil {
log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err))
}
time.Sleep(time.Second)
select {
case <-ctx.Done():
log.Info("stop synchronizing with leader due to context canceled")
return
case <-time.After(retryInterval):
}
break
}
if s.history.GetNextIndex() != resp.GetStartIndex() {
Expand Down Expand Up @@ -206,7 +217,17 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
<<<<<<< HEAD
saveKV, _, _ := regionGuide(region, origin)
=======
cctx := &core.MetaProcessContext{
Context: ctx,
TaskRunner: ratelimit.NewSyncRunner(),
Tracer: core.NewNoopHeartbeatProcessTracer(),
// no limit for followers.
}
saveKV, _, _, _ := regionGuide(cctx, region, origin)
>>>>>>> 41ec8dced (syncer: exit watch leader immediately (#8824))
overlaps := bc.PutRegion(region)

if hasBuckets {
Expand Down

0 comments on commit 0a1eda9

Please # to comment.