上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Flink CDC和Flink SQL构建实时数仓Flink写入Doris

guduadmin14小时前

软件环境 Flink1.13.3

Scala 2.12

doris 0.14

一、MySQL 开启binlog日志、创建用户

1.开启bin log

MySQL 8.0默认开启了binlog,可以通过代码show variables like "%log_bin%";查询是否开启了,show variables like "%server_id%";查询服务器ID。
Flink CDC和Flink SQL构建实时数仓Flink写入Doris,第1张

上图分别显示了bin long是否开启以及bin log所在的位置。

2.创建用户

CREATE USER 'flinktest' IDENTIFIED BY '123456'; 

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinktest';

如果遇到报错:

Your password does not satisfy the current policy requirements

Mysql8版本输入

set global validate_password.policy=0; set global validate_password.length=6;

如果是mysql5.6版本 set global validate_password_policy=LOW;set global validate_password_length=6;

二、添加依赖

到仓库服务或者这里下载 cdc依赖flink-connector-mysql-cdc-2.0.2.jar 添加到$FLINK_HOME/lib下面

这里一定要注意一下cdc和flink版本的匹配关系,否则执行SQL的时候会报错:

[ERROR] Could not execute SQL statement. Reason:

java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;

具体如下表:

Flink CDC Connector VersionFlink Version
1.0.01.11.*
1.1.01.11.*
1.2.01.12.*
1.3.01.12.*
1.4.01.13.*
2.0.*1.13.*

 

三、建表

1.MySQL建表:

CREATE TABLE flink_test(id INT ,name VARCHAR(20));

2.Doris建表

2.1启动doris

不懂如何启动可以看这里

2.2Flink连接doris驱动

Flink连接doris需要flink-doris-connector包,如果你懒得编译,可以从这边下载,下面的编译步骤就免了。

驱动编译过程:

首先到Doris官网把整个项目下载下来,然后解压

unzip incubator-doris-master.zip

cd incubator-doris-master/extension/flink-doris-connector

./build.sh  

 如果遇到报错./build.sh: Permission denied  那就修改权限 chmod 777 build.sh

如果遇到报错./build.sh: line 43: mvn: command not found

Error: mvn is not found 那就安装一下maven可以看到这里

等到N久之后,然后你可能遇到报错,无力吐槽啊:

[ERROR] thrift failed output:

[ERROR] thrift failed error: /bin/sh: /opt/pkg/incubator-doris-master/extension/flink-doris-connector/../../thirdparty/installed/bin/thrift: No such file or directory

[INFO] BUILD FAILURE

[ERROR] Failed to execute goal org.apache.thrift.tools:maven-thrift-plugin:0.1.11:compile (thrift-sources) on project doris-flink: thrift did not exit cleanly. Review output for more information. -> [Help 1]

好吧,那就安装thrift咯。安装过程中可能有报C++错误configure: No compiler with C++11 support was found,那就yum install -y gcc gcc-c++安装一下

#版本别太新哈0.93就行,不然可能报错

1.下载

wget http://mirrors.cnnic.cn/apache/thrift/0.9.3/thrift-0.9.3.tar.gz

或者wget http://archive.apache.org/dist/thrift/0.9.3/

2.解压编译

tar -zxf thrift-0.9.3.tar.gz

cd thrift-0.9.3

./configure --with-lua=no && make && make install

3.验证

thrift -version

4.把thrift复制到thirdparty/installed/bin 目录下,目录如果不存在需要手工创建

cp /usr/local/bin/thrift /opt/pkg/incubator-doris-master/thirdparty/installed/bin

又等待N久,继续执行./build.sh

注意,默认flink版本是1.12版本,如果是1.13版本,需要修改incubator-doris-master/extension/flink-doris-connector下面的pom.xml把property修改一下

Flink CDC和Flink SQL构建实时数仓Flink写入Doris,第2张

虽然短短几行代码,但是踩坑了不少,等待时间又很久,如果有人不想编译,可以到这边下载我编译好的。注意我这个flink版本是1.13.3,scala版本是2.12哈。

2.3 Doris建表

 mysql -h 172.16.37.29 -P 9030 -uroot

CREATE TABLE test_cnt
(
    id int,
    name varchar(50)
)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "1",
"in_memory" = "false",
"storage_format" = "V2");

 3.启动flink并建表

3.1启动fink

在$FLINK_HOME/bin目录输入./start-cluster.sh

3.2 flink建表

输入./sql-client.sh embedded启动FLINK SQL客户端,FLINK SQL有表模式,日志变更模式和Tableau模式,本次采用表模式,所以启动之后输入 SET sql-client.execution.result-mode=table;

创建mysql source:

 

 CREATE TABLE flink_mysql_src(

 id INT NOT NULL,

 name STRING

) WITH (

 'connector' = 'mysql-cdc',

 'hostname' = '139.xxx.xx.xx',

 'port' = '3306',

 'username' = 'xxxx',

 'password' = 'xxx',

 'database-name' = 'xx',

 'table-name' = 'flink_test',

 'scan.incremental.snapshot.enabled' = 'false'

);

注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加'scan.incremental.snapshot.enabled' = 'false'否则会报错:

[ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'

看一下是否能监控到MySQL数据:

 

在mysql中输入

insert into flink_test values(1,'a');

insert into flink_test values(2,'b');

insert into flink_test values(3,'c');

在flink sql输入:

select * from flink_mysql_src;

可以看到结果已经输出到flink控制台了,说明flink到mysql这端数据传输是OK的

如果遇到报错:ClassNotFoundException: com.ververica.cdc.debezium.DebeziumSourceFunction那就把flink-connector-debezium-2.0.2.jar也放到lib目录下面

创建doris sink:

CREATE TABLE flink_doris_sink (

    id int,

    name string

    ) 

    WITH (

      'connector' = 'doris',

      'fenodes' = 'localhost:8030',

      'table.identifier' = 'zh.test_cnt',

      'sink.batch.size'='2',

      'username' = 'root',

      'password'=''

);

select * from flink_doris_sink看看有没有报错。 

Flink CDC和Flink SQL构建实时数仓Flink写入Doris,第3张

 

 

如果报错[ERROR] Could not execute SQL statement. Reason:

java.lang.RuntimeException: can not fetch partitions 说明数据库不存在或者表不存在,注意看建表语句。

如果报错[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.doris.flink.table.DorisRowDataInputForma 说明doris-flink-1.0-SNAPSHOT.jar编译有问题,看看自己版本对不对,不对重新改一下pom重新编译

四、实践

1.flink执行任务

INSERT INTO flink_doris_sink

SELECT id,name

FROM flink_mysql_src;

可以到flink网页端看到任务的情况了
Flink CDC和Flink SQL构建实时数仓Flink写入Doris,第4张

2.往mysql插入数据

insert into flink_test values(1,'a');

在doris 中查询,发现数据已经过来了

 Flink CDC和Flink SQL构建实时数仓Flink写入Doris,第5张

3.变更数据

在mysql中执行update flink_test  set name='tests' where id=1

在doris中查询发现数据已经变更了,不过变成了两条记录,flink_doris_connector暂时不支持删除,据说后续版本会更新,那就期待一下吧。 
Flink CDC和Flink SQL构建实时数仓Flink写入Doris,第6张

注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加'scan.incremental.snapshot.enabled' = 'false'否则会报错:

 

网友评论

搜索
最新文章
热门文章
热门标签