使用Datax web 创建同步任务
准备工作
创建数据源 配置数据库相关信息
创建执行器 配置执行器执行地址相关信息
1.构建reade
1.1 SQL语句 (querySql)
在json文件中此部分配置就是 querySql
在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id
当用户配置querySql时,xxxReader直接忽略table、column、where条件的配置。如果配置了querySql又配置了table,column、where等,在log中会有警告日志
1.2 表所有字段 (column)
在json文件中此部分配置就是 column
2.构建reade
2.1 前置sql语句 (preSql)
在json文件中此部分配置就是 preSql
写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, … datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:“preSql”:[“delete from 表名”],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称。
// 也可以通过配置参数进行部分数据删除 lastTime为传入的辅助参数 "preSql": [ "delete from biz_ad_facebook where date =${lastTime}" ]
2.2 postSql (postSql)
在json文件中此部分配置就是 postSql
== 写入数据到目的表后,会执行这里的标准语句。(原理同 preSql ) ==
3.字段映射
4.构建json ----选择模板
5.增量更新的配置
通过上述步骤创建出来的任务,在任务列表找到并编辑。
其中用于增量更新的辅助参数。
1.任务类型选DataX任务
2.辅助参数选择时间自增
3.增量开始时间选择,即sql中查询时间的开始时间,用户使用此选项方便第一次的全量同步。第一次同步完成后,该时间被更新为上一次的任务触发时间,任务失败不更新。
4.增量时间字段,-DlastTime=‘%s’ -DcurrentTime=‘%s’ 先来解析下这段字符串
1.-D是DataX参数的标识符,必配 2.-D后面的lastTime和currentTime是DataX json中where条件的时间字段标识符,必须和json中的变量名称保持一致 3.='%s'是项目用来去替换时间的占位符,比配并且格式要完全一致 4.注意-DlastTime='%s'和-DcurrentTime='%s'中间有一个空格,空格必须保留并且是一个空格
6.遇到的问题—增量数据导入 可能重复的主键
解决办法:
datax的writeMode参数: "writeMode": "update",
1.insert
这个参数可以设置为insert,这样子就是对于同步的主键进行设置。主要主键存在,那么在更新的时候,就不会将结果表中的数据进行修改。只会增加新的数据。
2.update
在同步到时候,设置主键,那么会查看表中主键内容的数据,如果有变动,就会直接进行替换。
// 示例json { "job": { "setting": { "speed": { "channel": 3, "byte": 1048576 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "****", "parameter": { "username": "****", "password": "****", "splitPk": "", "connection": [ { "querySql": [ "select * from tablename where date >= ${lastTime} and date< ${currentTime}" ], "jdbcUrl": [ "****" ] } ] } }, "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "update", "username": "******", "password": "*******", "column": [ "`id`", "`account_id`", "`account_name`", "`campaign_id`", "`campaign_name`", "`ad_set_id`", "`ad_set_name`", "`ad_id`", "`ad_name`", "`date`", "`date_start`", "`date_stop`", "`spend`", "`cpm`", "`ctr`", "`cpc`", "`purchase`", "`order_num`", "`roas`", "`clicks`", "`video_avg_time_watched_actions`", "`video_creator`", "`level`", "`create_by`", "`create_time`", "`update_by`", "`update_time`", "`sys_org_code`", "`product_name`", "`site_short_name`", "`operator`", "`date_timestamp`", "`impressions`", "`add_to_card`", "`video_thirty_sec_watched`", "`unique_clicks`", "`unique_checkout`", "`unique_add_to_card`", "`conversion_rate_ranking`", "`engagement_rate_ranking`", "`quality_ranking`", "`video_p100_watched`", "`video_p25_watched`", "`video_p50_watched`", "`video_p75_watched`", "`video_p95_watched`", "`view_content`", "`landing_page_view`", "`unique_link_clicks`", "`checkout`", "`designer`", "`campaign_objective`", "`inline_clicks`", "`unique_inline_clicks`", "`outbound_clicks`", "`unique_outbound_clicks`", "`frequency`", "`inline_post_engagement`", "`cost_per_inline_click`", "`cost_per_inline_post`", "`cost_per_outbound_click`", "`cost_per_unique_click`", "`cost_per_unique_inline_click`", "`cost_per_unique_outbound_click`", "`watch_avg_time`", "`video_play_curve`", "`asc_flag`" ], "preSql": [ "delete from tablename where date =${lastTime}" ], "connection": [ { "table": [ "biz_ad_facebook" ], "jdbcUrl": "jdbc:mysql://localhost:3322/powerbi" } ] } } } ] } }
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章