同步任务基础配置

scheme.extract配置项 类型 说明
type hermesjdbcservice(枚举字符串) hermes:增量。jdbc:全量
is_log_change(is-log-change) boolean 控制变化数据是否打印到日志(type属于hermes才能生效)
pre_piplines(pre-pipelines) List<piplines配置项> 用于处理提取的数据type属于hermes才能生效
piplines(pipelines) List<piplines配置项> 用于处理提取的数据
retry_stage(retry-stage) INCREASESAME(枚举字符串) 重试策略 (type属于hermes才能生效)
retry_interval_milliseconds(retry-interval-milliseconds) long 重试间隔(毫秒)

注意:scheme.extract.type=jdbc时,表示当前对应的是一个全量同步任务(前面已经讲过一个scheme.extract对应一个同步任务),每一种同步任务都有对应的扩展配置,这些配置仅仅只在对应的同步任务类型下生效。下面讲讲全量同步任务的扩展配置

全量同步extract扩展配置

extract.type=jdbc的扩展配置项 类型 说明
command_datasource(command-datasource) rabbitmq@{datasourceName}(Datasource引用) 必须引用rabbitmq数据源
command_queue(command-queue) String rabbitmq队列名称(用于监听全量任务触发指令)
clear_queue(clear-queue) List<String> 定义提取数据之前需要清除数据的队列
datasource jdbc@{datasourceName}(Datasource引用) 提取数据的源数据库,必须引用jdbc数据源
cursor_mode(cursor-mode) boolean 执行sql语句读取数据时是否使用Jdbc游标读取数据
cursor_per_count(cursor-per-count) int 游标模式读取数据时(cursor_mode)每次加载的数据量(fetchSize)
repo sql@{sqlName}(Sql语句引用) 同步任务提取数据的sql语句配置
where Map reposql语句的参数
ignore_retry(ignore-retry) boolean 全量任务执行失败时是否忽略重试

注意: 全量任务只支持 piplines 不支持 prePiplines

全量同步任务的触发方式

全量同步任务是通过消费消息队列数据来触发的,所以如果需要触发全量同步任务,就要往全量同步任务订阅的消息队列里发送一条消息,Data-Sync任务调度器接受到这条消息就会按照消息的配置启动同步任务。消息的格式如下:

{
  "job_name": "String | {同步方案名称}",
  "clean": "boolean | {执行全量任务之前是否需要清空指定的队列数据}",
  "ext_where": "Map<String, Object> | {执行全量任务SQL的参数,会与配置参数做一个合并,这里的优先级高于配置where配置项}"
}

那么怎么知道全量任务订阅的是哪个消息队列呢?其实是有全量任务自己指定的,通过以下配置项指定全量任务订阅的消息队列:

  • command_datasource: 确定消息队列服务器信息,rabbitmq数据源

  • command_queue: 确定队列名称

  • clear_queue: 定义执行全量任务执行之前需要清空的消息队列名称

你可能想问为什么在任务执行前需要清空消息队列呢?因为一个全量任务往往有与之对应的增量任务,我们执行全量任务的场景可能是因为增量任务的数据太多,处理不过来导致线上几个业务系统的数据不一致,此时强制执行一个全量任务就能把数据刷成一致的,但是如果不停止增量任务的话,增量任务的历史数据会把业务系统的数据有刷回来,导致数据又不一致,而且此时增量任务的历史数据已经没有意义了。所以我们执行全量任务之前要把增量任务的数据清空。

注意: 一个同步方案最多只能有一个全量任务。 通过全量任务触发的消息格式就能看出来,job_name指的是“同步方案名称”,由于粒度没有细到指定对应的同步任务名称(在配置中同步任务是没有名称的,也没法指定),所以为了简单起见,规定一个同步方案最多只能有一个全量任务!

数据提取时SQL参数问题

全量任务提取数据的SQL语句使用repowhere配置项定义,repo用于指定引用的SQL资源,where用于定义sql脚本“参数”。注意这里的“参数”跟JBDC程序中的SQL参数完全是两回事。这里的“参数”本质上就是字符串替换!具体请看以下示例:

...
  sql: select * from tb_goods_ds where erp_no='BMC015001C' and store_no=#{where:store_no}

...
  repo: sql@xxx
  where:
    store_no: 1119828711216971778

上面示例最后执行的SQL语句为 select * from tb_goods_ds where erp_no='BMC015001C' and store_no=1119828711216971778
注意如果这里的store_no是一个字符串类型,我们需要使用以下的方式定义变量值:

...
  repo: sql@xxx
  where:
    store_no: "'1119828711216971778'"

这样最后执行的SQL语句为select * from tb_goods_ds where erp_no='BMC015001C' and store_no='1119828711216971778'

关于变量定义格式,在sql语句中使用#{where:变量名}的方式引用变量,在同步方案配置中使用如下方式定义变量名和变量值:

...
  where:
    {paramName_1}: {paramValue_1}
    {paramName_2}: {paramValue_2}
    ...

注意: 在SQL变量使用中需要注意以下几点

  • 变量名的定义与使用都是大小写敏感的
  • 当变量值为字符串时,需要使用paramName: "'xxxx'"的方式
  • 这里的SQL“参数”本质上就是字符串替换,并不是真正的SQL传参,所以一些特殊的数据类型如时间类型,需要装换成字符串或者使用SQL函数处理!

SQL查询结果数据量大问题

当一个全量SQL查询结果数据量很大时(10W以上)建议使用cursor_modecursor_per_count两个配置,有以下好处:

  • 防止数据一次重数据库全部加载到内存,导致服务端内存溢出 OutOfMemoryError
  • 可以降低第一次查询的响应时间,防止在获取大量数据时任务假死

TODO: 补充这两个配置带来的坏处(事务问题,数据一致性 完整性)

piplines使用Demo

前面已经说过piplines的目的是在在数据被extract(提取)之后load(更新)之前,做一些额外的加工处理。目前的piplines处理只支持使用JavaScript脚本来处理。JavaScript脚本的格式如下:

define(function () {
    return {
        process: function (data, config) {
            // 参数 data: 处理之前的数据
            // 参数 config: 当前piplines配置
            // 返回值: 处理之后的数据,必须是数组类型
            // 自定义代码逻辑...
            return data;
        }
    }
});

piplines的使用示例如下:

# 全量任务配置
scheme:
  PipelineTest-01:
    enable: true
    extract:
      - type: jdbc
        command_datasource: rabbitmq@b2b_third_prod
        command_queue: test-total-quantity-command-queue
        datasource: jdbc@b2b_third_prod
        cursor_mode: true
        repo: sql@PipelineTest-01
        piplines:
          - type: script
            file_url: PipelineTest/ScriptPipeline.js
    load:
    ......
// ScriptPipeline.js 脚本内容
define(function () {
    return {
        process: function (data, config) {
            // 读取当前piplines配置
            console.log("config --> {}", config);
            // 读取原始数据
            for (var i = 0; i < data.length; i++) {
                console.log("data[{}] --> {}", i, data[i]);
                console.log("data[{}] --> store_no=[{}]", i, data[i].store_no);
            }
            // 更新数据
            data[0].store_no = '123456';
            // 调用Java代码
            var SecurityUtil = Java.type("com.yvan.datax.node.toolkit.SecurityUtil");
            console.log("encryptPassword --> {}", SecurityUtil.encryptPassword('password', 'jzt2'));
            // 返回处理之后的数据必须是数组类型
            return data;
        }
    }
});

piplines的执行日志如下:

2020-03-17  13:27:38.947 [pool-20-thread-2] INFO  /.js - config --> ScriptPipelineConfig(fileUrl=PipelineTest/ScriptPipeline.js, code=null)
2020-03-17  13:27:38.971 [pool-20-thread-2] INFO  /.js - data[0.0] --> {ds_id=1169169393590603788, inner_code=SPH00000079, erp_no=BMC015001C, store_no=114, raw_approval_number=国药准字H45021137, approval_number=国药准字H45021137, raw_prod_name=硝酸异山梨酯缓释片, prod_name=硝酸异山梨酯缓释片, raw_prod_specification=20mg*20s, prod_specification=400, package_unit=盒, raw_manufacture=广西万寿堂药业有限公司, manufacture=广西万寿堂, prod_place=, smlpackage_barcode=, prodscopeno_id=C, prodscopeno_name=经营简码名称111111111, medical_category=处方药, is_unpick=1, midpackage_quantity=10.00000, bigpackage_quantity=400.00000, raw_retail_price=13.70000, retail_price=13.70000, raw_cost_price=0.00000, cost_price=0.00000, storage_category_name=处方药, storage_number=0.00, near_validtime=null, far_validtime=null, batch_number=null, near_batch_number=null, production_date=null, raw_sale_price=null, sale_price=null, price_unit=null, score_flag=0, create_at=2019-09-04 16:44:37.0, update_at=2019-09-04 18:12:24.0, raw_prod_place=null}
2020-03-17  13:27:38.989 [pool-20-thread-2] INFO  /.js - data[0.0] --> store_no=[114]
2020-03-17  13:27:38.990 [pool-20-thread-2] INFO  /.js - data[1.0] --> {ds_id=1169173837954813964, inner_code=SPH00000079, erp_no=BMC015001C, store_no=1119828711216971778, raw_approval_number=国药准字H45021137, approval_number=国药准字H45021137, raw_prod_name=硝酸异山梨酯缓释片, prod_name=硝酸异山梨酯缓释片, raw_prod_specification=20mg*20s, prod_specification=400, package_unit=盒, raw_manufacture=广西万寿堂药业有限公司, manufacture=广西万寿堂, prod_place=, smlpackage_barcode=, prodscopeno_id=C, prodscopeno_name=经营简码名称111111111, medical_category=处方药, is_unpick=1, midpackage_quantity=10.00000, bigpackage_quantity=400.00000, raw_retail_price=13.70000, retail_price=13.70000, raw_cost_price=0.00000, cost_price=0.00000, storage_category_name=处方药, storage_number=null, near_validtime=null, far_validtime=null, batch_number=null, near_batch_number=null, production_date=null, raw_sale_price=null, sale_price=null, price_unit=null, score_flag=0, create_at=2019-09-04 17:02:17.0, update_at=2019-09-04 17:02:17.0, raw_prod_place=null}
2020-03-17  13:27:38.990 [pool-20-thread-2] INFO  /.js - data[1.0] --> store_no=[1119828711216971778]
2020-03-17  13:27:39.023 [pool-20-thread-2] INFO  /.js - encryptPassword --> e27b83360df5015e717dbe35a72e9322

警告: 虽然使用piplines可以编写JavaScript代码来处理数据,灵活性非常高。但是由于这里的JavaScript代码难以调试,写法有一定的限制和规则。所以绝大多数情况下不建议使用。

重试以及其它问题

TODO: 待补充

全量任务配置Demo

scheme:
  全量任务配置Demo:
    enable: true
    extract:
      - type: jdbc
        command_datasource: rabbitmq@b2b_third_prod
        command_queue: test-total-quantity-command-queue
        clear_queue: ['xxx', 'xxx' , 'xxx']
        datasource: jdbc@b2b_third_prod
        cursor_mode: true
        repo: sql@Demo-01
        where:
          store_no: "'1119828711216971778'"
          status: 1
    load:
    ......
文档更新时间: 2020-03-18 12:21   作者:lizw