Skip to content

Commit

Permalink
add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD committed Oct 28, 2021
1 parent b16d2df commit fefa97c
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 0 deletions.
2 changes: 2 additions & 0 deletions dm/syncer/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type compactor struct {

// compactorWrap creates and runs a compactor instance.
func compactorWrap(inCh chan *job, syncer *Syncer) chan *job {
// Actually we can use a larger compact buffer-size, but if so, when user pause-task/stop-task, they may need to wait a longer time to wait all jobs flushed.
// TODO: implement ping-pong buffer.
bufferSize := syncer.cfg.QueueSize * syncer.cfg.WorkerCount / 4
compactor := &compactor{
inCh: inCh,
Expand Down
3 changes: 3 additions & 0 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier

func (s *Syncer) newJobChans() {
s.closeJobChans()
// DM originally cached s.cfg.QueueSize * s.cfg.WorkerCount dml jobs in memory in 2.0.X.
// Now if compact: false, dmlJobCh and dmlWorker will both cached s.cfg.QueueSize * s.cfg.WorkerCount/2 jobs.
// If compact: true, dmlJobCh, compactor buffer, compactor output channel and dmlWorker will all cached s.cfg.QueueSize * s.cfg.WorkerCount/4 jobs.
chanSize := s.cfg.QueueSize * s.cfg.WorkerCount / 2
if s.cfg.Compact {
chanSize /= 2
Expand Down

0 comments on commit fefa97c

Please # to comment.