数据更新基础配置

scheme.load配置项 类型 说明
type jdbckafkaservicerabbitmq 定义 目标数据源类型
batch_size(batch-size) int load批量操作的数据量, type=servicerabbitmq时才有效
batch_skip_error(batch-skip-error) boolean 批量load时是否跳过错误, type=service时才有效

注意:scheme.load.type=jdbc时,表示数据同步的目标数据库是关系型数据库MySQL、Oracle等,每一种目标数据源都有对应的扩展配置,下面讲讲数据同步到JDBC数据源的扩展配置

数据同步到JDBC扩展配置

load.type=jdbc的扩展配置项 类型 说明
datasource jdbc@{datasourceName}(Datasource引用) 目标数据库(Jdbc数据源)
repo sql@{sqlName}(Sql语句引用) 更新数据的SQL配置
delete_ext_action(delete-ext-action) Map<String, Object> 定义处理删除类型的数据(当scheme.extract.type=hermes时生效)

同步新增或更新的数据

当需要把一条新增的数据同步到JDBC数据源时,就是说此时要在目标数据源执行一条insert intoSQL语句来新增数据,但是看了上面的配置,你发现貌似不支持传递SQL参数,那么如何指定更新或新增的SQL语句的参数数据呢?答案是:更新或新增的SQL语句需要按照固定的格式编写,如下:

replace into t_biz(f1, f2, f3) #{values:f1, f2, f3}
-- 支持特殊值@IdWorker(生成唯一的IDLong类型)
replace into t_biz(pk,f1, f2, f3) #{values:@IdWorker, f1, f2, f3}

具体的使用Demo如下:

repo:
  sql:
    LoadToJdbcTest-01:
      type: read
      driect: mysql
      sql: >
        replace into
          tb_package_main(id,package_id,store_no,order_code,erp_order_code,distribution_mode,out_stock_id,out_comment,out_time,transport_code,courier_company,operator)
        values
          #{values:@IdWorker,package_id,store_no,order_code,erp_order_code,distribution_mode,out_stock_id,out_comment,out_time,transport_code,courier_company,'测试111'}

scheme:
  LoadToJdbcTest-01:
    enable: true
    extract:
      - type: hermes
        datasource: rabbitmq@b2b_third_prod
        table: b2b_third.tb_package_main
        queue_prefix: lizw-test-
        operation: ['UPDATE']
    load:
      - type: jdbc
        datasource: jdbc@b2b_third_prod
        repo: sql@LoadToJdbcTest-01

执行SQL语句相关日志:

2020-03-18  15:29:40.146 [pool-20-thread-2] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@LoadToJdbcTest-01[0]-437676076521488384] - [JdbcLoad] 批量更新数据 Sql=[replace into  tb_package_main(id,package_id,store_no,order_code,erp_order_code,distribution_mode,out_stock_id,out_comment,out_time,transport_code,courier_company,operator)values   (:@IdWorker,:package_id,:store_no,:order_code,:erp_order_code,:distribution_mode,:out_stock_id,:out_comment,:out_time,:transport_code,:courier_company,:constantField0) ]
2020-03-18  15:29:40.152 [pool-20-thread-2] DEBUG com.jzt.data.sync.datax.job.support.JobLogger - [scheme@LoadToJdbcTest-01[0]-437676076521488384] - [JdbcLoad] 批量更新数据 Sql Values=[[{"@IdWorker":"1240178540630794242","package_id":"100","store_no":"1089704947936186369","order_code":"hubei0XS00000035","erp_order_code":"1234","distribution_mode":"3","out_stock_id":null,"out_comment":null,"out_time":"2019-06-25 11:35:46.000","transport_code":null,"courier_company":"","constantField0":"测试111"},{"@IdWorker":"1240178540630794243","package_id":"100","store_no":"1089704947936186369","order_code":"hubei0XS00000035","erp_order_code":"1234","distribution_mode":"3","out_stock_id":null,"out_comment":null,"out_time":"2019-06-25 11:35:46.000","transport_code":null,"courier_company":"","constantField0":"测试111"},{"@IdWorker":"1240178540630794244","package_id":"101","store_no":"1089704947936186369","order_code":"hubei0XS00000037","erp_order_code":"","distribution_mode":"3","out_stock_id":null,"out_comment":null,"out_time":"2019-06-25 11:35:46.000","transport_code":null,"courier_company":"","constantField0":"测试111"},{"@IdWorker":"1240178540630794245","package_id":"101","store_no":"1089704947936186369","order_code":"hubei0XS00000037","erp_order_code":"","distribution_mode":"3","out_stock_id":null,"out_comment":null,"out_time":"2019-06-25 11:35:46.000","transport_code":null,"courier_company":"","constantField0":"测试111"}]]
2020-03-18  15:29:40.157 [pool-20-thread-2] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing SQL batch update [replace into  tb_package_main(id,package_id,store_no,order_code,erp_order_code,distribution_mode,out_stock_id,out_comment,out_time,transport_code,courier_company,operator)values   (?,?,?,?,?,?,?,?,?,?,?,?) ]
2020-03-18  15:29:40.158 [pool-20-thread-2] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [replace into  tb_package_main(id,package_id,store_no,order_code,erp_order_code,distribution_mode,out_stock_id,out_comment,out_time,transport_code,courier_company,operator)values   (?,?,?,?,?,?,?,?,?,?,?,?) ]
2020-03-18  15:29:40.162 [pool-20-thread-2] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource

同步增量删除的数据

在“增量同步任务配置”中已经讲了增量数据分为NSERTDELETEUPDATE三种。当同步的数据是DELETE类型时,我们可能需要把目标数据源的数据也要删除,此时就需要使用delete_ext_action配置了,delete_ext_action配置格式如下:

delete_ext_action:
  {tableName}:
    repo: String `sql@{sqlName}`(Sql语句引用)
    where: String (Sql Where条件)

具体的使用Demo如下:

# 使用 #{ids:data_key_name} 的方式传值
repo:
  sql:
    JdbcDeleteExtActionTest-01:
      sql: "update tb_package_main set operator='666' where id in #{ids:id}"

load:
  - type: jdbc
    datasource: jdbc@b2b_third_prod
    repo: sql@JdbcDeleteExtActionTest-02
    delete_ext_action:
      b2b_third.tb_package_main:
        repo: sql@JdbcDeleteExtActionTest-01

# =================================================================================
# 使用 #{where} 的方式传值
repo:
  sql:
    JdbcDeleteExtActionTest-01:
      sql: "update tb_package_main set operator='666' where store_no='abc' #{where}"

load:
  - type: jdbc
    datasource: jdbc@b2b_third_prod
    repo: sql@JdbcDeleteExtActionTest-02
    delete_ext_action:
      b2b_third.tb_package_main:
        repo: sql@JdbcDeleteExtActionTest-01
        where: ' or id=123456 '

注意:Demo中的两种传值方式只能二选一,不能混用

同步到JDBC配置Demo

scheme:
  同步到JDBC配置Demo:
    enable: true
    extract:
      - type: hermes
        datasource: rabbitmq@b2b_third_prod
        table: b2b_third.tb_package_main
        queue_prefix: lizw-test-
        operation: ['UPDATE']
    load:
      - type: jdbc
        datasource: jdbc@b2b_third_prod
        repo: sql@LoadToJdbcTest-01
文档更新时间: 2020-03-18 16:53   作者:lizw