diff --git a/dm/syncer/compactor.go b/dm/syncer/compactor.go
index 217fd2e7626..f4ef7627020 100644
--- a/dm/syncer/compactor.go
+++ b/dm/syncer/compactor.go
@@ -39,6 +39,8 @@ type compactor struct {
 // compactorWrap creates and runs a compactor instance.
 func compactorWrap(inCh chan *job, syncer *Syncer) chan *job {
+	// We could use a larger compact buffer size, but then pause-task/stop-task would take longer, because the user must wait for all buffered jobs to be flushed.
+	// TODO: implement a ping-pong buffer.
 	bufferSize := syncer.cfg.QueueSize * syncer.cfg.WorkerCount / 4
 	compactor := &compactor{
 		inCh: inCh,
diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index 44dce10c748..4327c8c82d4 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -268,6 +268,9 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier
 func (s *Syncer) newJobChans() {
 	s.closeJobChans()
+	// In 2.0.X, DM cached s.cfg.QueueSize * s.cfg.WorkerCount DML jobs in memory.
+	// Now, if compact is false, dmlJobCh and dmlWorker each cache s.cfg.QueueSize * s.cfg.WorkerCount / 2 jobs.
+	// If compact is true, dmlJobCh, the compactor buffer, the compactor output channel, and dmlWorker each cache s.cfg.QueueSize * s.cfg.WorkerCount / 4 jobs.
 	chanSize := s.cfg.QueueSize * s.cfg.WorkerCount / 2
 	if s.cfg.Compact {
 		chanSize /= 2
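
For reference, here is a minimal, self-contained sketch of the sizing arithmetic the comments above describe. It is not DM's actual code; `stageSizes` and the config values are hypothetical, and it only illustrates the invariant that the total in-memory job budget stays roughly `QueueSize * WorkerCount` whether or not compact is enabled, split across 2 buffering stages (compact off) or 4 (compact on).

```go
package main

import "fmt"

// stageSizes is a hypothetical helper mirroring the sizing rule in the diff:
// start from QueueSize * WorkerCount / 2 per stage, and halve it again when
// compact is enabled, because two extra buffering stages (the compactor
// buffer and its output channel) are added to the pipeline.
func stageSizes(queueSize, workerCount int, compact bool) (chanSize, stages int) {
	chanSize = queueSize * workerCount / 2
	if compact {
		// Four stages: dmlJobCh, compactor buffer, compactor output channel, dmlWorker.
		return chanSize / 2, 4
	}
	// Two stages: dmlJobCh, dmlWorker.
	return chanSize, 2
}

func main() {
	// Hypothetical config values; DM's defaults may differ.
	queueSize, workerCount := 1024, 16

	for _, compact := range []bool{false, true} {
		size, stages := stageSizes(queueSize, workerCount, compact)
		fmt.Printf("compact=%v: %d jobs per stage x %d stages = %d total\n",
			compact, size, stages, size*stages)
	}
	// Output:
	// compact=false: 8192 jobs per stage x 2 stages = 16384 total
	// compact=true: 4096 jobs per stage x 4 stages = 16384 total
}
```

This also shows the trade-off noted in the compactor comment: growing the per-stage buffer would improve compaction opportunities but increase the number of buffered jobs that must drain on pause-task/stop-task.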