Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

canal支持aliyun rds的binlog订阅 #727

Closed
agapple opened this issue Jun 30, 2018 · 37 comments
Closed

canal支持aliyun rds的binlog订阅 #727

agapple opened this issue Jun 30, 2018 · 37 comments
Assignees
Milestone

Comments

@agapple
Copy link
Member

agapple commented Jun 30, 2018

aliyun是一家伟大的公司,也是目前国内云市场占比最高的,考虑前面有太多的小伙伴和我反馈过期望能更全面的支持RDS binlog的订阅。aliyun RDS主要满足用户对于MySQL的简单方便使用,针对用户业务发展迅速,对于未来存储/计算扩展性有预期的也会使用aliyun DRDS( Distributed Relational Database Service),一个基于MySQL sharding的分布式数据库解决方案,因为存储本身是MySQL也可以比较方便的基于canal进行数据订阅。

首先明确一下针对这类云MySQL的binlog订阅,通常会面临的几个问题:

  1. 账号权限问题 [已解决]
  • canal的策略是模拟了MySQL Slave的行为,因此需要有SELECT, REPLICATION SLAVE, REPLICATION CLIENT的权限
  • 解决思路:目前aliyun上的RDS默认创建的账号已经自带了这些权限,针对RDS 5.6/5.7的高权限实例,可以用root账号额外进行一下授权,授权操作可参考[QuickStart]
  1. binlog被删除的问题 [已解决]
  • 对应的binlog清理策略, 超过18小时之后会删除并备份到oss之上,如果canal任务停止超过18小时就会遇到xx类似错误,
  • 解决思路:RDS默认提供了一段时间oss binlog的下载能力,参考文档。canal可以识别位点中的时间戳,对比一下RDS中show binary logs里最早的一条binlog,如果不满足则会通过oss接口进行下载到本机进行解析,追平历史binlog之后再切换到RDS binlog中继续消费 【canal代码会支持】
    image
  1. 主备切换导致的问题 [已解决]
  • 一般云MySQL的主备方案都采用了vip模式,屏蔽了后端物理节点之间的主备切换,也就是对于canal来说只看到了单节点的MySQL ip,针对物理上进行主备切换时拿着老主库位点去订阅时会遇到xx类似错误
  • 解决思路: 针对MySQL5.6+可以使用canal gtid的订阅方式(针对出现问题2时,需要进行本地binlog解析就无法很好的支持),或者比较推荐的就是基于serverId自动识别主备切换,每次进行binlog订阅时,检查一下位点中的serverId和当前数据库节点的serverId是否一致,如果不一致说明服务端产生了主备切换,可以基于时间戳重新在新的主库中定位到对应的binlog,再继续后续的消费即可。ps. 这里定位位点时需要考虑binlog被删除的情况,参考问题2
@agapple agapple self-assigned this Jun 30, 2018
@agapple agapple added this to the v1.0.26 milestone Jun 30, 2018
@agapple
Copy link
Member Author

agapple commented Aug 9, 2018

代码提交:ea6391d

@agapple
Copy link
Member Author

agapple commented Aug 13, 2018

具体aliyun rds使用文档:https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart

@agapple agapple closed this as completed Aug 13, 2018
lcybo pushed a commit to lcybo/canal that referenced this issue Aug 17, 2018
@hustwolf
Copy link

hustwolf commented Nov 20, 2018

我用的1.1.1版本的,还有不少bug

@agapple
Copy link
Member Author

agapple commented Nov 20, 2018

@hustwolf 有问题需要反馈

@fanqiejiang8
Copy link

现在需求要监控RDS的biglog日志,现在可能会遇到两个问题,可以解答一下吗 第一个问题是 rds实例切换还能监控biglog的日志吗 第二个问题是 poll的biglog日志跟rds上的一样吗 期待你的回复

@agapple
Copy link
Member Author

agapple commented Jun 19, 2019

  1. 主备切换可以自动切换
  2. poll拉取的rds oss binlog就是原始binlog备份上去的,是一致的

@kingyanglei
Copy link

启动canal时抛了异常:Caused by: java.io.IOException: connect rm-**.mysql.rds.aliyuncs.com/139.224..:3306 failure,Caused by: java.net.SocketTimeoutException: connect timed out,我是在instance配置文件中指定master.address为rds的域名,能否提供一下帮助?多谢!

@fanqiejiang8
Copy link

RDS切换时发现对应的数据会重复或者丢失,怎么可以避免这种情况

@agapple
Copy link
Member Author

agapple commented Jul 12, 2019

RDS切换数据丢失应该不至于啊,能描述一下你分析到的具体丢失原因?

@ShakespeareFeng
Copy link

  1. 主备切换可以自动切换
  2. poll拉取的rds oss binlog就是原始binlog备份上去的,是一致的

主备自动切换,需要怎么配置?

@agapple
Copy link
Member Author

agapple commented Aug 29, 2019

多看wiki

@fanqiejiang8
Copy link

fanqiejiang8 commented Sep 5, 2019 via email

@changwenwen
Copy link

@agapple 你好,我想问下,我这边rds是5.6的,canal版本是1.1.4的,一直报错SHOW command denied to user for table 'slave_master_info',一定需要root账号赋权限吗,然而阿里云rds的root账号又不给的,这怎么办呢

@fkc-zyk
Copy link

fkc-zyk commented Mar 6, 2020

通过canal的同步之后,主数据库和被数据库的binlog日志可以保持事务的一致性吗?也就是说备份之后,原事务是否会被拆散?

@fkc-zyk
Copy link

fkc-zyk commented Mar 6, 2020

通过canal的同步之后,主数据库和备份数据库的binlog日志可以保持事务的一致性吗?也就是说备份之后,原事务是否会被拆散?

@lt5227
Copy link

lt5227 commented Mar 23, 2020

我用Canal同步阿里云RDS MySQL数据库数据,周五的时候停止了服务,到周一的时候再开服务,控制台报错
EventParser] ERROR c.a.o.c.common.alarm.LogAlarmHandler - destination:example[java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265)
at java.lang.Thread.run(Thread.java:748)
]
这种情况我该如何去解决呢?

@Carrot0226
Copy link

我用canal 同步RDS MySQL版数据库中的数据时,启动canal,一直报这个错误,怎么解决呢?
ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:dp.workflow_schedule
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:dp.workflow_schedule
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:dp.workflow_schedule

@saigu
Copy link

saigu commented Jun 18, 2020

使用过程中发现几个问题:
1.好像admin上instance的配置,会被canal.properties覆盖?导致在instance中配置canal.instance.rds.accesskey 其实没有用,必须在集群主配置中配置canal.aliyun.accessKey才行。

public class PlainCanalInstanceGenerator implements CanalInstanceGenerator {
    private static final Logger    logger      = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class);
    private String                 springXml;
    private PlainCanalConfigClient canalConfigClient;
    private String                 defaultName = "instance";
    private BeanFactory            beanFactory;
    private Properties             canalConfig;

    public PlainCanalInstanceGenerator(Properties canalConfig){
        this.canalConfig = canalConfig;
    }

    public CanalInstance generate(String destination) {
        synchronized (CanalInstanceGenerator.class) {
            try {
//这里是从admin上拉到的instance配置
                PlainCanal canal = canalConfigClient.findInstance(destination, null);
                if (canal == null) {
                    throw new CanalException("instance : " + destination + " config is not found");
                }
                Properties properties = canal.getProperties();
                // merge local
                properties.putAll(canalConfig);// 这里是原本的写法
//                canalConfig.putAll(properties);//应该改成这样
                // 设置动态properties,替换掉本地properties
                com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal.set(properties);
                // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                System.setProperty("canal.instance.destination", destination);
                this.beanFactory = getBeanFactory(springXml);
                String beanName = destination;
Å                if (!beanFactory.containsBean(beanName)) {
                    beanName = defaultName;
                }

                return (CanalInstance) beanFactory.getBean(beanName);
            } catch (Throwable e) {
                logger.error("generator instance failed.", e);
                throw new CanalException(e);
            } finally {
                System.setProperty("canal.instance.destination", "");
            }
        }
    }

2.在instance中配置canal.instance.rds.accesskey其实也是不对的,代码里面真正构建spring bean的时候使用的canal.aliyun.accessKey。
虽然在canalController中有个改写的逻辑,但是其实这个只对初始化的时候的canal配置有效,对instance配置无效。

 // 兼容1.1.0版本的ak/sk参数名
        String accesskey = getProperty(properties, "canal.instance.rds.accesskey");
        String secretkey = getProperty(properties, "canal.instance.rds.secretkey");
        if (StringUtils.isNotEmpty(accesskey)) {
            System.setProperty(CanalConstants.CANAL_ALIYUN_ACCESSKEY, accesskey);
        }
        if (StringUtils.isNotEmpty(secretkey)) {
            System.setProperty(CanalConstants.CANAL_ALIYUN_SECRETKEY, secretkey);
        }

3.源码中给的xml文件是有问题的,base-instance.xml中的canal.aliyun.accesskey的"accesskey"小写了,应该改成大写accessKey

`<bean id="baseEventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy" abstract="true">
		<property name="accesskey" value="${canal.aliyun.accesskey:}" />
		<property name="secretkey" value="${canal.aliyun.secretkey:}" />
		<property name="instanceId" value="${canal.instance.rds.instanceId:}" />
	</bean>`

4.要在1.1.4版本正确使用,做如下操作:
1)在集群配置中配置阿里云ak
2) 把xml文件中的大小写改正确

@saigu
Copy link

saigu commented Jun 18, 2020

我用Canal同步阿里云RDS MySQL数据库数据,周五的时候停止了服务,到周一的时候再开服务,控制台报错
EventParser] ERROR c.a.o.c.common.alarm.LogAlarmHandler - destination:example[java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265)
at java.lang.Thread.run(Thread.java:748)
]
这种情况我该如何去解决呢?

不要暂停太久,实例上的binlog文件会定期刷掉,上传oss的。

@SevenBlue2018
Copy link

SevenBlue2018 commented Dec 8, 2020

我有些疑惑:通过阅读该issue和文档:https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart 后,现在1.1.4稳定版本已经解决了阿里云rds自动主从切换导致binlog位点变更同步失败的问题吗?另外rds默认18小时清理,配置上传oss后,有什么作用?

@saigu
Copy link

saigu commented Dec 8, 2020

我有些疑惑:通过阅读该issue和文档:https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart 后,现在1.1.4稳定版本已经解决了阿里云rds自动主从切换导致binlog位点变更同步失败的问题吗?另外rds默认18小时清理,配置上传oss后,有什么作用?

已经解决了主从切换的问题,通过时间戳定位。
历史binlog文件上传oss后,rds实例上只能读到当前正在写入的binlog,无法读到历史binlog。
如果配置了ak的话,当指定binlog时间开始同步,如果指定的时间位点比当前binlog中最早的时间位点仍然早,就会去oss上下载更早的binlog文件进行解析,然后开始同步。

@SevenBlue2018
Copy link

我有些疑惑:通过阅读该issue和文档:https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart 后,现在1.1.4稳定版本已经解决了阿里云rds自动主从切换导致binlog位点变更同步失败的问题吗?另外rds默认18小时清理,配置上传oss后,有什么作用?

已经解决了主从切换的问题,通过时间戳定位。
历史binlog文件上传oss后,rds实例上只能读到当前正在写入的binlog,无法读到历史binlog。
如果配置了ak的话,当指定binlog时间开始同步,如果指定的时间位点比当前binlog中最早的时间位点仍然早,就会去oss上下载更早的binlog文件进行解析,然后开始同步。

此逻辑是canal自动完成吗?我刚接手公司这个项目,有人反馈rds做了主从切换导致同步失败了。我还在梳理中,就来官方issue里问下canal是否已经自己就具备该能力。您说的时间戳定位,是否还需要人工介入呢?当主从切换时,Canal会自动根据该时间戳去从原本订阅主切换到订阅从来保证同步的正常进行吗?

@saigu
Copy link

saigu commented Dec 8, 2020

我有些疑惑:通过阅读该issue和文档:https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart 后,现在1.1.4稳定版本已经解决了阿里云rds自动主从切换导致binlog位点变更同步失败的问题吗?另外rds默认18小时清理,配置上传oss后,有什么作用?

已经解决了主从切换的问题,通过时间戳定位。
历史binlog文件上传oss后,rds实例上只能读到当前正在写入的binlog,无法读到历史binlog。
如果配置了ak的话,当指定binlog时间开始同步,如果指定的时间位点比当前binlog中最早的时间位点仍然早,就会去oss上下载更早的binlog文件进行解析,然后开始同步。

此逻辑是canal自动完成吗?我刚接手公司这个项目,有人反馈rds做了主从切换导致同步失败了。我还在梳理中,就来官方issue里问下canal是否已经自己就具备该能力。您说的时间戳定位,是否还需要人工介入呢?当主从切换时,Canal会自动根据该时间戳去从原本订阅主切换到订阅从来保证同步的正常进行吗?

嗯,我们使用的1.1.4版本。主从切换后会有一会失败,大概一分钟左右自动恢复,不需要人工介入。

@SevenBlue2018
Copy link

我有些疑惑:通过阅读该issue和文档:https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart 后,现在1.1.4稳定版本已经解决了阿里云rds自动主从切换导致binlog位点变更同步失败的问题吗?另外rds默认18小时清理,配置上传oss后,有什么作用?

已经解决了主从切换的问题,通过时间戳定位。
历史binlog文件上传oss后,rds实例上只能读到当前正在写入的binlog,无法读到历史binlog。
如果配置了ak的话,当指定binlog时间开始同步,如果指定的时间位点比当前binlog中最早的时间位点仍然早,就会去oss上下载更早的binlog文件进行解析,然后开始同步。

此逻辑是canal自动完成吗?我刚接手公司这个项目,有人反馈rds做了主从切换导致同步失败了。我还在梳理中,就来官方issue里问下canal是否已经自己就具备该能力。您说的时间戳定位,是否还需要人工介入呢?当主从切换时,Canal会自动根据该时间戳去从原本订阅主切换到订阅从来保证同步的正常进行吗?

嗯,我们使用的1.1.4版本。主从切换后会有一会失败,大概一分钟左右自动恢复,不需要人工介入。

好的,我明天做下验证,感谢!

@SevenBlue2018
Copy link

我有些疑惑:通过阅读该issue和文档:https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart 后,现在1.1.4稳定版本已经解决了阿里云rds自动主从切换导致binlog位点变更同步失败的问题吗?另外rds默认18小时清理,配置上传oss后,有什么作用?

已经解决了主从切换的问题,通过时间戳定位。
历史binlog文件上传oss后,rds实例上只能读到当前正在写入的binlog,无法读到历史binlog。
如果配置了ak的话,当指定binlog时间开始同步,如果指定的时间位点比当前binlog中最早的时间位点仍然早,就会去oss上下载更早的binlog文件进行解析,然后开始同步。

请教下,我查了下我们目前遇到的问题,并不是rds主从切换导致的问题。而是表结构变更,新增了一个字段,然后canal写入meta_history时出错了。一直提示这样一个错误:“Cause: org.h2.jdbc.JdbcSQLException: Table "META_HISTORY" not found; SQL statement:”。想请教下这是因为什么?

@Myosotissylvatica36
Copy link

启动canal时抛了异常:Caused by: java.io.IOException: connect rm-**__.mysql.rds.aliyuncs.com/139.224..:3306 failure,Caused by: java.net.SocketTimeoutException: connect timed out,我是在instance配置文件中指定master.address为rds的域名,能否提供一下帮助?多谢!

请问你是如何解决的?我也遇到了同样的问题

@saigu
Copy link

saigu commented Feb 3, 2021

我有些疑惑:通过阅读该issue和文档:https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart 后,现在1.1.4稳定版本已经解决了阿里云rds自动主从切换导致binlog位点变更同步失败的问题吗?另外rds默认18小时清理,配置上传oss后,有什么作用?

已经解决了主从切换的问题,通过时间戳定位。
历史binlog文件上传oss后,rds实例上只能读到当前正在写入的binlog,无法读到历史binlog。
如果配置了ak的话,当指定binlog时间开始同步,如果指定的时间位点比当前binlog中最早的时间位点仍然早,就会去oss上下载更早的binlog文件进行解析,然后开始同步。

请教下,我查了下我们目前遇到的问题,并不是rds主从切换导致的问题。而是表结构变更,新增了一个字段,然后canal写入meta_history时出错了。一直提示这样一个错误:“Cause: org.h2.jdbc.JdbcSQLException: Table "META_HISTORY" not found; SQL statement:”。想请教下这是因为什么?

大概是tsdb的问题吧。
配置里面不要使用tsdb了。

@saigu
Copy link

saigu commented Feb 3, 2021

启动canal时抛了异常:Caused by: java.io.IOException: connect rm-**__.mysql.rds.aliyuncs.com/139.224..:3306 failure,Caused by: java.net.SocketTimeoutException: connect timed out,我是在instance配置文件中指定master.address为rds的域名,能否提供一下帮助?多谢!

请问你是如何解决的?我也遇到了同样的问题

在canal所在服务器上,尝试一下telnet host(你的数据库域名) 336,确认下网络是通的。
如果网络是通的,就检查下阿里云的白名单。

@gujiwork
Copy link

gujiwork commented Mar 5, 2021

RDS版本: 5.6 Canel: 1.1.4 遇到这个问题,请问该怎么解决?

2021-03-05 12:29:49.791 [pool-8-thread-1] ERROR c.a.o.c.adapter.launcher.loader.CanalAdapterKafkaWorker - java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: Target column: #alibaba_rds_row_id# not matched
java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: Target column: #alibaba_rds_row_id# not matched
at com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter.sync(RdbAdapter.java:171)
at com.alibaba.otter.canal.adapter.launcher.loader.AbstractCanalAdapterWorker.batchSync(AbstractCanalAdapterWorker.java:201)
at com.alibaba.otter.canal.adapter.launcher.loader.AbstractCanalAdapterWorker.lambda$null$4(AbstractCanalAdapterWorker.java:106)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at com.alibaba.otter.canal.adapter.launcher.loader.AbstractCanalAdapterWorker.lambda$null$5(AbstractCanalAdapterWorker.java:103)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: Target column: #alibaba_rds_row_id# not matched
at com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService.lambda$sync$2(RdbSyncService.java:132)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService.sync(RdbSyncService.java:128)
at com.alibaba.otter.canal.client.adapter.rdb.service.RdbMirrorDbSyncService.sync(RdbMirrorDbSyncService.java:83)
at com.alibaba.otter.canal.client.adapter.rdb.RdbAdapter.sync(RdbAdapter.java:169)
... 8 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: Target column: #alibaba_rds_row_id# not matched
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService.lambda$sync$2(RdbSyncService.java:130)
... 12 common frames omitted
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Target column: #alibaba_rds_row_id# not matched
at com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService.lambda$sync$1(RdbSyncService.java:123)

@michael-liumh
Copy link

启动canal时抛了异常:Caused by: java.io.IOException: connect rm-**__.mysql.rds.aliyuncs.com/139.224..:3306 failure,Caused by: java.net.SocketTimeoutException: connect timed out,我是在instance配置文件中指定master.address为rds的域名,能否提供一下帮助?多谢!

请问你是如何解决的?我也遇到了同样的问题

目前个人觉得可用的解决方式是设置canal_adapter配置文件conf/application.yml的retries为-1,然后发现原表执行DDL变更后,先停止canal_adapter,然后手动修改目标表结构后,再次启动canal_adapter,canal_adapter会把之前写入失败的数据重新写入

@michael-liumh
Copy link

michael-liumh commented Apr 22, 2021

canal_adapter的配置文件conf/rdb/mytest_user.yml配置了etlCondition: "where uid=1"不生效,请问有解决方案吗?

@Ryu-Z
Copy link

Ryu-Z commented Aug 25, 2021

使用过程中发现几个问题:
1.好像admin上instance的配置,会被canal.properties覆盖?导致在instance中配置canal.instance.rds.accesskey 其实没有用,必须在集群主配置中配置canal.aliyun.accessKey才行。

public class PlainCanalInstanceGenerator implements CanalInstanceGenerator {
    private static final Logger    logger      = LoggerFactory.getLogger(SpringCanalInstanceGenerator.class);
    private String                 springXml;
    private PlainCanalConfigClient canalConfigClient;
    private String                 defaultName = "instance";
    private BeanFactory            beanFactory;
    private Properties             canalConfig;

    public PlainCanalInstanceGenerator(Properties canalConfig){
        this.canalConfig = canalConfig;
    }

    public CanalInstance generate(String destination) {
        synchronized (CanalInstanceGenerator.class) {
            try {
//这里是从admin上拉到的instance配置
                PlainCanal canal = canalConfigClient.findInstance(destination, null);
                if (canal == null) {
                    throw new CanalException("instance : " + destination + " config is not found");
                }
                Properties properties = canal.getProperties();
                // merge local
                properties.putAll(canalConfig);// 这里是原本的写法
//                canalConfig.putAll(properties);//应该改成这样
                // 设置动态properties,替换掉本地properties
                com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer.propertiesLocal.set(properties);
                // 设置当前正在加载的通道,加载spring查找文件时会用到该变量
                System.setProperty("canal.instance.destination", destination);
                this.beanFactory = getBeanFactory(springXml);
                String beanName = destination;
Å                if (!beanFactory.containsBean(beanName)) {
                    beanName = defaultName;
                }

                return (CanalInstance) beanFactory.getBean(beanName);
            } catch (Throwable e) {
                logger.error("generator instance failed.", e);
                throw new CanalException(e);
            } finally {
                System.setProperty("canal.instance.destination", "");
            }
        }
    }

2.在instance中配置canal.instance.rds.accesskey其实也是不对的,代码里面真正构建spring bean的时候使用的canal.aliyun.accessKey。
虽然在canalController中有个改写的逻辑,但是其实这个只对初始化的时候的canal配置有效,对instance配置无效。

 // 兼容1.1.0版本的ak/sk参数名
        String accesskey = getProperty(properties, "canal.instance.rds.accesskey");
        String secretkey = getProperty(properties, "canal.instance.rds.secretkey");
        if (StringUtils.isNotEmpty(accesskey)) {
            System.setProperty(CanalConstants.CANAL_ALIYUN_ACCESSKEY, accesskey);
        }
        if (StringUtils.isNotEmpty(secretkey)) {
            System.setProperty(CanalConstants.CANAL_ALIYUN_SECRETKEY, secretkey);
        }

3.源码中给的xml文件是有问题的,base-instance.xml中的canal.aliyun.accesskey的"accesskey"小写了,应该改成大写accessKey

`<bean id="baseEventParser" class="com.alibaba.otter.canal.parse.inbound.mysql.rds.RdsBinlogEventParserProxy" abstract="true">
		<property name="accesskey" value="${canal.aliyun.accesskey:}" />
		<property name="secretkey" value="${canal.aliyun.secretkey:}" />
		<property name="instanceId" value="${canal.instance.rds.instanceId:}" />
	</bean>`

4.要在1.1.4版本正确使用,做如下操作:
1)在集群配置中配置阿里云ak 2) 把xml文件中的大小写改正确

请问下这个问题现在官方有fixbug吗,我这边目前canal-admin 配置ak 失败

@zhuzejun123
Copy link

2022-06-02 16:19:16.147 [rds-binlog-daemon-thread] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\\\..*$ 2022-06-02 16:19:16.148 [rds-binlog-daemon-thread] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$ 2022-06-02 16:19:16.290 [destination = test , address = rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com/47.120.211.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsLocalBinlogEventParser - ---> begin to find start position, it will be long time for reset or first position 2022-06-02 16:19:16.298 [destination = test , address = rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com/47.120.211.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsLocalBinlogEventParser - ---> find start position successfully, EntryPosition[included=false,journalName=<null>,position=<null>,serverId=<null>,gtid=<null>,timestamp=1654074000000] cost : 0ms , the next step is binlog dump 2022-06-02 16:19:16.493 [destination = test , address = rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com/47.120.211.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsLocalBinlogEventParser - parse local binlog file : mysql-bin.000249 , timestamp : 1654075524000 , try the next binlog ! 2022-06-02 16:19:26.661 [destination = test , address = rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com/47.120.211.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsLocalBinlogEventParser - parse local binlog file : mysql-bin.000250 , timestamp : 1654096633000 , try the next binlog ! 2022-06-02 16:19:26.786 [destination = test , address = rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com/47.120.211.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsLocalBinlogEventParser - parse local binlog file : mysql-bin.000251 , timestamp : 1654118148000 , try the next binlog ! 2022-06-02 16:19:36.383 [destination = test , address = rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com/47.120.211.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsLocalBinlogEventParser - last file : mysql-bin.000252 , timestamp : 1654140413000 , all file parse complete, switch to mysql parser! 2022-06-02 16:19:36.386 [download-/tmp/test] ERROR c.a.o.canal.parse.inbound.mysql.rds.BinlogDownloadQueue - task process failed java.lang.InterruptedException: null at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[na:1.8.0_181] at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088) ~[na:1.8.0_181] at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467) ~[na:1.8.0_181] at com.alibaba.otter.canal.parse.inbound.mysql.rds.BinlogDownloadQueue$DownloadThread.run(BinlogDownloadQueue.java:295) ~[canal.parse-1.1.5.jar:na] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181] 2022-06-02 16:19:36.390 [destination = test , address = rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com/47.120.211.2:3306 , EventParser] WARN c.a.o.canal.parse.inbound.mysql.LocalBinLogConnection - LocalBinLogConnection dump interrupted 2022-06-02 16:19:36.513 [rds-binlog-daemon-thread] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\\\..*$ 2022-06-02 16:19:36.513 [rds-binlog-daemon-thread] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$ 2022-06-02 16:19:36.841 [destination = test , address = rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com/47.120.211.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position 2022-06-02 16:19:36.857 [destination = test , address = rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com/47.120.211.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position {"identity":{"slaveId":-1,"sourceAddress":{"address":"rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000252","position":439060,"serverId":21181272,"timestamp":1654140503000}} 2022-06-02 16:19:36.908 [destination = test , address = rm-wz99h50q061dhb101go.mysql.rds.aliyuncs.com/47.120.211.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000252,position=439060,serverId=21181272,gtid=,timestamp=1654140503000] cost : 67ms , the next step is binlog dump

还是有报错信息,也没解析成功

@thesky2017
Copy link

image
这个 canal.aliyun.accessKey 和 canal.aliyun.secretKey 能用子账号创建吗?

@HelloLyfing
Copy link

HelloLyfing commented Oct 22, 2024

最近,我公司需要通过阿里云Rds的binlog历史文件进行消息回溯(我公司使用的是阿里云全家桶),我下载了最新的canal-1.1.7正式版的版本后,尝试了很多次都没办法正常使用该功能,在此之前也尝试过canal-1.1.0的版本都是无法正常跑通。于是自己抽时间改造了一个我公司可以使用的版本,该版本是从本项目的canal-1.1.7-hotfix-1分支fork出来的:
https://github.com/HelloLyfing/canal/tree/feature/canal-1.1.7-hotfix-lyfing2410 , 改动点只涉及canal.parse模块。
此处共享出来,所以有需要的朋友可以考虑单独打包parse模块并上传服务器使用。如果官方觉得代码改动OK的话,也可以call我下,我可以pull req,个人非常乐意为canal的发展做点贡献。

我改造后的canal.parse-1.1.7.jar版本已在我公司的生产环境运行测试,一切正常。以下为改动、优化内容概述:
1)阿里云RDS binlog消费的触发点扩充,@see handleMysqlParserException,否则我公司的场景无法触发Rds的binlog消费逻辑(即处理 Could not find first log file name in binary log index file异常);
2)RDS下载逻辑的部分代码优化,取消一些不常用变量;增加maxWait超时机制;将消费位点中的journalName作为精确下载的开始(之前是直接拿OSS接口返回中的第一个);消费位点配置简化,仅依赖journalName + position即可,不再强依赖时间戳,方便快速配置回溯(注意需要关闭tsdb.enable,如果启动了tsdb,则需要设置时间戳,最好是选择Binlog开始前1小时作为时间戳起点); BinlogDownloadQueue#download逻辑收口,保证单一线程执行,其他线程通过Wait + File接口查询是否已下载好;
3) 阿里云RDS的解析退出机制完善; 最后一个文件处理完成后,即返回,不再通过queue.waitNextFile等待;
4)增加一些注释,输出更多日志,方便开发者查看问题或锚点;增加更多定向destination日志输出,方便查看给定destination的全链路状况;

简单的使用文档: https://github.com/HelloLyfing/canal/wiki/canal.parse%E2%80%901.1.7.jar

另外官方的使用文档也可以适当更新,比如: canal.aliyun.accessKey的配置貌似不可以(注意key的大小写填错了不行),应该是:
canal.aliyun.accesskey / canal.aliyun.secretkey

@callmedba
Copy link

最近,我公司需要通过阿里云Rds的binlog历史文件进行消息回溯(我公司使用的是阿里云全家桶),我下载了最新的canal-1.1.7正式版的版本后,尝试了很多次都没办法正常使用该功能,在此之前也尝试过canal-1.1.0的版本都是无法正常跑通。于是自己抽时间改造了一个我公司可以使用的版本,该版本是从本项目的canal-1.1.7-hotfix-1分支fork出来的: https://github.com/HelloLyfing/canal/tree/feature/canal-1.1.7-hotfix-lyfing2410 , 改动点只涉及canal.parse模块。 此处共享出来,所以有需要的朋友可以考虑单独打包parse模块并上传服务器使用。如果官方觉得代码改动OK的话,也可以call我下,我可以pull req,个人非常乐意为canal的发展做点贡献。

我改造后的canal.parse-1.1.7.jar版本已在我公司的生产环境运行测试,一切正常。以下为改动、优化内容概述: 1)阿里云RDS binlog消费的触发点扩充,@see handleMysqlParserException,否则我公司的场景无法触发Rds的binlog消费逻辑(即处理 Could not find first log file name in binary log index file异常); 2)RDS下载逻辑的部分代码优化,取消一些不常用变量;增加maxWait超时机制;将消费位点中的journalName作为精确下载的开始(之前是直接拿OSS接口返回中的第一个);消费位点配置简化,仅依赖journalName + position即可,不再强依赖时间戳,方便快速配置回溯; BinlogDownloadQueue#download逻辑收口,保证单一线程执行,其他线程通过Wait + File接口查询是否已下载好; 3) 阿里云RDS的解析退出机制完善; 最后一个文件处理完成后,即返回,不再通过queue.waitNextFile等待; 4)增加一些注释,输出更多日志,方便开发者查看问题或锚点;增加更多定向destination日志输出,方便查看给定destination的全链路状况;

简单的使用文档: https://github.com/HelloLyfing/canal/wiki/canal.parse%E2%80%901.1.7.jar

另外官方的使用文档也可以适当更新,比如: canal.aliyun.accessKey的配置貌似不可以(注意key的大小写填错了不行),应该是: canal.aliyun.accesskey / canal.aliyun.secretkey

大佬 ,替换了 canal.parse-1.1.7.jar 文件后,重启canal ,设置位点方式从oss拉取 ,也没有成功

新日志文件报下面这个错误
[fixed timestamp] can't found begin/commit position before with fixed position mysql-bin.001406:91742908

canal.aliyun.accesskey / canal.aliyun.secretkey 也设置了 ,但貌似没有去 oss 拉取 ,oss 上是有这个文件的

@HelloLyfing
Copy link

HelloLyfing commented Oct 31, 2024

@callmedba
注意看,我上面给出的简单的使用文档里,有给出消费位点的模板信息,需要设置

  • 1)canal.aliyun.accesskeycanal.aliyun.secretkeycanal.instance.rds.instanceId;
  • 2)消费位点,设置三项: journalName + position + timestamp即可;timestamp最好是OSS上的Binlog开始前的一个时间,比如提前1小时;

以下为zk存储的消费位点模板

{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"rm-bp1v****ab60.mysql.rds.aliyuncs.com","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.003515","position":4,"timestamp":1234567890000}}

另外,在调试阶段可以考虑修改conf/logback.xml中的日志等级为INFO,可以看到更多日志信息

# for free to join this conversation on GitHub. Already have an account? # to comment
Projects
None yet
Development

No branches or pull requests