
Can Flink CDC support configuring consumption to start from the earliest or the latest position? #24

Closed
wcf1 opened this issue Aug 26, 2020 · 5 comments
Labels
question Further information is requested

Comments

@wcf1

wcf1 commented Aug 26, 2020

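Currently, when a new Flink CDC job consumes a MySQL table directly, it first processes all of the historical data by default. In some cases, however, the existing data can be ignored, and it is enough to start consuming from the latest position.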

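@wcf1 wcf1 changed the title "Can Flink CDC support consuming data from the earliest or latest position" to "Can Flink CDC support configuring consumption to start from the earliest or latest position" Aug 26, 2020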
@wcf1 wcf1 changed the title again (punctuation only) Aug 26, 2020
@wcf1
Author

wcf1 commented Aug 26, 2020

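I looked at the code: in versions after 1.0.0, you can avoid the full table scan and start monitoring from the latest data with settings like the following.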
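import java.util.Properties;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

Properties properties = new Properties();
// Replace the snapshot query for test.table1 with a custom one
properties.put("snapshot.select.statement.overrides", "test.table1");
// "where 1 != 1" never matches, so the snapshot emits no rows and only new binlog changes are read
properties.put("snapshot.select.statement.overrides.test.table1", "select * from test.table1 where 1 != 1");
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .tableList("test.table1")
    .username("user")
    .password("password")
    .debeziumProperties(properties)
    .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
    .build();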
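The override approach above extends to several tables. A minimal sketch, assuming Debezium reads snapshot.select.statement.overrides as a comma-separated list of fully-qualified table names (databaseName.tableName); the class SnapshotOverrides and its method emptySnapshot are hypothetical helpers, not part of Flink CDC or Debezium. The returned Properties would be passed to .debeziumProperties(...).

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink CDC or Debezium): builds Debezium
// properties that replace the snapshot query of each listed table with one
// that matches no rows, so no historical data is emitted during the snapshot.
final class SnapshotOverrides {

    private SnapshotOverrides() {}

    /** tables are fully-qualified names such as "test.table1". */
    public static Properties emptySnapshot(String... tables) {
        if (tables.length == 0) {
            throw new IllegalArgumentException("at least one table is required");
        }
        Properties props = new Properties();
        // Comma-separated list of the tables whose snapshot query is overridden.
        props.setProperty("snapshot.select.statement.overrides", String.join(",", tables));
        for (String table : tables) {
            // "where 1 != 1" is always false, so the snapshot query returns no rows.
            props.setProperty(
                "snapshot.select.statement.overrides." + table,
                "select * from " + table + " where 1 != 1");
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = emptySnapshot("test.table1", "test.table2");
        // Print the keys in sorted order for readability.
        props.stringPropertyNames().stream().sorted()
            .forEach(k -> System.out.println(k + " = " + props.getProperty(k)));
    }
}
```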

@wcf1
Author

wcf1 commented Aug 26, 2020

Setting it like this is simpler:

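import java.util.Properties;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

Properties properties = new Properties();
// schema_only: snapshot only the table schemas (no rows), then read changes from the binlog
properties.put("snapshot.mode", "schema_only");

SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .tableList("test.table1")
    .username("user")
    .password("password")
    .debeziumProperties(properties)
    .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
    .build();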

@wuchong
Member

wuchong commented Aug 26, 2020

You are right. I have added this to the FAQ.

Repeating the answer here:

This can be controlled with the option debezium.snapshot.mode, which can be set to:

  • never: Specifies that the connector should never use snapshots and that, upon first startup with a logical server name, the connector should read from the beginning of the binlog. Use this with care, as it is only valid when the binlog is guaranteed to contain the entire history of the database.
  • schema_only: If you don't need a consistent snapshot of the data but only need the changes made since the connector started, use the schema_only option, in which the connector snapshots only the schemas (not the data).
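In Flink SQL, the mysql-cdc connector forwards options prefixed with debezium. to Debezium, so debezium.snapshot.mode sets Debezium's snapshot.mode. A minimal sketch, assuming the mysql-cdc 1.x option names (hostname, port, username, password, database-name, table-name); the class CdcDdl, the method mysqlCdcTable and the id/name columns are hypothetical. It only builds the DDL string, which a Flink job could then pass to TableEnvironment.executeSql.

```java
import java.util.Set;

// Hypothetical helper (not part of Flink CDC): renders a Flink SQL DDL for the
// mysql-cdc connector. Options prefixed with "debezium." are passed through to
// Debezium, so 'debezium.snapshot.mode' sets Debezium's snapshot.mode.
final class CdcDdl {

    // Only the two modes discussed in this thread; Debezium accepts others.
    private static final Set<String> MODES = Set.of("never", "schema_only");

    private CdcDdl() {}

    static String mysqlCdcTable(String table, String database, String snapshotMode) {
        if (!MODES.contains(snapshotMode)) {
            throw new IllegalArgumentException("unsupported snapshot mode: " + snapshotMode);
        }
        return String.join("\n",
            "CREATE TABLE " + table + " (",
            "  id INT,",          // hypothetical columns
            "  name STRING",
            ") WITH (",
            "  'connector' = 'mysql-cdc',",
            "  'hostname' = 'localhost',",
            "  'port' = '3306',",
            "  'username' = 'user',",
            "  'password' = 'password',",
            "  'database-name' = '" + database + "',",
            "  'table-name' = '" + table + "',",
            "  'debezium.snapshot.mode' = '" + snapshotMode + "'",
            ")");
    }

    public static void main(String[] args) {
        System.out.println(mysqlCdcTable("table1", "test", "schema_only"));
    }
}
```

Restricting the accepted values to never and schema_only is a deliberate choice for this sketch; Debezium itself accepts further modes, with initial (snapshot the data, then stream) as the default.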

@wuchong wuchong closed this as completed Aug 26, 2020
@wuchong wuchong added the question Further information is requested label Aug 26, 2020
@niemanxingdadi01

Could it work like Canal, where the position is committed once data is consumed, so the data is not consumed again?

@409713427

Hi, I'm using postgresql-cdc, and when I start it with the setting properties.put("snapshot.mode", "schema_only"); it fails with this error:
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Do you know why?


4 participants