diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 89a8fc9da47..defa236bcf3 100755 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -789,11 +789,7 @@ func (c *changefeed) releaseResources(ctx context.Context) { c.wg.Wait() if c.ddlSink != nil { - canceledCtx, cancel := context.WithCancel(context.Background()) - cancel() - // TODO(dongmen): remove ctx from func ddlSink.close(), it is useless. - // We don't need to wait ddlSink Close, pass a canceled context is ok - if err := c.ddlSink.close(canceledCtx); err != nil { + if err := c.ddlSink.close(); err != nil { log.Warn("owner close ddlSink failed", zap.String("namespace", c.id.Namespace), zap.String("changefeed", c.id.ID), diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index e1b6d183b0b..b63ea777b3d 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -148,7 +148,7 @@ func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo return m.mu.checkpointTs, m.mu.currentTables } -func (m *mockDDLSink) close(_ context.Context) error { +func (m *mockDDLSink) close() error { m.wg.Wait() return nil } diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index ecbe8c3444a..538bec8df72 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -58,7 +58,7 @@ type DDLSink interface { // It will return after the bootstrap event is sent. emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error // close the ddlsink, cancel running goroutine. - close(ctx context.Context) error + close() error } type ddlSinkImpl struct { @@ -406,7 +406,7 @@ func (s *ddlSinkImpl) emitSyncPoint(ctx context.Context, checkpointTs uint64) (e } } -func (s *ddlSinkImpl) close(ctx context.Context) (err error) { +func (s *ddlSinkImpl) close() (err error) { s.cancel() s.wg.Wait() diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index d0fd43b5093..56cce168ee5 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -483,6 +483,7 @@ func (o *ownerImpl) updateMetrics() { ownershipCounter.Add(float64(now.Sub(o.lastTickTime)) / float64(time.Second)) o.lastTickTime = now + // todo: move this to each changefeed individually ? for cfID, cf := range o.changefeeds { if cf.latestInfo != nil { changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID).