Datax配置

Datax支持yaml配置文件格式其主要配置如下:

配置 类型 说明
datasource 定义全局数据源
repo 定义全局sql脚本
schema(scheme) 定义数据同步逻辑,其中可以引用其他配置部分(datasourcerepotemplate)资源
template 配置文件模板,用于配置的重用,支持变量填充

datasource配置

datasource用于配置数据源(支持jdbc、rabbitmq、kafka、service)

数据源类型 类型 说明
jdbc Java JDBC数据源,如: mysql、oracle
rabbitmq RabbitMQ 消息队列
kafka Kafka 消息队列
service Http(Https) Api Service数据源(把Http接口看做一个数据源,通过Http接口读写数据)

datasource.jdbc

datasource.jdbc配置项 类型 说明
driver
url
user
password
max_active(max-active)
hermes
hermes_mq(hermes-mq)

datasource.rabbitmq

datasource.jdbc配置项 类型 说明
host
port
user
username
password
vhost

datasource.kafka

datasource.kafka配置项 类型 说明
servers
retries
batch_size(batch-size)
linger_ms(linger-ms)
buffer_memory(buffer-memory)
request_timeout(request-timeout)

datasource.service

datasource.service配置项 类型 说明
url
read_timeout(read-timeout)
connect_timeout(connect-timeout)
charset

repo配置

目前repo只有sql配置项,用于配置sql脚本

repo.sql

repo.sql配置项 类型 说明
type
driect(dialect)
sql

schema(scheme)配置

schema(scheme)用于配置整个数据同步逻辑,其配置可以引用repo和datasource中的资源

schema(scheme)配置项 类型 说明
enable
template
extract
load

scheme.template

template配置项 类型 说明
name
items

scheme.extract

extract 用于定义提取数据逻辑

extract配置项 类型 说明
type hermesjdbcservice(枚举字符串) hermes:增量。jdbc:全量
is_log_change(is-log-change) boolean 控制变化数据是否打印到日志(type属于hermes、、、才能生效)
pre_piplines(pre-pipelines) piplines配置项 用于处理提取的数据type属于hermes、、、才能生效
piplines(pipelines) piplines配置项 用于处理提取的数据
retry_stage(retry-stage) INCREASESAME(枚举字符串) 重试策略,INCREASE:重试时间间隔按规则增加(重试时间间隔 * 重试次数)。SAME: 重试时间间隔固定
retry_interval_milliseconds(retry-interval-milliseconds) long 重试间隔(毫秒)
piplines配置项 类型 说明
type
file_url(file-url)
code
当extract.type=hermes时支持的扩展配置
extract.type=hermes的扩展配置项 类型 说明
table String 增量任务所对应的数据库 table
queue_prefix(queue-prefix) String 监听的增量数据对应的队列前缀 (队列名称:{queue_prefix}{table})
care List<String> 增量任务所关心的 table 字段
extract List<String> 未知属性,并未使用
pre_filter(pre-filter) Map<String, String>Map<String, Map<String, String>> 用于过滤提取的数据,按字段匹配过滤,匹配上了就过滤调
operation List<String> 过滤增量数据的 hermes_dml_operation 类型,匹配不上就过滤(注意这里的operation是包含逻辑)
ignore_delete(ignore-delete) boolean 是否忽略删除的增量数据
is_upsert_with_before(is-upsert-with-before) boolean 已使用,但不知道作用
datasource rabbitmq@{datasourceName}(Datasource引用) 提取数据的源数据库(Mysql增量数据数据源)
transform transform配置项 用于转换提取的数据(只处理非删除类型的增量数据)
delete_transform(delete-transform) transform配置项 用于转换提取的数据(只处理删除类型的增量数据)

pre_piplines、transform、piplines处理数据顺序: pre_piplines -> transform -> piplines

transform配置项 类型 说明
datasource
repo
where
当extract.type=jdbc时支持的扩展配置
extract.type=jdbc的扩展配置项 类型 说明
command_datasource(command-datasource) rabbitmq@{datasourceName}(Datasource引用) 必须引用rabbitmq数据源
command_queue(command-queue) string rabbitmq队列名称(用于监听全量任务触发指令)
cursor_mode(cursor-mode) boolean 执行sql语句读取数据时是否使用Jdbc游标读取数据
cursor_per_count(cursor-per-count) int 游标模式读取数据时(cursor_mode)每次加载的数据量(fetchSize)
clear_queue(clear-queue) List<String> 定义提取数据之前需要清除数据的队列
pause_other(pause-other) boolean 未知属性,并未使用
ignore_retry(ignore-retry) boolean 全量任务执行失败时是否忽略重试
datasource jdbc@{datasourceName}(Datasource引用) 提取数据的源数据库
repo sql@{sqlName}(Sql语句引用) 同步任务提取数据的sql语句配置
where Map reposql语句的参数

只支持 piplines 不支持 prePiplines

当extract.type=service时支持的扩展配置
extract.type=service的扩展配置项 类型 说明
command_datasource(command-datasource)
command_queue(command-queue)
pause_other(pause-other)

scheme.load

load 用于定义数据的更新(增、删、改)逻辑

scheme.load配置项 类型 说明
type jdbckafkaservicerabbitmq
is_log_load(is-log-load)
batch_size(batch-size) int load批量操作的数据量, load.type=servicerabbitmq时才有效
batch_skip_error(batch-skip-error) boolean 批量load时是否跳过错误, load.type=service时才有效
当load.type=jdbc时支持的扩展配置
load.type=jdbc的扩展配置项 类型 说明
datasource jdbc@{datasourceName}(Datasource引用) 目标数据库(Jdbc数据源)
repo sql@{sqlName}(Sql语句引用) 更新数据的SQL配置
delete_ext_action(delete-ext-action) Map<String, Object> 定义处理删除类型的数据
delete_ext_action:
  {tableName}:
    repo: String `sql@{sqlName}`(Sql语句引用)
    where: String (Sql Where条件)
当load.type=kafka时支持的扩展配置
load.type=kafka的扩展配置项 类型 说明
datasource kafka@{datasourceName}(Datasource引用) 目标数据库(Kafka数据源)
topic String 消息发送到Kafka对应的主题
es_index(es-index) String 设置elasticsearch索引(Index)
es_id(es-id) String elasticsearch数据id对应的字段名
es_type(es-type) String 设置elasticsearch索引(Index)的类型(Type)
es_parent(es-parent) String elasticsearch数据parent对应的字段名
es_delete_id(es-delete-id) Map<String, String> 定义处理删除类型的数据,用作删除elasticsearch中对应的数据
es_delete_datasource(es-delete-datasource) service@{datasourceName}(Datasource引用) 目标数据库(Service数据源,一般使用elasticsearch)
es_delete_query(es-delete-query) String 定义处理非删除类型的数据,执行elasticsearch_delete_by_query操作的body内容模板
es_delete_ext_action(es-delete-ext-action) Map<String, Object> 定义处理删除类型的数据(扩展esDeleteId配置)
es_delete_id:
  {tableName}: #{value:{fieldName}}
es_delete_ext_action:
  {tableName}:
    datasource: String `service@{datasourceName}`(Datasource引用)
    method: String (Http method)
    url: String (Http path 模板)
    body: String (Http body 模板)
当load.type=service时支持的扩展配置
load.type=service的扩展配置项 类型 说明
datasource service@{datasourceName}(Datasource引用) 目标数据库(Service数据源)
is_with_raw(is-with-raw) boolean 是否发送原始数据到Service(特殊数据格式)
delete_ext_action(delete-ext-action) Map<String, Object> 定义处理删除类型的数据
delete_ext_action:
  {tableName}:
    datasource: String `service@{datasourceName}`(Datasource引用,Service数据源)
当load.type=rabbitmq时支持的扩展配置
load.type=rabbitmq的扩展配置项 类型 说明
datasource rabbitmq@{datasourceName}(Datasource引用) 必须引用rabbitmq数据源
exchange String 设置RabbitMQ交换器名称,默认: amq.default
route_key(route-key) String exchange 和 queueName 对应的 route key,默认: queueName配置值
queue_name(queue-name) String 队列名称,用于保存非删除类型的数据
delete_route_key(delete-route-key) String exchange 和 deleteQueueName 对应的 route key,默认: deleteQueueName配置值
delete_queue_name(delete-queue-name) String 队列名称,用于保存删除类型的数据
care List<String> 过滤数据,只发送关心的字段到MQ
compress String 消息发送到MQ中使用的压缩算法,可选snappy

template配置

template的配置项同scheme

文档更新时间: 2020-03-13 17:26   作者:lizw