Skip to content

2.6_HBase Writer

elevenqq edited this page Sep 30, 2018 · 3 revisions

一、使用场景

HBase-Writer插件实现了关系型数据库Rdbms向HBase数据同步的功能,主要使用场景为增量数据实时归档,解决DBA方归档数据表加字段困难的问题。

二、设计原理

HBase-Writer插件自定义的RdbEventRecordHandler继承自com.ucar.datalink.worker.api.handle.AbstractHandler,使用了系统提供的默认流程(详见深入领域),并根据需要自行扩展了一些功能,执行最终的HBase写入操作。具体的数据写入流程如下图所示:

HBase-Writer

  • 【Records分组】
    > 首先将Records按目标端库名、表名进行分组,不同分组并发写入。
    > 然后将每张表的Records根据mapping-id再次进行聚合,因为多表合一的场景下,会存在同一个TableGroup下mapping-id不相同的情况。
  • 【HTable加载】
    > 初始化org.apache.hadoop.conf.Configuration
    要向某个HBase写数据,首先要通过配置HConstants中的参数,来建立与该HBase的zk连接,包括zk的IP地址、端口、父节点。
    > 创建HTable
    由configuration和tableName创建相应的HTable,客户端通过HTable对象与HBase服务端进行CRUD操作(增删改查)。
  • 【Records转换】
    > 将batchRecords中的数据操作转换为HBase相应的数据操作,准备向HBase执行写入。
    > 转换原则:
    1)Rdb的Insert和Update操作转换为HBase的put操作,或者说,HBase的Put操作即相当于关系型数据库的replace操作;
    2)Rdb的Delete操作通过Mapping的扩展参数控制,默认不同步,需要同步时进行特殊配置。当设置Mapping参数syncDelete = true时,将Rdb的Delete转换为HBase的delete操作
    > 转换模型:
    1)HBase表的列族:默认为default;
    2)事件类型为Insert和Update时,Rdb的主键作为row key(不支持联合主键),主键和非主键均作为非row key同步,即主键在非row key中再存一遍。事件类型为delete时,只将Rdb的主键作为row key,在HBase端执行删除操作,非主键列无需同步
  • 【HTable写入】
    > 单表支持按BatchSize拆分写入。即将每张表按mapping-id聚合后的Records根据batchSize进行拆分,分批进行写入。每批次的batchRecords转换为对应的row key和columns之后,通过调用HTable的Put/Delete写入HBase,最后调用flushCommits显示发起请求提交。

三、功能介绍

  • 【并发写入】
    > 一批次的RdbEventRecord支持按表分组并发写入。HBase-Writer开启了多个HTable写线程,每个写线程负责一个HTable对象的flush操作,提高写数据的吞吐量。
    注:HTable对象对于客户端读写数据来说不是线程安全的,因此这里的多线程,是为目标端不同的表开启,每个线程单独创建复用自己的HTable对象,不同线程间不会共享HTable对象使用,能够保证数据的一致性。
  • 【批量写入】
    > HTable.put(Put)可以将一个指定的row key记录写入HBase,为了提高性能,HBase-Writer使用HTable.put(List<Put>)可以将指定的row key列表,批量写入多行记录。批量执行的好处是,只需要一次网络I/O开销,这对于对数据实时性要求高,网络传输RTT高的情景下来明显地提升性能。
    HBase-Writer初始化HTable时,设置了HTable.setAutoFlush(false),将HTable的自动flush关闭,可以启动Write Buffer模式,批量写入数据到HBase。否则的话,默认情况下auto flush是开启的,每有一个put就要向服务端发一个请求,执行一次更新。
    autoFlush = false这种模式下,向服务端提交的时机分为显式和隐式两种情况:
    1)  显式提交:客户端调用flushCommits()进行提交;
    2)  隐式提交:当Put填满Write Buffer时,客户端才实际向HBase服务端发起写请求,自动执行提交;或者调用了HTable的close()方法时无条件执行提交操作。
    HBase-Writer中,在HTable put每批次数据时进行一次显示提交,防止其运行出现问题时,会导致在Write Buffer中未提交的数据丢失。
    注:默认配置下,Write Buffer大小为2MB,可以根据应用实际情况,通过HTable.setWriteBufferSize(writeBufferSize)设置HTable客户端的写buffer大小。
  • 【缓存资源】
    > 对于同一个HBase,可能会有多次写入,但是不能每次访问都创建一个Configuration,为了提高性能,HBase-Writer使用CacheBuilder为每个HBase缓存一份配置,实现HTable对象之间共享Configuration对象,从而减少后续的网络传输开销,加快查找过程速度。
    > 对于HBase中的同一张表HTable,可能会有多次写入,所以HBase-Writer同样使用CacheBuilder进行缓存,可以规避HTable对象的重复创建开销。
  • 【删除事件同步】
    > 对于删除事件,由于HBase-Writer的主要使用场景为数据归档,所以默认不同步。其他场景可以通过配置Mapping映射的扩展参数,设置同步删除事件。

四、插件参数说明

在继承Writer插件通用参数基类(PluginWriterParameter,详见深入Task)的基础上,HBaseWriterParameter没有扩展自己的参数类,只是实现了插件参数类PluginParameter的基本方法。

五、Mapping参数说明

在Mapping通用配置参数基础上,HBase-Writer插件还定义了自己的参数类HBaseSyncParameter,用来设置是否同步Delete事件。相关Mapping配置参数如下:

Mapping参数

参数描述

默认值

备注

targetMediaSourceId

目标端数据源的id

目标数据源,这里指要同步到的HBase集群

targetMediaName

HBase中表的名字

支持目标端有表别名,用于源端和目标端表名不一致的情况

targetMediaNamespace

目标端数据源的namespace

目标端数据源schema,对HBase来说默认为空

ColumnMappingMode

列映射模式

NONE

支持列名黑白名单与列别名:

NONE,//所有列均同步到目标端

INCLUDE,//只同步白名单中的列,可以设置列别名

EXCLUDE;//黑名单中的列不同步

writePriority

同步优先级

5

数值越小优先级越高

interceptorId

拦截器id

拦截器可以对Records进行特殊处理,满足少数特定功能的需求

skipIds

主键黑名单

可以通过指定主键id来过滤源端的某些异常Records

valid

是否有效

同步映射有效时,才进行同步

syncDelete

HBase-Writer是否同步Delete事件 false

需要向HBase同步Delete事件时,在映射中配置参数,将syncDelete置为true:

{"@type":"com.ucar.datalink.domain.plugin.writer.hbase.HBaseSyncParameter","syncDelete":true}

六、版本说明

关联技术 稳定版本 待测版本
hbase 0.98.16.1-hadoop2

七、注意事项

  • 【HBase shell 常用命令】

    名称

    命令表达式

    创建表

    create '表名称', '列名称1','列名称2','列名称N'

    添加记录      

    put '表名称', '行rowkey值', '列名称:', '值'

    查看记录

    get '表名称', '行rowkey值'

    查看表中的记录总数

    count  '表名称'

    删除记录

    delete  '表名' ,'行rowkey值' , '列名称'

    删除一张表

    先要屏蔽该表,才能对该表进行删除,第一步 disable '表名称' 第二步  drop '表名称'

    查看所有记录

    scan "表名称"  

    查看某个表某个列中所有数据

    scan "表名称" , ['列名称:']

    更新记录 

    就是重写一遍进行覆盖

    清空表中所有记录 truncate '表名称'


  • 【HBase-Writer同步功能测试】
    > 配置同步
    在DataLink测试环境部署代码,新建ucar_datalink_2_hbase的任务,配置映射表t_dl_test_source同步到表tsqq。
    > HBase建表
    登陆测试环境的hbase-namenode-ip
    进入hbase shell命令:sh hbase shell
    hbase(main):001:0> create ‘tsqq','default'
    > 源端产生Binlog测试
    1)INSERT
    在源端进行插入一条记录,INSERT INTO `t_dl_test_source` (`name`, `create_time`, `modify_time`, `yy`, `cc`) VALUES ('gggg', NOW(), NOW(),222,111);
    查看目标端:
    hbase(main):006:0> scan 'tsqq'
    ROW COLUMN+CELL
    36 column=default:cc, timestamp=1512534970035, value=1
    36 column=default:create_time, timestamp=1512534970035, value=2017-12-06 11:43:13
    36 column=default:id, timestamp=1512534970035, value=36
    36 column=default:modify_time, timestamp=1512534970035, value=2017-12-06 11:43:13
    36 column=default:name, timestamp=1512534970035, value=gggg
    36 column=default:yy, timestamp=1512534970035, value=222
    1 row(s) in 0.0710 seconds
    2)UPDATE
    在源端更新刚插入的这条记录:UPDATE `t_dl_test_source` SET name = 'hhhh' WHERE id = 36;
    查看目标端:
    hbase(main):007:0> scan 'tsqq'
    ROW COLUMN+CELL 
    36 column=default:cc, timestamp=1512534970035, value=1 
    36 column=default:create_time, timestamp=1512534970035, value=2017-12-06 11:43:13 
    36 column=default:id, timestamp=1512534970035, value=36 
    36 column=default:modify_time, timestamp=1512534970035, value=2017-12-06 11:43:13 
    36 column=default:name, timestamp=1512534970035, value=hhhh 
    36 column=default:yy, timestamp=1512534970035, value=222 
    1 row(s) in 0.0710 seconds
    3)DELETE
    在源端删除刚插入的这条记录:DELETE FROM `t_dl_test_source` WHERE id = 36;
    查看目标端,记录仍存在:
    hbase(main):008:0> scan 'tsqq'
    ROW COLUMN+CELL 
    36 column=default:cc, timestamp=1512534970035, value=1 
    36 column=default:create_time, timestamp=1512534970035, value=2017-12-06 11:43:13 
    36 column=default:id, timestamp=1512534970035, value=36 
    36 column=default:modify_time, timestamp=1512534970035, value=2017-12-06 11:43:13 
    36 column=default:name, timestamp=1512534970035, value=hhhh 
    36 column=default:yy, timestamp=1512534970035, value=222 
    1 row(s) in 0.0710 seconds
    在映射中配置参数,将syncDelete置为true:{"@type":"com.ucar.datalink.domain.plugin.writer.hbase.HBaseSyncParameter","syncDelete":true}
    在源端再次删除这条记录:DELETE FROM `t_dl_test_source` WHERE id = 36;
    查看目标端,相应的删除了rowkey为36的所有记录:
    hbase(main):009:0> scan 'tsqq'
    ROW COLUMN+CELL
    0 row(s) in 0.0100 seconds

八、FAQ