Basic data load configuration

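| scheme.load option | Type | Description |
| --- | --- | --- |
| type | jdbc, kafka, service, rabbitmq | Defines the type of the target data source |
| batch_size (batch-size) | int | Number of records per batch load operation; only takes effect when type is service or rabbitmq |
| batch_skip_error (batch-skip-error) | boolean | Whether to skip errors during a batch load; only takes effect when type=service |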
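
The scoping rules in the table can be checked mechanically. The following Python sketch is only an illustration, not part of Data-Sync: the function name check_load_options and the warning texts are hypothetical, and it assumes a scheme.load entry has already been parsed into a dict.

```python
# Options that, per the table above, only take effect for certain load types.
OPTION_SCOPE = {
    "batch_size": {"service", "rabbitmq"},
    "batch_skip_error": {"service"},
}
KNOWN_TYPES = {"jdbc", "kafka", "service", "rabbitmq"}


def check_load_options(load):
    """Return human-readable warnings for one scheme.load entry (a dict)."""
    warnings = []
    load_type = load.get("type")
    if load_type not in KNOWN_TYPES:
        warnings.append(f"unknown load type: {load_type!r}")
    for option, valid_types in OPTION_SCOPE.items():
        # Accept both spellings listed in the table, e.g. batch_size and batch-size.
        present = option in load or option.replace("_", "-") in load
        if present and load_type not in valid_types:
            allowed = ", ".join(sorted(valid_types))
            warnings.append(f"{option} has no effect unless type is one of: {allowed}")
    return warnings
```

For example, an entry with type kafka and a batch_size would produce one warning, while a service entry with both options produces none.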

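Note: scheme.load.type=kafka means that the target of the sync is Kafka. Once the data is in Kafka, other tools can be used to sync it on to Elasticsearch. Each target data source type has its own extended configuration; the extended configuration for syncing to a Kafka data source is described next.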

Extended configuration for syncing data to Kafka

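| load.type=kafka option | Type | Description |
| --- | --- | --- |
| datasource | kafka@{datasourceName} (Datasource reference) | Target database (Kafka data source) |
| topic | String | Kafka topic the messages are sent to |
| es_index (es-index) | String | Elasticsearch index |
| es_type (es-type) | String | Type of the Elasticsearch index |
| es_id (es-id) | String | Name of the field that holds the Elasticsearch document id |
| es_parent (es-parent) | String | Name of the field that holds the Elasticsearch document parent |
| es_delete_datasource (es-delete-datasource) | service@{datasourceName} (Datasource reference) | Target data source, usually Elasticsearch; the data source on which es_delete_query is executed |
| es_delete_query (es-delete-query) | String | Body template of the Elasticsearch _delete_by_query request that is executed to delete data when data is updated |
| es_delete_id (es-delete-id) | Map<String, String> | Defines how DELETE-type data is handled; used to delete the corresponding data in Elasticsearch (only takes effect when scheme.extract.type=hermes) |
| es_delete_ext_action (es-delete-ext-action) | Map<String, Object> | Defines how DELETE-type data is handled, extending the es_delete_id (esDeleteId) configuration (only takes effect when scheme.extract.type=hermes) |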

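Tip: the main purpose of syncing data to Kafka is to eventually get it into Elasticsearch. Data-Sync is only responsible for syncing the data to Kafka; syncing from Kafka to Elasticsearch is handled by other tools.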

Syncing inserted or updated data

Syncing inserted or updated data is straightforward. A demo follows:

scheme:
  LoadToKafkaTest-01:
    enable: true
    extract:
      - type: hermes
        datasource: rabbitmq@b2b_third_prod
        table: b2b_third.tb_package_main
        queue_prefix: lizw-test-
    load:
      - type: kafka
        datasource: kafka@b2b_third_prod
        topic: lzw-test
        es_index: "#{index:'lzw-index'}"
        es_type: tb_package_main
        es_id: id
        es_delete_datasource: service@b2b_third_prod_es
        es_delete_query: >
          {
            "query": {
              "bool": {
                "must": [
                  {
                    "term": {
                      "id": {
                        "value": "#{value:id}"
                      }
                    }
                  }
                ]
              }
            }
          }

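Key log output when data is sent to Kafka. The log messages are reproduced verbatim: 触发增量任务 = incremental task triggered, 增量数据提取完成 = incremental data extraction finished, 数据量 = record count, 请求/响应 = request/response, 执行es_delete_query配置成功 = es_delete_query executed successfully, 数据发送到Kafka成功 = data sent to Kafka successfully, 增量任务执行结果 = incremental task result: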

2020-03-19  10:27:45.918 [pool-20-thread-3] DEBUG com.jzt.data.sync.datax.job.support.JobLogger - [scheme@LoadToKafkaTest-01[0]-437962474201808896] - [HermesExtract] 触发增量任务: message=[{"size":2,"data":[{"database":"b2b_third","type":"UPDATE","cols":[{"before":"1107986424536772609","name":"id","index":0,"after":"1107986424536772609"},{"before":"101","name":"package_id","index":1,"after":"101"},{"before":"1089704947936186369","name":"store_no","index":2,"after":"1089704947936186369"},{"before":"hubei0XS00000037","name":"order_code","index":3,"after":"hubei0XS00000037"},{"before":"","name":"erp_order_code","index":4,"after":""},{"before":"3","name":"distribution_mode","index":5,"after":"3"},{"before":{"isNull":true},"name":"out_stock_id","index":6,"after":{"isNull":true}},{"before":{"isNull":true},"name":"out_comment","index":7,"after":{"isNull":true}},{"before":"2019-06-25 11:35:46.000","name":"out_time","index":8,"after":"2019-06-25 11:35:46.000"},{"before":{"isNull":true},"name":"transport_code","index":9,"after":{"isNull":true}},{"before":"","name":"courier_company","index":10,"after":""},{"before":"1","name":"receive_status","index":11,"after":"1"},{"before":"系统2","name":"operator","index":12,"after":"系统1"},{"before":"2019-03-19 20:45:23.000","name":"create_at","index":13,"after":"2019-03-19 20:45:23.000"},{"before":"2020-03-19 09:29:18.899","name":"update_at","index":14,"after":"2020-03-19 
10:27:20.355"},{"before":"0","name":"is_edit","index":15,"after":"0"},{"before":"0","name":"courier_other","index":16,"after":"0"},{"before":{"isNull":true},"name":"last_syn_time","index":17,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state","index":18,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json","index":19,"after":{"isNull":true}},{"before":{"isNull":true},"name":"shipper_code","index":20,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_error_json","index":21,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code2","index":22,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json2","index":23,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state2","index":24,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code3","index":25,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json3","index":26,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state3","index":27,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code4","index":28,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json4","index":29,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state4","index":30,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code5","index":31,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json5","index":32,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state5","index":33,"after":{"isNull":true}}],"table":"b2b_third.tb_package_main"},{"database":"b2b_third","type":"UPDATE","cols":[{"before":"1107986424536772609","name":"id","index":0,"after":"1107986424536772609"},{"before":"101","name":"package_id","index":1,"after":"101"},{"before":"1089704947936186369","name":"store_no","index":2,"after":"1089704947936186369"},{"before":"hubei0XS00000037","name":"order_code","index":3,"a
fter":"hubei0XS00000037"},{"before":"","name":"erp_order_code","index":4,"after":""},{"before":"3","name":"distribution_mode","index":5,"after":"3"},{"before":{"isNull":true},"name":"out_stock_id","index":6,"after":{"isNull":true}},{"before":{"isNull":true},"name":"out_comment","index":7,"after":{"isNull":true}},{"before":"2019-06-25 11:35:46.000","name":"out_time","index":8,"after":"2019-06-25 11:35:46.000"},{"before":{"isNull":true},"name":"transport_code","index":9,"after":{"isNull":true}},{"before":"","name":"courier_company","index":10,"after":""},{"before":"1","name":"receive_status","index":11,"after":"1"},{"before":"系统1","name":"operator","index":12,"after":"系统2"},{"before":"2019-03-19 20:45:23.000","name":"create_at","index":13,"after":"2019-03-19 20:45:23.000"},{"before":"2020-03-19 10:27:20.355","name":"update_at","index":14,"after":"2020-03-19 10:27:20.395"},{"before":"0","name":"is_edit","index":15,"after":"0"},{"before":"0","name":"courier_other","index":16,"after":"0"},{"before":{"isNull":true},"name":"last_syn_time","index":17,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state","index":18,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json","index":19,"after":{"isNull":true}},{"before":{"isNull":true},"name":"shipper_code","index":20,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_error_json","index":21,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code2","index":22,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json2","index":23,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state2","index":24,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code3","index":25,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json3","index":26,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state3","index":27,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transpor
t_code4","index":28,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json4","index":29,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state4","index":30,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code5","index":31,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json5","index":32,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state5","index":33,"after":{"isNull":true}}],"table":"b2b_third.tb_package_main"}]}]
2020-03-19  10:27:45.922 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@LoadToKafkaTest-01[0]-437962474201808896] - [HermesExtract] 增量数据提取完成,数据量: 2
2020-03-19  10:27:45.923 [pool-20-thread-3] INFO  com.jzt.data.sync.datasource.service.ServiceDataSource - ---> 请求 [POST] http://10.4.9.175:9200/lzw-index/tb_package_main/_delete_by_query 
2020-03-19  10:27:45.931 [pool-20-thread-3] INFO  com.jzt.data.sync.datasource.service.ServiceDataSource - <--- 响应 [200] http://10.4.9.175:9200/lzw-index/tb_package_main/_delete_by_query (8ms)
2020-03-19  10:27:45.931 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@LoadToKafkaTest-01[0]-437962474201808896] - [KafkaLoad] 执行es_delete_query配置成功 result=[{"took":1,"timed_out":false,"total":0,"deleted":0,"batches":0,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}]
2020-03-19  10:27:45.932 [pool-20-thread-3] INFO  com.jzt.data.sync.datasource.service.ServiceDataSource - ---> 请求 [POST] http://10.4.9.175:9200/lzw-index/tb_package_main/_delete_by_query 
2020-03-19  10:27:45.939 [pool-20-thread-3] INFO  com.jzt.data.sync.datasource.service.ServiceDataSource - <--- 响应 [200] http://10.4.9.175:9200/lzw-index/tb_package_main/_delete_by_query (8ms)
2020-03-19  10:27:45.939 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@LoadToKafkaTest-01[0]-437962474201808896] - [KafkaLoad] 执行es_delete_query配置成功 result=[{"took":1,"timed_out":false,"total":0,"deleted":0,"batches":0,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}]
2020-03-19  10:27:45.940 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@LoadToKafkaTest-01[0]-437962474201808896] - [KafkaLoad] 数据发送到Kafka成功 -> topic=[lzw-test] | key=[lzw-index] | data=[{"transport_code3":null,"transport_code4":null,"transport_code5":null,"hermes_dml_operation":"UPDATE","store_no":"1089704947936186369","syn_res_json3":null,"syn_res_json2":null,"syn_res_json5":null,"package_id":"101","syn_res_json4":null,"operator":"系统1","courier_company":"","syn_logistic_state":null,"hermes_table":"b2b_third.tb_package_main","distribution_mode":"3","syn_logistic_state4":null,"syn_logistic_state3":null,"syn_res_error_json":null,"syn_logistic_state2":null,"id":"1107986424536772609","shipper_code":null,"out_comment":null,"transport_code":null,"transport_code2":null,"syn_logistic_state5":null,"out_time":"2019-06-25 11:35:46.000","syn_res_json":null,"erp_order_code":"","is_edit":"0","receive_status":"1","order_code":"hubei0XS00000037","last_syn_time":null,"out_stock_id":null,"courier_other":"0","update_at":"2020-03-19 10:27:20.355","create_at":"2019-03-19 20:45:23.000","es_index":"lzw-index","es_id":"1107986424536772609","es_type":"tb_package_main","es_action":"update"}]
2020-03-19  10:27:45.940 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@LoadToKafkaTest-01[0]-437962474201808896] - [KafkaLoad] 数据发送到Kafka成功 -> topic=[lzw-test] | key=[lzw-index] | data=[{"transport_code3":null,"transport_code4":null,"transport_code5":null,"hermes_dml_operation":"UPDATE","store_no":"1089704947936186369","syn_res_json3":null,"syn_res_json2":null,"syn_res_json5":null,"package_id":"101","syn_res_json4":null,"operator":"系统2","courier_company":"","syn_logistic_state":null,"hermes_table":"b2b_third.tb_package_main","distribution_mode":"3","syn_logistic_state4":null,"syn_logistic_state3":null,"syn_res_error_json":null,"syn_logistic_state2":null,"id":"1107986424536772609","shipper_code":null,"out_comment":null,"transport_code":null,"transport_code2":null,"syn_logistic_state5":null,"out_time":"2019-06-25 11:35:46.000","syn_res_json":null,"erp_order_code":"","is_edit":"0","receive_status":"1","order_code":"hubei0XS00000037","last_syn_time":null,"out_stock_id":null,"courier_other":"0","update_at":"2020-03-19 10:27:20.395","create_at":"2019-03-19 20:45:23.000","es_index":"lzw-index","es_id":"1107986424536772609","es_type":"tb_package_main","es_action":"update"}]
2020-03-19  10:27:45.941 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@LoadToKafkaTest-01[0]-437962474201808896] - [HermesExtract] 增量任务执行结果: accept, message=[{"size":2,"data":[{"database":"b2b_third","type":"UPDATE","cols":[{"before":"1107986424536772609","name":"id","index":0,"after":"1107986424536772609"},{"before":"101","name":"package_id","index":1,"after":"101"},{"before":"1089704947936186369","name":"store_no","index":2,"after":"1089704947936186369"},{"before":"hubei0XS00000037","name":"order_code","index":3,"after":"hubei0XS00000037"},{"before":"","name":"erp_order_code","index":4,"after":""},{"before":"3","name":"distribution_mode","index":5,"after":"3"},{"before":{"isNull":true},"name":"out_stock_id","index":6,"after":{"isNull":true}},{"before":{"isNull":true},"name":"out_comment","index":7,"after":{"isNull":true}},{"before":"2019-06-25 11:35:46.000","name":"out_time","index":8,"after":"2019-06-25 11:35:46.000"},{"before":{"isNull":true},"name":"transport_code","index":9,"after":{"isNull":true}},{"before":"","name":"courier_company","index":10,"after":""},{"before":"1","name":"receive_status","index":11,"after":"1"},{"before":"系统2","name":"operator","index":12,"after":"系统1"},{"before":"2019-03-19 20:45:23.000","name":"create_at","index":13,"after":"2019-03-19 20:45:23.000"},{"before":"2020-03-19 09:29:18.899","name":"update_at","index":14,"after":"2020-03-19 
10:27:20.355"},{"before":"0","name":"is_edit","index":15,"after":"0"},{"before":"0","name":"courier_other","index":16,"after":"0"},{"before":{"isNull":true},"name":"last_syn_time","index":17,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state","index":18,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json","index":19,"after":{"isNull":true}},{"before":{"isNull":true},"name":"shipper_code","index":20,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_error_json","index":21,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code2","index":22,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json2","index":23,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state2","index":24,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code3","index":25,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json3","index":26,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state3","index":27,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code4","index":28,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json4","index":29,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state4","index":30,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code5","index":31,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json5","index":32,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state5","index":33,"after":{"isNull":true}}],"table":"b2b_third.tb_package_main"},{"database":"b2b_third","type":"UPDATE","cols":[{"before":"1107986424536772609","name":"id","index":0,"after":"1107986424536772609"},{"before":"101","name":"package_id","index":1,"after":"101"},{"before":"1089704947936186369","name":"store_no","index":2,"after":"1089704947936186369"},{"before":"hubei0XS00000037","name":"order_code","index":3,"a
fter":"hubei0XS00000037"},{"before":"","name":"erp_order_code","index":4,"after":""},{"before":"3","name":"distribution_mode","index":5,"after":"3"},{"before":{"isNull":true},"name":"out_stock_id","index":6,"after":{"isNull":true}},{"before":{"isNull":true},"name":"out_comment","index":7,"after":{"isNull":true}},{"before":"2019-06-25 11:35:46.000","name":"out_time","index":8,"after":"2019-06-25 11:35:46.000"},{"before":{"isNull":true},"name":"transport_code","index":9,"after":{"isNull":true}},{"before":"","name":"courier_company","index":10,"after":""},{"before":"1","name":"receive_status","index":11,"after":"1"},{"before":"系统1","name":"operator","index":12,"after":"系统2"},{"before":"2019-03-19 20:45:23.000","name":"create_at","index":13,"after":"2019-03-19 20:45:23.000"},{"before":"2020-03-19 10:27:20.355","name":"update_at","index":14,"after":"2020-03-19 10:27:20.395"},{"before":"0","name":"is_edit","index":15,"after":"0"},{"before":"0","name":"courier_other","index":16,"after":"0"},{"before":{"isNull":true},"name":"last_syn_time","index":17,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state","index":18,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json","index":19,"after":{"isNull":true}},{"before":{"isNull":true},"name":"shipper_code","index":20,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_error_json","index":21,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code2","index":22,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json2","index":23,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state2","index":24,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code3","index":25,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json3","index":26,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state3","index":27,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transpor
t_code4","index":28,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json4","index":29,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state4","index":30,"after":{"isNull":true}},{"before":{"isNull":true},"name":"transport_code5","index":31,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_res_json5","index":32,"after":{"isNull":true}},{"before":{"isNull":true},"name":"syn_logistic_state5","index":33,"after":{"isNull":true}}],"table":"b2b_third.tb_package_main"}]}]

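Purpose of the es_delete_datasource and es_delete_query options: after data is updated, they force the corresponding old data in Elasticsearch to be deleted. Both options are optional.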

TODO: add the key logic for syncing data from Kafka to Elasticsearch
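
The Kafka payloads in the logs above carry es_index, es_type, es_id and es_action next to the row data. As a rough illustration of what a downstream tool could do with them, the following Python sketch turns one payload into lines for the Elasticsearch bulk API. This is not Data-Sync code and the function name to_bulk_lines is hypothetical. It assumes an Elasticsearch version that still accepts _type, that es_action is either update or delete as seen in the logs, and that the es_* control fields should not be stored in the document itself.

```python
import json


def to_bulk_lines(payload):
    """Convert one Kafka payload (a dict) into Elasticsearch bulk API lines."""
    doc = dict(payload)  # copy so the caller's dict is not modified
    action = doc.pop("es_action")
    meta = {
        "_index": doc.pop("es_index"),
        "_type": doc.pop("es_type"),  # assumption: ES version with mapping types
        "_id": doc.pop("es_id"),
    }
    if action == "delete":
        # A bulk delete consists of the action line only, with no document line.
        return [json.dumps({"delete": meta})]
    if action == "update":
        # doc_as_upsert creates the document if it does not exist yet.
        body = {"doc": doc, "doc_as_upsert": True}
        return [json.dumps({"update": meta}), json.dumps(body, ensure_ascii=False)]
    raise ValueError(f"unsupported es_action: {action!r}")
```

The returned lines would be joined with newlines, terminated with a final newline, and posted to the _bulk endpoint by whichever consumer reads the topic.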

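Syncing incrementally deleted data

As explained in "Incremental sync task configuration", incremental data comes in three types: INSERT, DELETE, and UPDATE. When the synced data is of type DELETE, the corresponding data in the target data source may need to be deleted as well; this is what the es_delete_id and es_delete_ext_action options are for. Their format is as follows: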

es_delete_id:
  {tableName}: "#{value:{fieldName}}"

es_delete_ext_action:
  {tableName}:
    datasource: String `service@{datasourceName}` (Datasource reference)
    method: String (HTTP method)
    url: String (HTTP path template)
    body: String (HTTP body template)
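
To make the #{value:{fieldName}} placeholder concrete, here is a minimal Python sketch of how such a template might be resolved against a row. It is an assumption about the behaviour, inferred from the demos and logs in this document, and not the tool's implementation. The name render_value_template is hypothetical, and only the plain #{value:fieldName} form is handled (not #{index:...}, #{url:...} or quoted prefixes).

```python
import re

# Matches the plain placeholder form, e.g. #{value:id}
_VALUE_PLACEHOLDER = re.compile(r"#\{value:(\w+)\}")


def render_value_template(template, record):
    """Replace every #{value:field} in template with record[field]."""
    def substitute(match):
        value = record[match.group(1)]  # raises KeyError for unknown fields
        # Assumption: a null column renders as an empty string.
        return "" if value is None else str(value)

    return _VALUE_PLACEHOLDER.sub(substitute, template)
```

The same function works for a one-line es_delete_id value and for a multi-line body template such as the _delete_by_query body used in the demos.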

With es_delete_id, the deleted row is sent to Kafka, and another tool then deletes the corresponding data in Elasticsearch. A demo follows:

scheme:
  EsDeleteIdTest-01:
    enable: true
    extract:
      - type: hermes
        datasource: rabbitmq@b2b_third_prod
        table: b2b_third.tb_package_main
        queue_prefix: lizw-test-
    load:
      - type: kafka
        datasource: kafka@b2b_third_prod
        topic: lzw-test
        es_index: "#{index:'lzw-index'}"
        es_type: tb_package_main
        es_id: id
        es_delete_id:
          b2b_third.tb_package_main: "#{value:id}"
          # b2b5.tb_sale_fairs_merchandise: "#{value:'zxh_'id}"

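Key log output when data is sent to Kafka (log messages are reproduced verbatim; es_delete_id 执行成功 means "es_delete_id executed successfully"):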

2020-03-19  09:57:44.970 [pool-20-thread-3] DEBUG com.jzt.data.sync.datax.job.support.JobLogger - [scheme@EsDeleteIdTest-01[0]-437954889239232512] - [HermesExtract] 触发增量任务: message=[{"size":1,"data":[{"database":"b2b_third","type":"DELETE","cols":[{"before":"1240171062044143619","name":"id","index":0},{"before":"100","name":"package_id","index":1},{"before":"1089704947936186369","name":"store_no","index":2},{"before":"hubei0XS00000035","name":"order_code","index":3},{"before":"1234","name":"erp_order_code","index":4},{"before":"3","name":"distribution_mode","index":5},{"before":{"isNull":true},"name":"out_stock_id","index":6},{"before":{"isNull":true},"name":"out_comment","index":7},{"before":"2019-06-25 11:35:46.000","name":"out_time","index":8},{"before":{"isNull":true},"name":"transport_code","index":9},{"before":"","name":"courier_company","index":10},{"before":"1","name":"receive_status","index":11},{"before":"测试111","name":"operator","index":12},{"before":"2020-03-18 14:59:34.742","name":"create_at","index":13},{"before":"2020-03-18 
23:29:40.135","name":"update_at","index":14},{"before":"0","name":"is_edit","index":15},{"before":"0","name":"courier_other","index":16},{"before":{"isNull":true},"name":"last_syn_time","index":17},{"before":{"isNull":true},"name":"syn_logistic_state","index":18},{"before":{"isNull":true},"name":"syn_res_json","index":19},{"before":{"isNull":true},"name":"shipper_code","index":20},{"before":{"isNull":true},"name":"syn_res_error_json","index":21},{"before":{"isNull":true},"name":"transport_code2","index":22},{"before":{"isNull":true},"name":"syn_res_json2","index":23},{"before":{"isNull":true},"name":"syn_logistic_state2","index":24},{"before":{"isNull":true},"name":"transport_code3","index":25},{"before":{"isNull":true},"name":"syn_res_json3","index":26},{"before":{"isNull":true},"name":"syn_logistic_state3","index":27},{"before":{"isNull":true},"name":"transport_code4","index":28},{"before":{"isNull":true},"name":"syn_res_json4","index":29},{"before":{"isNull":true},"name":"syn_logistic_state4","index":30},{"before":{"isNull":true},"name":"transport_code5","index":31},{"before":{"isNull":true},"name":"syn_res_json5","index":32},{"before":{"isNull":true},"name":"syn_logistic_state5","index":33}],"table":"b2b_third.tb_package_main"}]}]
2020-03-19  09:57:44.981 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@EsDeleteIdTest-01[0]-437954889239232512] - [HermesExtract] 增量数据提取完成,数据量: 1
2020-03-19  09:57:44.985 [pool-20-thread-3] DEBUG com.jzt.data.sync.datax.job.support.JobLogger - [scheme@EsDeleteIdTest-01[0]-437954889239232512] - [KafkaLoad] es_delete_id 执行成功: Topic=[lzw-test] | sendData=[{"transport_code3":null,"transport_code4":null,"transport_code5":null,"hermes_dml_operation":"DELETE","store_no":"1089704947936186369","syn_res_json3":null,"syn_res_json2":null,"syn_res_json5":null,"package_id":"100","syn_res_json4":null,"operator":"测试111","courier_company":"","syn_logistic_state":null,"hermes_table":"b2b_third.tb_package_main","distribution_mode":"3","syn_logistic_state4":null,"syn_logistic_state3":null,"syn_res_error_json":null,"syn_logistic_state2":null,"id":"1240171062044143619","shipper_code":null,"out_comment":null,"transport_code":null,"transport_code2":null,"syn_logistic_state5":null,"out_time":"2019-06-25 11:35:46.000","syn_res_json":null,"erp_order_code":"1234","is_edit":"0","receive_status":"1","order_code":"hubei0XS00000035","last_syn_time":null,"out_stock_id":null,"courier_other":"0","update_at":"2020-03-18 23:29:40.135","create_at":"2020-03-18 14:59:34.742","es_id":"1240171062044143619","es_index":"lzw-index","es_type":"tb_package_main","es_action":"delete"}]
2020-03-19  09:57:44.986 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@EsDeleteIdTest-01[0]-437954889239232512] - [HermesExtract] 增量任务执行结果: accept, message=[{"size":1,"data":[{"database":"b2b_third","type":"DELETE","cols":[{"before":"1240171062044143619","name":"id","index":0},{"before":"100","name":"package_id","index":1},{"before":"1089704947936186369","name":"store_no","index":2},{"before":"hubei0XS00000035","name":"order_code","index":3},{"before":"1234","name":"erp_order_code","index":4},{"before":"3","name":"distribution_mode","index":5},{"before":{"isNull":true},"name":"out_stock_id","index":6},{"before":{"isNull":true},"name":"out_comment","index":7},{"before":"2019-06-25 11:35:46.000","name":"out_time","index":8},{"before":{"isNull":true},"name":"transport_code","index":9},{"before":"","name":"courier_company","index":10},{"before":"1","name":"receive_status","index":11},{"before":"测试111","name":"operator","index":12},{"before":"2020-03-18 14:59:34.742","name":"create_at","index":13},{"before":"2020-03-18 
23:29:40.135","name":"update_at","index":14},{"before":"0","name":"is_edit","index":15},{"before":"0","name":"courier_other","index":16},{"before":{"isNull":true},"name":"last_syn_time","index":17},{"before":{"isNull":true},"name":"syn_logistic_state","index":18},{"before":{"isNull":true},"name":"syn_res_json","index":19},{"before":{"isNull":true},"name":"shipper_code","index":20},{"before":{"isNull":true},"name":"syn_res_error_json","index":21},{"before":{"isNull":true},"name":"transport_code2","index":22},{"before":{"isNull":true},"name":"syn_res_json2","index":23},{"before":{"isNull":true},"name":"syn_logistic_state2","index":24},{"before":{"isNull":true},"name":"transport_code3","index":25},{"before":{"isNull":true},"name":"syn_res_json3","index":26},{"before":{"isNull":true},"name":"syn_logistic_state3","index":27},{"before":{"isNull":true},"name":"transport_code4","index":28},{"before":{"isNull":true},"name":"syn_res_json4","index":29},{"before":{"isNull":true},"name":"syn_logistic_state4","index":30},{"before":{"isNull":true},"name":"transport_code5","index":31},{"before":{"isNull":true},"name":"syn_res_json5","index":32},{"before":{"isNull":true},"name":"syn_logistic_state5","index":33}],"table":"b2b_third.tb_package_main"}]}]
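
Comparing the Hermes message with the Kafka payload in the logs above suggests how a row is flattened before it is sent: DELETE rows take their values from before, other rows from after, {"isNull":true} becomes null, and hermes_dml_operation and hermes_table are added. The Python sketch below reproduces that mapping as an illustration only; it is inferred from the logs, the name flatten_hermes_row is hypothetical, and the actual Data-Sync logic may differ.

```python
def flatten_hermes_row(row):
    """Flatten one entry of the Hermes message's data list into a flat dict."""
    # DELETE rows only carry "before"; INSERT/UPDATE rows are read from "after".
    source_key = "before" if row["type"] == "DELETE" else "after"
    record = {}
    for col in row["cols"]:
        value = col.get(source_key)
        # Hermes encodes SQL NULL as {"isNull": true}.
        if isinstance(value, dict) and value.get("isNull"):
            value = None
        record[col["name"]] = value
    # Extra fields seen in the Kafka payload in the logs.
    record["hermes_dml_operation"] = row["type"]
    record["hermes_table"] = row["table"]
    return record
```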

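With es_delete_ext_action, the target system's API is called directly to delete the data. It is typically used when syncing data to a business Service rather than to Elasticsearch. A demo follows: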

scheme:
  EsDeleteIdTest-01:
    enable: true
    extract:
      - type: hermes
        datasource: rabbitmq@b2b_third_prod
        table: b2b_third.tb_package_main
        queue_prefix: lizw-test-
    load:
      - type: kafka
        datasource: kafka@b2b_third_prod
        topic: lzw-test
        es_index: "#{index:'lzw-index'}"
        es_type: tb_package_main
        es_id: id
        es_delete_ext_action:
          b2b_third.tb_package_main:
            datasource: service@b2b_third_prod_es
            method: post
            url: "#{url:'/lzw-index/tb_package_main/_delete_by_query'}"
            body: >
              {
                "query": {
                  "bool": {
                    "must": [
                    {
                      "term": {
                        "id": {
                          "value": "#{value:id}"
                        }
                      }
                    }
                    ]
                  }
                }
              }

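Key log output (log messages are reproduced verbatim; 执行成功 means "executed successfully" and 配置为空 means "configuration is empty"). Note that in this run the request URL contains a double slash (//lzw-index/...) and Elasticsearch responds with 400 and "No handler found for uri", although the tool still logs the step as 执行成功. The leading / in the url template of the demo is the likely cause, so removing it would presumably avoid the error. The WARN line only reports that es_delete_id is not configured for this table: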

2020-03-19  10:40:47.323 [pool-20-thread-3] DEBUG com.jzt.data.sync.datax.job.support.JobLogger - [scheme@EsDeleteIdTest-01[0]-437965725513023488] - [HermesExtract] 触发增量任务: message=[{"size":1,"data":[{"database":"b2b_third","type":"DELETE","cols":[{"before":"1240171070449528835","name":"id","index":0},{"before":"100","name":"package_id","index":1},{"before":"1089704947936186369","name":"store_no","index":2},{"before":"hubei0XS00000035","name":"order_code","index":3},{"before":"1234","name":"erp_order_code","index":4},{"before":"3","name":"distribution_mode","index":5},{"before":{"isNull":true},"name":"out_stock_id","index":6},{"before":{"isNull":true},"name":"out_comment","index":7},{"before":"2019-06-25 11:35:46.000","name":"out_time","index":8},{"before":{"isNull":true},"name":"transport_code","index":9},{"before":"","name":"courier_company","index":10},{"before":"1","name":"receive_status","index":11},{"before":"测试111","name":"operator","index":12},{"before":"2020-03-18 14:59:36.743","name":"create_at","index":13},{"before":"2020-03-18 
23:29:40.135","name":"update_at","index":14},{"before":"0","name":"is_edit","index":15},{"before":"0","name":"courier_other","index":16},{"before":{"isNull":true},"name":"last_syn_time","index":17},{"before":{"isNull":true},"name":"syn_logistic_state","index":18},{"before":{"isNull":true},"name":"syn_res_json","index":19},{"before":{"isNull":true},"name":"shipper_code","index":20},{"before":{"isNull":true},"name":"syn_res_error_json","index":21},{"before":{"isNull":true},"name":"transport_code2","index":22},{"before":{"isNull":true},"name":"syn_res_json2","index":23},{"before":{"isNull":true},"name":"syn_logistic_state2","index":24},{"before":{"isNull":true},"name":"transport_code3","index":25},{"before":{"isNull":true},"name":"syn_res_json3","index":26},{"before":{"isNull":true},"name":"syn_logistic_state3","index":27},{"before":{"isNull":true},"name":"transport_code4","index":28},{"before":{"isNull":true},"name":"syn_res_json4","index":29},{"before":{"isNull":true},"name":"syn_logistic_state4","index":30},{"before":{"isNull":true},"name":"transport_code5","index":31},{"before":{"isNull":true},"name":"syn_res_json5","index":32},{"before":{"isNull":true},"name":"syn_logistic_state5","index":33}],"table":"b2b_third.tb_package_main"}]}]
2020-03-19  10:40:47.327 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@EsDeleteIdTest-01[0]-437965725513023488] - [HermesExtract] 增量数据提取完成,数据量: 1
2020-03-19  10:40:47.328 [pool-20-thread-3] INFO  com.jzt.data.sync.datasource.service.ServiceDataSource - ---> 请求 [POST] http://10.4.9.175:9200//lzw-index/tb_package_main/_delete_by_query 
2020-03-19  10:40:47.334 [pool-20-thread-3] INFO  com.jzt.data.sync.datasource.service.ServiceDataSource - <--- 响应 [400] http://10.4.9.175:9200//lzw-index/tb_package_main/_delete_by_query (6ms)
2020-03-19  10:40:47.334 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@EsDeleteIdTest-01[0]-437965725513023488] - [ServiceLoad] es_delete_ext_action执行成功: [No handler found for uri [//lzw-index/tb_package_main/_delete_by_query] and method [POST]]
2020-03-19  10:40:47.334 [pool-20-thread-3] WARN  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@EsDeleteIdTest-01[0]-437965725513023488] - [KafkaLoad] es_delete_id.b2b_third.tb_package_main配置为空
2020-03-19  10:40:47.334 [pool-20-thread-3] INFO  com.jzt.data.sync.datax.job.support.JobLogger - [scheme@EsDeleteIdTest-01[0]-437965725513023488] - [HermesExtract] 增量任务执行结果: accept, message=[{"size":1,"data":[{"database":"b2b_third","type":"DELETE","cols":[{"before":"1240171070449528835","name":"id","index":0},{"before":"100","name":"package_id","index":1},{"before":"1089704947936186369","name":"store_no","index":2},{"before":"hubei0XS00000035","name":"order_code","index":3},{"before":"1234","name":"erp_order_code","index":4},{"before":"3","name":"distribution_mode","index":5},{"before":{"isNull":true},"name":"out_stock_id","index":6},{"before":{"isNull":true},"name":"out_comment","index":7},{"before":"2019-06-25 11:35:46.000","name":"out_time","index":8},{"before":{"isNull":true},"name":"transport_code","index":9},{"before":"","name":"courier_company","index":10},{"before":"1","name":"receive_status","index":11},{"before":"测试111","name":"operator","index":12},{"before":"2020-03-18 14:59:36.743","name":"create_at","index":13},{"before":"2020-03-18 
23:29:40.135","name":"update_at","index":14},{"before":"0","name":"is_edit","index":15},{"before":"0","name":"courier_other","index":16},{"before":{"isNull":true},"name":"last_syn_time","index":17},{"before":{"isNull":true},"name":"syn_logistic_state","index":18},{"before":{"isNull":true},"name":"syn_res_json","index":19},{"before":{"isNull":true},"name":"shipper_code","index":20},{"before":{"isNull":true},"name":"syn_res_error_json","index":21},{"before":{"isNull":true},"name":"transport_code2","index":22},{"before":{"isNull":true},"name":"syn_res_json2","index":23},{"before":{"isNull":true},"name":"syn_logistic_state2","index":24},{"before":{"isNull":true},"name":"transport_code3","index":25},{"before":{"isNull":true},"name":"syn_res_json3","index":26},{"before":{"isNull":true},"name":"syn_logistic_state3","index":27},{"before":{"isNull":true},"name":"transport_code4","index":28},{"before":{"isNull":true},"name":"syn_res_json4","index":29},{"before":{"isNull":true},"name":"syn_logistic_state4","index":30},{"before":{"isNull":true},"name":"transport_code5","index":31},{"before":{"isNull":true},"name":"syn_res_json5","index":32},{"before":{"isNull":true},"name":"syn_logistic_state5","index":33}],"table":"b2b_third.tb_package_main"}]}]
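
The 400 response in the log above comes from a request path that starts with //. A minimal Python sketch of joining a base URL and a path template without doubling the slash is shown below. The helper join_url is hypothetical, and whether the Data-Sync data source URL ends with a slash is an assumption based on the logged request.

```python
def join_url(base, path):
    """Join base and path with exactly one slash between them."""
    return base.rstrip("/") + "/" + path.lstrip("/")
```

With this kind of normalisation, a url template with or without a leading / resolves to the same request URL.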
Last updated: 2020-03-19 10:44   Author: lizw