同步任务基础配置

scheme.extract配置项 类型 说明
type hermesjdbcservice(枚举字符串) hermes:增量。jdbc:全量
is_log_change(is-log-change) boolean 控制变化数据是否打印到日志(type属于hermes才能生效)
pre_piplines(pre-pipelines) List<piplines配置项> 用于处理提取的数据type属于hermes才能生效
piplines(pipelines) List<piplines配置项> 用于处理提取的数据
retry_stage(retry-stage) INCREASESAME(枚举字符串) 重试策略 (type属于hermes才能生效)
retry_interval_milliseconds(retry-interval-milliseconds) long 重试间隔(毫秒)

注意:scheme.extract.type=hermes时,表示当前对应的是一个增量同步任务(前面已经讲过一个scheme.extract对应一个同步任务),每一种同步任务都有对应的扩展配置,这些配置仅仅只在对应的同步任务类型下生效。下面讲讲增量同步任务的扩展配置

增量同步extract扩展配置

extract.type=hermes的扩展配置项 类型 说明
datasource hermes@{datasourceName}或者rabbitmq@{datasourceName} Mysql增量数据数据源
table String 增量任务所对应的数据库 table
queue_prefix(queue-prefix) String 监听的增量数据对应的队列前缀 (队列名称:{queue_prefix}{table})
care List<String> 增量任务所关心的 table 字段
pre_filter(pre-filter) Map<String, String>Map<String, Map<String, String>> 用于过滤提取的数据,按数据字段值匹配规则过滤
operation List<String> 支持 DELETEUPDATEINSERT 过滤增量数据的hermes_dml_operation类型,只处理特定类型的增量数据
ignore_delete(ignore-delete) boolean 是否忽略删除的增量数据
is_upsert_with_before(is-upsert-with-before) boolean 已使用,但不知道作用
transform transform配置项 用于转换提取的数据(只处理非删除类型的增量数据)
delete_transform(delete-transform) transform配置项 用于转换提取的数据(只处理删除类型的增量数据)

提示: pre_piplines、transform、piplines处理数据顺序: pre_piplines -> transform -> piplines

transform配置项如下:

transform配置项 类型 说明
datasource jdbc@{datasourceName}(Datasource引用) 做数据转换使用的数据源,引用jdbc数据源
repo sql@{sqlName}(Sql语句引用) 数据转换的sql语句配置
where Map reposql语句的参数

增量同步数据源

增量数据同步目前只支持Mysql、Oracle数据库,Mysql增量数据同步原理是通过canal实时订阅解析MySQL数据库的binlog数据,把得到的增量数据保存到RabbitMQ消息队列,所以增量数据源本质上就是一个RabbitMQ数据源。Oracle的增量数据也类似这样处理。

在增量数据源(RabbitMQ消息队列)中增量数据是按照数据库表隔离开的,一个消息队列数据对应一张表的增量数据,当多个同步任务需要使用同一张表的增量数据时也会出现多个不同的消息队列都对应同一张表的增量数据。以下配置用来指定当前增量同步任务订阅的是哪张表的增量数据:

  • table:指定增量任务所对应的数据库table,使用{schema}.{tableName}格式,例如:b2b_third.tb_package_main

  • queue_prefix:设置数据库table的增量数据保存到消息队列的队列名称前缀,最终的队列名称:{queue_prefix}{tableName}

提示: 一个增量任务只能订阅一张表的增量数据,如果一个业务需要同步几张表的增量数据,那么需要配置多个增量同步任务处理!

增量数据类型

Mysql、Oracle增量数据分为几种:新增的增量数据、删除的增量数据、更新的增量数据,分别对应hermes_dml_operation为:INSERTDELETEUPDATE

  • INSERT:表示新增数据库表数据产生的增量数据(执行insert into语句)

  • DELETE:表示删除数据库表数据产生的增量数据(执行delete from语句)

  • UPDATE:表示更新数据库表数据产生的增量数据(执行update from语句)

增量数据过滤

有时对于增量数据我们只关心其中一部分数据,如我们只关心一张表的某一个字段的变化而忽略其它字段的变化,为了提高同步效率我们支持对增量数据过滤,只处理业务上关心的增量数据,对于不关心的数据直接忽略不做任何处理。相关配置如下:

  • care:指定增量任务所关心的 table 字段,不指定表示关心所有字段

  • pre_filter:用于过滤提取的数据,按数据字段值匹配规则过滤,只处理匹配上的数据

  • operation:过滤增量数据的hermes_dml_operation类型,只处理特定类型的增量数据。支持 DELETEUPDATEINSERT

关于pre_filter配置格式,字段值匹配规则支持:gt(大于)、gte(大于等于)、lt(小于)、lte(小于等于)、eq(等于)。配置格式如下:

extract:
  - type: hermes
    ......
    pre_filter:
      {filedName_1}:
        {gt|gte|lt|lte|eq}: {filedValue}
      {filedName_2}:
        {gt|gte|lt|lte|eq}: {filedValue}
    ......

当使用eq(等于)比较时,还支持简写格式:

extract:
  - type: hermes
    ......
    pre_filter:
      {filedName_1}: {filedValue}
      {filedName_2}:
        {gt|gte|lt|lte|eq}: {filedValue}
    ......

# 该简写格式等效于:
extract:
  - type: hermes
    ......
    pre_filter:
      {filedName_1}:
        {gt|gte|lt|lte|eq}:
          eq: {filedValue}
      {filedName_2}:
        {gt|gte|lt|lte|eq}: {filedValue}
    ......

数据转换(transform)

数据转换的目的是为了在执行数据更新逻辑(load)之前,对数据进行处理。还可以对增量数据做一个回查,以确保数据的强一致性!transform分为两个配置,transformdelete_transform它们处理的增量数据类型不同,他们的配置是一致的

  • transform: 只处理hermes_dml_operation类型等于UPDATEINSERT的增量数据

  • delete_transform: 只处理hermes_dml_operation类型等于DELETE的增量数据

transform使用Demo

# 省略若干配置......
TransformTest-02:
  type: read
  driect: mysql
  sql: 'select * from tb_package_main where id=#{where:id} #{where}'

# 省略若干配置......

extract:
  - type: hermes
    # 省略若干配置......
    transform:
      datasource: jdbc@b2b_third_prod
      repo: sql@TransformTest-02
      where:
        id: 1107983649060958209
        '#{where}': " and id in #{ids:id} "

注意: transform.where的配置中有个特殊的配置参数#{ids:{table_primary_key_name}}用于回查数据库数据以确保数据的强一致性。

上面的配置可用等同于以下配置:

# 省略若干配置......
TransformTest-02:
  type: read
  driect: mysql
  sql: 'select * from tb_package_main where id=1107983649060958209 #{where}'

# 省略若干配置......

extract:
  - type: hermes
    # 省略若干配置......
    transform:
      datasource: jdbc@b2b_third_prod
      repo: sql@TransformTest-02
      where: ' and id in #{ids:id} '

增量任务配置Demo

scheme:
  增量任务配置Demo:
    enable: true
    extract:
      - type: hermes
        datasource: rabbitmq@b2b_third_prod
        table: b2b_third.tb_package_main
        queue_prefix: lizw-test-
        care:
           - syn_logistic_state
           - operator
         pre_filter:
           package_id:
             gte: 100
         operation:
           - DELETE
           - UPDATE
        transform:
          datasource: jdbc@b2b_third_prod
          repo: sql@TransformTest-02
          where:
            id: 1107983649060958209
            '#{where}': " and id in #{ids:id} "
    load:
    # 省略其它配置....
文档更新时间: 2020-03-18 16:55   作者:lizw