Flink SQL
1、Sql命令行
1、使用方式
-- 1. Start a Flink cluster (standalone cluster or YARN session mode). From a shell:
--      yarn-session.sh -d
-- 2. Start the SQL client. From a shell:
--      sql-client.sh
-- 3. Define a table on top of a stream.
--    Creating a table in Flink is like creating a view: the table stores no data;
--    rows are read from the underlying source only when the table is queried.
CREATE TABLE abc (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'abc',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

-- 4. Query the data (continuous query) from the `abc` table defined above.
SELECT
    clazz,
    COUNT(*) AS c
FROM abc
GROUP BY clazz;

-- Produce test data into the `abc` topic from a shell, one record per line:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic abc
--   1500100001,施笑槐,22,女,文科十班
--   1500100002,吕金鹏,24,男,文科六班
--   1500100003,单乐蕊,22,女,理科六班
--   1500100004,葛德曜,24,男,理科三班
--   1500100005,宣谷芹,22,女,理科五班
2、输出结果模式
-- Table mode (default): materializes the result in memory and shows it
-- as a regular, paginated table.
SET 'sql-client.execution.result-mode' = 'table';

-- Changelog mode: does not materialize the result; instead shows the stream of
-- changes produced by the continuous query as insertions (+) and retractions (-).
SET 'sql-client.execution.result-mode' = 'changelog';

-- Tableau mode: closer to a traditional database client; prints the result
-- directly on the screen in tabular form. What is displayed depends on the
-- job's execution mode ('execution.runtime-mode': streaming or batch).
SET 'sql-client.execution.result-mode' = 'tableau';
2、SQL流批一体
1、流处理
1、流处理模式可以用于处理有界流和无界流
2、流处理模式输出连续结果
3、流处理模式底层是持续流模型,上游task和下游task同时启动,等待数据到达
-- Switch the session to streaming execution (continuous results; works for
-- both bounded and unbounded inputs).
SET 'execution.runtime-mode' = 'streaming';
2、批处理
1、只能用于处理有界流
2、输出最终结果
3、批处理模式底层是MR模型,先执行上游task再执行下游task,会在map端对数据进行预聚合
SET 'execution.runtime-mode' = 'batch';

-- Create a table backed by a bounded stream (a file on HDFS).
CREATE TABLE students_hdfs (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',                                  -- required: connector type
    'path' = 'hdfs://master:9000/data/spark/stu/students.txt',  -- required: path
    'format' = 'csv'                                             -- required: the filesystem connector needs a format
);

-- In batch mode only the final count per class is produced.
SELECT
    clazz,
    COUNT(*) AS c
FROM students_hdfs
GROUP BY clazz;
3、Flink SQL连接器
1、kafka
1、kafka source
-- Create a Kafka source table.
CREATE TABLE students_kafka (
    `offset`     BIGINT METADATA VIRTUAL,                  -- record offset; VIRTUAL = read-only metadata
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',  -- time the record entered Kafka; usable as event time
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',                                                  -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'earliest-offset',                               -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);
2、kafka sink
-- Create a Kafka table used as a sink.
CREATE TABLE students_kafka_sink (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students_sink',                                             -- topic to write
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    -- The next two options only matter when this table is read as a source.
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'earliest-offset',                               -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- Save the query result to Kafka (students_hdfs is the bounded HDFS table).
INSERT INTO students_kafka_sink
SELECT
    sid,
    name,
    age,
    sex,
    clazz
FROM students_hdfs;

-- Check the written records from a shell:
--   kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink
3、将更新的流写入kafka
-- Writing an updating (changelog) stream to Kafka requires a changelog format such as
-- canal-json. Every record then carries its operation type, e.g.
--   {"data":[{"clazz":"文科一班","num":71}],"type":"INSERT"}
--   {"data":[{"clazz":"理科三班","num":67}],"type":"DELETE"}
CREATE TABLE clazz_num_kafka (
    clazz STRING,
    num   BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'clazz_num',                                                 -- topic to write
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'earliest-offset',                               -- start position: earliest-offset | latest-offset
    'format' = 'canal-json'                                                -- changelog format with operation types
);

-- Assumes the students_kafka source table from the "kafka source" section exists in this session.
INSERT INTO clazz_num_kafka
SELECT
    clazz,
    COUNT(*) AS num
FROM students_kafka
GROUP BY clazz;

-- Check the changelog records from a shell:
--   kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num
2、hdfs
1、hdfs source
flink读取文件可以使用有界流方式,也可以使用无界流方式
-- Bounded stream: the directory is read once and the query finishes.
CREATE TABLE students_hdfs_batch (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',                   -- required: connector type
    'path' = 'hdfs://master:9000/data/student',  -- required: path
    'format' = 'csv'                              -- required: the filesystem connector needs a format
);

SELECT * FROM students_hdfs_batch;

-- Unbounded stream.
-- Streaming over HDFS picks up data one file at a time, so latency is higher than with Kafka.
CREATE TABLE students_hdfs_stream (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',                   -- required: connector type
    'path' = 'hdfs://master:9000/data/student',  -- required: path
    'format' = 'csv',                             -- required: the filesystem connector needs a format
    'source.monitor-interval' = '5000'            -- rescan the directory periodically, producing an unbounded stream
);

SELECT * FROM students_hdfs_stream;
2、hdfs sink
-- 1. Batch mode (usage and execution model are similar to Hive).
SET 'execution.runtime-mode' = 'batch';

-- Create the sink table.
CREATE TABLE clazz_num_hdfs (
    clazz STRING,
    num   BIGINT
) WITH (
    'connector' = 'filesystem',                     -- required: connector type
    'path' = 'hdfs://master:9000/data/clazz_num',  -- required: path
    'format' = 'csv'                                -- required: the filesystem connector needs a format
);

-- Save the query result into the table.
INSERT INTO clazz_num_hdfs
SELECT
    clazz,
    COUNT(*) AS num
FROM students_hdfs_batch
GROUP BY clazz;

-- 2. Streaming mode.
SET 'execution.runtime-mode' = 'streaming';

-- The query returns an updating stream, so the sink needs a changelog format such as canal-json.
-- NOTE: in streaming mode the filesystem sink finalizes its files only when a checkpoint
-- completes; enable checkpointing, otherwise the output files stay in-progress.
CREATE TABLE clazz_num_hdfs_canal_json (
    clazz STRING,
    num   BIGINT
) WITH (
    'connector' = 'filesystem',                                -- required: connector type
    'path' = 'hdfs://master:9000/data/clazz_num_canal_json',  -- required: path
    'format' = 'canal-json'                                    -- required: changelog format
);

INSERT INTO clazz_num_hdfs_canal_json
SELECT
    clazz,
    COUNT(*) AS num
FROM students_hdfs_stream
GROUP BY clazz;
3、MySQL
1、整合
# 1. Upload the dependency jars into Flink's lib directory (/usr/local/soft/flink-1.15.2/lib):
#      flink-connector-jdbc-1.15.2.jar
#      mysql-connector-java-5.1.49.jar
# 2. Restart the Flink cluster so the new jars are loaded.
#    List the running YARN applications to find the yarn-session's application id.
yarn application -list
APP_ID=application_xxx   # replace with the yarn-session application id printed above
yarn application -kill "$APP_ID"
yarn-session.sh -d
# 3. Re-enter the SQL client.
sql-client.sh
2、mysql source
-- Bounded stream.
-- Field names and types in the Flink table need to match the MySQL table.
CREATE TABLE students_jdbc (
    id     BIGINT,
    name   STRING,
    age    BIGINT,
    gender STRING,
    clazz  STRING,
    PRIMARY KEY (id) NOT ENFORCED   -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456'
);

SELECT * FROM students_jdbc LIMIT 10;
3、mysql sink
-- Create the Kafka source table.
CREATE TABLE students_kafka (
    `offset`     BIGINT METADATA VIRTUAL,                  -- record offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',  -- time the record entered Kafka; usable as event time
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',                                                  -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'earliest-offset',                               -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- Create the MySQL sink table.
CREATE TABLE clazz_num_mysql (
    clazz STRING,
    num   BIGINT,
    PRIMARY KEY (clazz) NOT ENFORCED   -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'clazz_num',
    'username' = 'root',
    'password' = '123456'
);

-- Create the receiving table in MySQL first (run this in the MySQL client, not in Flink):
--   CREATE TABLE clazz_num (
--       clazz VARCHAR(10),
--       num   BIGINT,
--       PRIMARY KEY (clazz)
--   );

-- Write the query result to MySQL continuously.
-- The result is an updating stream; Flink updates the MySQL rows by the declared primary key.
INSERT INTO clazz_num_mysql
SELECT
    clazz,
    COUNT(*) AS num
FROM students_kafka
GROUP BY clazz;

-- Produce test data from a shell:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
--   1500100001,施笑槐,22,女,文科六班
4、DataGen
用于生成随机数据,一般用在高性能测试上
-- Create a table (the datagen connector can only be used for source tables).
CREATE TABLE students_datagen (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',     -- number of random rows generated per second
    'fields.age.min' = '1',
    'fields.age.max' = '100',
    'fields.sid.length' = '10',
    'fields.name.length' = '2',
    'fields.sex.length' = '1',
    'fields.clazz.length' = '4'
);
5、print
用于调试和测试,将写入表中的每一行数据打印输出(输出到TaskManager的标准输出)
只能用于sink表
-- Sink-only table backed by the print connector.
CREATE TABLE print_table (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'print'
);

-- Feed the random datagen rows into the print sink.
INSERT INTO print_table
SELECT
    sid,
    name,
    age,
    sex,
    clazz
FROM students_datagen;
6、BlackHole
用于高性能测试
-- Sink table backed by the blackhole connector (written rows are discarded).
CREATE TABLE blackhole_table (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'blackhole'
);

-- Push the random datagen rows into the blackhole sink.
INSERT INTO blackhole_table
SELECT
    sid,
    name,
    age,
    sex,
    clazz
FROM students_datagen;
4、SQL语法
1、Hints
查询提示(Hints),在flink中可以用于动态修改表的属性,在spark中可以用于指定广播表
CREATE TABLE students_kafka (
    `offset`     BIGINT METADATA VIRTUAL,                  -- record offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',  -- time the record entered Kafka; usable as event time
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',                                                  -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'latest-offset',                                 -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- Dynamic table options: override the Kafka start position for this query only,
-- without recreating the table.
SELECT * FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

-- Bounded stream.
CREATE TABLE students_hdfs (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',                   -- required: connector type
    'path' = 'hdfs://master:9000/data/student',  -- required: path
    'format' = 'csv'                              -- required: the filesystem connector needs a format
);

-- Turn the bounded HDFS read into an unbounded stream for this query only.
SELECT * FROM students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000') */;
2、WITH
-- The alias `tmp` names the subquery so it can be referenced several times in the query below.
WITH tmp AS (
    SELECT *
    FROM students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000') */
    WHERE clazz = '文科一班'
)
SELECT * FROM tmp
UNION ALL
SELECT * FROM tmp;
3、DISTINCT
在flink 的流处理中,使用distinct,flink需要将之前的数据保存在状态中,如果数据一直增加,状态会越来越大
状态越来越大,checkpoint时间会增加,最终会导致flink任务出问题
-- In streaming mode DISTINCT keeps every value seen so far in state, so state keeps growing.
SELECT COUNT(DISTINCT sid) AS sid_cnt
FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

-- Equivalent two-step form: deduplicate sid first, then count.
-- Deduplicate on sid only. If students_kafka carries the `offset`/`event_time` metadata
-- columns, they differ for every record, and `SELECT DISTINCT *` would remove no duplicates.
SELECT COUNT(sid) AS sid_cnt
FROM (
    SELECT DISTINCT sid
    FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */
) AS t;
4、窗口函数(TVFs)
1、创建表
-- Create the Kafka table used by the window examples; bidtime is the event-time attribute.
CREATE TABLE bid (
    bidtime TIMESTAMP(3),
    price   DECIMAL(10, 2),
    item    STRING,
    WATERMARK FOR bidtime AS bidtime   -- watermark equals event time: no delay for out-of-order rows
) WITH (
    'connector' = 'kafka',
    'topic' = 'bid',                                                       -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'latest-offset',                                 -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- Produce test data from a shell:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid
--   2020-04-15 08:05:00,4.00,C
--   2020-04-15 08:07:00,2.00,A
--   2020-04-15 08:09:00,5.00,D
--   2020-04-15 08:11:00,3.00,B
--   2020-04-15 08:13:00,1.00,E
--   2020-04-15 08:17:00,6.00,F
2、滚动窗口
1、事件时间
-- TUMBLE: tumbling-window table function. It adds window_start, window_end and
-- window_time columns to every row of the input table.
-- TABLE(...): turns the table function's result into a dynamic table.
SELECT *
FROM TABLE(
    TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
);

-- Aggregate on the columns added by the window function:
-- total amount per item, computed every 10 minutes.
SELECT
    item,
    window_start,
    window_end,
    SUM(price) AS sum_price
FROM TABLE(
    -- tumbling event-time window
    TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
)
GROUP BY item, window_start, window_end;
2、处理时间
CREATE TABLE words (
    word     STRING,
    proctime AS PROCTIME()   -- processing-time attribute; PROCTIME() returns the processing time
) WITH (
    'connector' = 'kafka',
    'topic' = 'words',                                                     -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'latest-offset',                                 -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- Produce test data from a shell, one word per line:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
--   java
--   spark

-- In Flink SQL the window syntax is the same for processing time and event time.
SELECT *
FROM TABLE(
    TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
);

SELECT
    word,
    window_start,
    window_end,
    COUNT(*) AS c
FROM TABLE(
    TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
)
GROUP BY word, window_start, window_end;
3、滑动窗口
-- HOP: sliding (hopping) window table function; arguments are (table, time column, slide, size).
-- With a sliding window one row can fall into several windows.
SELECT *
FROM TABLE(
    HOP(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
);

-- Every 5 minutes, compute the total amount of all items over the last 10 minutes.
SELECT
    window_start,
    window_end,
    SUM(price) AS sum_price
FROM TABLE(
    HOP(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;
4、会话窗口
CREATE TABLE words (
    word     STRING,
    proctime AS PROCTIME()   -- processing-time attribute; PROCTIME() returns the processing time
) WITH (
    'connector' = 'kafka',
    'topic' = 'words',                                                     -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'latest-offset',                                 -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- Produce test data from a shell, one word per line:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
--   java
--   spark

-- Session window with a 5-second gap, written with the group-window functions
-- SESSION / SESSION_START / SESSION_END.
-- NOTE(review): the SESSION window table function is presumably unavailable in Flink 1.15,
-- hence this syntax; confirm for newer versions.
SELECT
    word,
    SESSION_START(proctime, INTERVAL '5' SECOND) AS window_start,
    SESSION_END(proctime, INTERVAL '5' SECOND)   AS window_end,
    COUNT(*) AS c
FROM words
GROUP BY
    word,
    SESSION(proctime, INTERVAL '5' SECOND);
5、OVER聚合
1、批处理
在flink批处理模式下,over函数和hive是一样的
SET 'execution.runtime-mode' = 'batch';

-- Bounded stream.
CREATE TABLE students_hdfs_batch (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',                   -- required: connector type
    'path' = 'hdfs://master:9000/data/student',  -- required: path
    'format' = 'csv'                              -- required: the filesystem connector needs a format
);

-- Over functions: row_number, sum, count, avg, lag, lead, max, min.
-- The two oldest students in each class.
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY clazz ORDER BY age DESC) AS r
    FROM students_hdfs_batch
) AS a
WHERE r <= 2;
2、流处理
flink流处理中over聚合使用限制
1、order by 字段必须是时间字段且按升序排序;使用row_number时可以按普通字段排序,但需要增加条件过滤(如 r <= 2)
SET 'execution.runtime-mode' = 'streaming';

-- Create the Kafka table.
CREATE TABLE students_kafka (
    sid      STRING,
    name     STRING,
    age      INT,
    sex      STRING,
    clazz    STRING,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',                                                  -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'earliest-offset',                               -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- In streaming mode an OVER window can only be ordered by a time attribute, ascending.
-- Ordering by an ordinary column would force Flink to recompute the ordering of earlier rows
-- for every new row, which is too expensive.
-- ROW_NUMBER() is the exception when its result is filtered: the filter bounds the cost.
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY clazz ORDER BY age DESC) AS r
    FROM students_kafka
) AS t
WHERE r <= 2;

-- Running sum of age per class, ordered by the time attribute (ascending).
SELECT
    *,
    SUM(age) OVER (PARTITION BY clazz ORDER BY proctime) AS sum_age
FROM students_kafka;

-- Time-based frame: RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
SELECT
    *,
    SUM(age) OVER (
        PARTITION BY clazz
        ORDER BY proctime
        -- rows from the last 10 seconds
        RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
    ) AS sum_age
FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;

-- Row-based frame: ROWS BETWEEN n PRECEDING AND CURRENT ROW
SELECT
    *,
    SUM(age) OVER (
        PARTITION BY clazz
        ORDER BY proctime
        -- the current row plus the 2 rows before it
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) AS sum_age
FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;

-- Produce test data from a shell:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
--   1500100003,单乐蕊,22,女,理科六班
6、ORDER BY
-- In streaming mode the sort keys must start with a time attribute in ascending order.
-- (Assumes students_kafka has the proctime column, as in the OVER-aggregation section.)
SELECT * FROM students_kafka ORDER BY proctime, age;

-- A LIMIT bounds the cost of sorting on an ordinary column.
SELECT * FROM students_kafka ORDER BY age LIMIT 10;
7、row_number去重
CREATE TABLE students_kafka (
    sid      STRING,
    name     STRING,
    age      INT,
    sex      STRING,
    clazz    STRING,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',                                                  -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'earliest-offset',                               -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- Produce test data from a shell:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
--   1500100003,单乐蕊,22,女,理科六班

-- Deduplicate: keep only the first row seen for each sid (earliest processing time).
SELECT *
FROM (
    SELECT
        sid,
        name,
        age,
        ROW_NUMBER() OVER (PARTITION BY sid ORDER BY proctime) AS r
    FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */
) AS t
WHERE r = 1;
8、JOIN
Regular Joins: 主要用于批处理,如果在流处理上使用,状态会越来越大
Interval Join: 主要用于双流join
Temporal Joins:用于流表关联时态表(不同时间状态不一样,比如汇率表)
Lookup Join:用于流表关联维表(不怎么变化的表)
1、Regular Joins
常规join,和hive spark sql的join是一样的
1、批处理模式
CREATE TABLE students_hdfs_batch (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',                   -- required: connector type
    'path' = 'hdfs://master:9000/data/student',  -- required: path
    'format' = 'csv'                              -- required: the filesystem connector needs a format
);

CREATE TABLE score_hdfs_batch (
    sid   STRING,
    cid   STRING,
    score INT
) WITH (
    'connector' = 'filesystem',                 -- required: connector type
    'path' = 'hdfs://master:9000/data/score',  -- required: path
    'format' = 'csv'                            -- required: the filesystem connector needs a format
);

SET 'execution.runtime-mode' = 'batch';

-- inner join
SELECT a.sid, a.name, b.score
FROM students_hdfs_batch AS a
INNER JOIN score_hdfs_batch AS b
    ON a.sid = b.sid;

-- left join
SELECT a.sid, a.name, b.score
FROM students_hdfs_batch AS a
LEFT JOIN score_hdfs_batch AS b
    ON a.sid = b.sid;

-- full join: take sid from whichever side is present, so score rows
-- without a matching student still show their sid.
SELECT COALESCE(a.sid, b.sid) AS sid, a.name, b.score
FROM students_hdfs_batch AS a
FULL JOIN score_hdfs_batch AS b
    ON a.sid = b.sid;
2、流处理模式
CREATE TABLE students_kafka (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',                                                  -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'latest-offset',                                 -- start position: earliest-offset | latest-offset
    'format' = 'csv',                                                      -- record format
    'csv.ignore-parse-errors' = 'true'                                     -- skip lines that fail to parse
);

-- Produce test data from a shell:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
--   1500100001,施笑槐,22,女,文科六班
--   1500100002,吕金鹏,24,男,文科六班
--   1500100003,单乐蕊,22,女,理科六班

CREATE TABLE score_kafka (
    sid   STRING,
    cid   STRING,
    score INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'scores',                                                    -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'latest-offset',                                 -- start position: earliest-offset | latest-offset
    'format' = 'csv',                                                      -- record format
    'csv.ignore-parse-errors' = 'true'                                     -- skip lines that fail to parse
);

-- Produce test data from a shell:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
--   1500100001,1000001,98
--   1500100001,1000002,5
--   1500100001,1000003,137

SET 'execution.runtime-mode' = 'streaming';

-- With a regular join in streaming mode, Flink keeps the rows of both tables in state
-- indefinitely, so state keeps growing. A state TTL bounds it; rows older than the TTL
-- can no longer be matched.
SET 'table.exec.state.ttl' = '5000';

-- full join
SELECT
    a.sid AS student_sid,
    b.sid AS score_sid,
    a.name,
    b.score
FROM students_kafka AS a
FULL JOIN score_kafka AS b
    ON a.sid = b.sid;
2、Interval Join
两个表在join时只关联一段时间内的数据,之前的数据就不需要保存在状态中,可以避免状态无限增大
CREATE TABLE students_kafka_time (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING,
    ts    TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND   -- watermark lags event time by 5 seconds
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',                                                  -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'latest-offset',                                 -- start position: earliest-offset | latest-offset
    'format' = 'csv',                                                      -- record format
    'csv.ignore-parse-errors' = 'true'                                     -- skip lines that fail to parse
);

-- Produce test data from a shell:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
--   1500100001,施笑槐,22,女,文科六班,2023-11-10 17:10:10
--   1500100001,吕金鹏,24,男,文科六班,2023-11-10 17:10:11
--   1500100001,单乐蕊,22,女,理科六班,2023-11-10 17:10:12

CREATE TABLE score_kafka_time (
    sid   STRING,
    cid   STRING,
    score INT,
    ts    TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND   -- watermark lags event time by 5 seconds
) WITH (
    'connector' = 'kafka',
    'topic' = 'scores',                                                    -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'latest-offset',                                 -- start position: earliest-offset | latest-offset
    'format' = 'csv',                                                      -- record format
    'csv.ignore-parse-errors' = 'true'                                     -- skip lines that fail to parse
);

-- Produce test data from a shell:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
--   1500100001,1000001,98,2023-11-10 17:10:09
--   1500100001,1000002,5,2023-11-10 17:10:11
--   1500100001,1000003,137,2023-11-10 17:10:12

-- a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts:
-- a row of `a` joins only rows of `b` whose time is 0 to 5 seconds later than its own
-- (both ends inclusive).
SELECT
    a.sid AS student_sid,
    b.sid AS score_sid,
    a.name,
    b.score
FROM students_kafka_time AS a
INNER JOIN score_kafka_time AS b
    ON a.sid = b.sid
    AND a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts;
3、Temporal Joins
用于流表关联时态表,比如订单表和汇率表的关联
-- Orders table.
CREATE TABLE orders (
    order_id   STRING,           -- order id
    price      DECIMAL(32, 2),   -- order amount
    currency   STRING,           -- currency code
    order_time TIMESTAMP(3),     -- order time
    WATERMARK FOR order_time AS order_time   -- watermark
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',                                                    -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'latest-offset',                                 -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- Produce test data from a shell:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
--   001,100,CN,2023-11-11 09:48:10
--   002,200,CN,2023-11-11 09:48:11
--   003,300,CN,2023-11-11 09:48:14
--   004,400,CN,2023-11-11 09:48:16
--   005,500,CN,2023-11-11 09:48:18

-- Exchange-rate table (a versioned table: primary key plus event time).
CREATE TABLE currency_rates (
    currency        STRING,           -- currency code
    conversion_rate DECIMAL(32, 2),   -- exchange rate
    update_time     TIMESTAMP(3),     -- time the rate was updated
    WATERMARK FOR update_time AS update_time,   -- watermark
    PRIMARY KEY (currency) NOT ENFORCED         -- primary key
) WITH (
    'connector' = 'kafka',
    'topic' = 'currency_rates',                                            -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'earliest-offset',                               -- start position: earliest-offset | latest-offset
    'format' = 'canal-json'                                                -- changelog format
);

INSERT INTO currency_rates VALUES
    ('CN', 7.2, TIMESTAMP '2023-11-11 09:48:05'),
    ('CN', 7.1, TIMESTAMP '2023-11-11 09:48:10'),
    ('CN', 6.9, TIMESTAMP '2023-11-11 09:48:15'),
    ('CN', 7.4, TIMESTAMP '2023-11-11 09:48:20');

-- Check the written records from a shell:
--   kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates

-- A regular join matches the latest rate, not the rate valid at the order's time.
SELECT a.order_id, b.*
FROM orders AS a
LEFT JOIN currency_rates AS b
    ON a.currency = b.currency;

-- Temporal join.
-- FOR SYSTEM_TIME AS OF orders.order_time: look up the rate version valid at the order's time.
SELECT
    order_id,
    price,
    conversion_rate,
    order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
    ON orders.currency = currency_rates.currency;
4、lookup join
-- Students table (dimension table in MySQL).
CREATE TABLE students_jdbc (
    id     BIGINT,
    name   STRING,
    age    BIGINT,
    gender STRING,
    clazz  STRING,
    PRIMARY KEY (id) NOT ENFORCED   -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456',
    'lookup.cache.max-rows' = '1000',   -- maximum number of cached rows
    'lookup.cache.ttl' = '20000'        -- cache expiry time
);

-- Scores table (stream).
CREATE TABLE score_kafka (
    sid       BIGINT,
    cid       STRING,
    score     INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'scores',                                                    -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'latest-offset',                                 -- start position: earliest-offset | latest-offset
    'format' = 'csv',                                                      -- record format
    'csv.ignore-parse-errors' = 'true'                                     -- skip lines that fail to parse
);

-- Produce test data from a shell:
--   kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
--   1500100001,1000001,98
--   1500100001,1000002,5
--   1500100001,1000003,137

-- Joining the dimension table with a regular join:
-- the job loads the dimension table into Flink state when it starts; later updates to the
-- students table in MySQL are not seen, so the join cannot pick up the latest data.
SELECT b.id, b.name, a.score
FROM score_kafka AS a
LEFT JOIN students_jdbc AS b
    ON a.sid = b.id;

-- Lookup join.
-- FOR SYSTEM_TIME AS OF a.proc_time: query the dimension table by the join key for the latest data.
-- Pro: every incoming stream row queries MySQL, so it can match the latest data.
-- Con: querying MySQL for each row lowers throughput (the lookup cache above mitigates this).
SELECT b.id, b.name, a.score
FROM score_kafka AS a
LEFT JOIN students_jdbc FOR SYSTEM_TIME AS OF a.proc_time AS b
    ON a.sid = b.id;
5、Flink SQL Checkpoint
checkpoint可以定时将flink任务的状态持久化到hdfs中,任务执行失败重启后可以从checkpoint恢复,保证中间结果不丢失
1、开启checkpoint
# Edit the Flink configuration file: vim flink-conf.yaml

# checkpoint interval
execution.checkpointing.interval: 1min
# keep the checkpoint when the job is cancelled manually
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# allow only 1 checkpoint to run at a time
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
# processing semantics
execution.checkpointing.mode: EXACTLY_ONCE
# checkpoint timeout
execution.checkpointing.timeout: 10min
execution.checkpointing.tolerable-failed-checkpoints: 0
execution.checkpointing.unaligned: false
# state backend (where state is kept; hashmap = in memory)
state.backend: hashmap
# checkpoint directory
state.checkpoints.dir: hdfs://master:9000/flink/checkpoint
2、编写一个flink sql脚本
# Open (or create) the SQL script in the current directory.
vim ./word_count.sql
-- Read words from Kafka in real time, count each word, and save the result to MySQL.

-- 1. Create the source table.
CREATE TABLE words (
    word STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'words',                                                     -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'earliest-offset',                               -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- 2. Create the sink table.
CREATE TABLE word_count (
    word STRING,
    num  BIGINT,
    PRIMARY KEY (word) NOT ENFORCED   -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'word_count',   -- create this table in MySQL manually first
    'username' = 'root',
    'password' = '123456'
);

-- 3. Process the data and save the result into the sink table.
INSERT INTO word_count
SELECT
    word,
    COUNT(*) AS num
FROM words
GROUP BY word;
3、使用sql-client -f 启动任务
# Run the statements of the script non-interactively through the SQL client.
sql-client.sh -f ./word_count.sql
4、任务失败重启
-- 1. Find the path of the checkpoint to restore from. It sits under the configured
--    state.checkpoints.dir (hdfs://master:9000/flink/checkpoint); the job id and
--    checkpoint number below are examples:
--      hdfs://master:9000/flink/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23
-- 2. Add this setting to the SQL script, before the INSERT INTO statement,
--    to point the job at the checkpoint it should resume from.
SET 'execution.savepoint.path' = 'hdfs://master:9000/flink/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23';
-- 3. Start the SQL job again from a shell:
--      sql-client.sh -f word_count.sql
6、反压
1、测试反压
反压:当下游消费数据的速度小于上游生产数据的速度时会出现反压,下游处理慢会导致上游task被反压
CREATE TABLE words (
    word STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100000',   -- number of random rows generated per second
    'fields.word.length' = '4'
);

CREATE TABLE blackhole_table (
    word STRING,
    num  BIGINT
) WITH (
    'connector' = 'blackhole'
);

-- The hint raises the generation rate and word length for this query only,
-- to put more load on the downstream aggregation.
INSERT INTO blackhole_table
SELECT
    word,
    COUNT(*) AS num
FROM words /*+ OPTIONS('rows-per-second'='1000000', 'fields.word.length'='5') */
GROUP BY word;
2、增加资源
-- 1. Increase parallelism; each parallel instance uses one slot.
SET 'parallelism.default' = '2';

-- 2. Increase memory.
--    Backpressure caused by state that is too large for memory can be relieved with more memory.
--    Restart the YARN session from a shell with larger containers:
--      -tm : TaskManager memory
--      -jm : JobManager memory
--      yarn-session.sh -tm 4G -d
3、微批和预聚合
开启微批处理和预聚合,可以减少shuffle过程中传输的数据量,减轻下游算子计算的压力
-- Enable mini-batch processing.
SET 'table.exec.mini-batch.enabled' = 'true';
-- batch time
SET 'table.exec.mini-batch.allow-latency' = '5 s';
-- batch size
SET 'table.exec.mini-batch.size' = '5000';
-- Enable two-phase (local/global) pre-aggregation; it requires mini-batch to be enabled above.
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
7、Flink整合hive
1、整合
# 1. Upload the dependency jar into Flink's lib directory:
#      flink-sql-connector-hive-3.1.2_2.12-1.15.2.jar
# 2. Restart the Flink cluster so the new jar is loaded.
#    List the running YARN applications to find the yarn-session's application id.
yarn application -list
APP_ID=application_1699579932721_0003   # replace with the yarn-session application id printed above
yarn application -kill "$APP_ID"
yarn-session.sh -d
# 3. Re-enter the SQL client.
sql-client.sh
2、hive catalog
catalog(元数据)→ database → table → 数据 → 列
-- 1. Start the Hive metastore service from a shell:
--      nohup hive --service metastore &

-- 2. Create a Hive catalog.
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/usr/local/soft/hive-3.1.2/conf'
);

-- List all catalogs.
-- default_catalog is the default catalog; it keeps metadata in memory.
SHOW CATALOGS;

-- 3. Switch catalog.
USE CATALOG myhive;

-- 4. Tables already created in Hive can now be used from Flink.
SELECT * FROM student;

-- A table can also be addressed starting from its catalog: catalog.database.table.
SELECT * FROM myhive.`default`.student;

-- Save Flink table definitions into the Hive catalog.
-- Hive can see the streaming tables created by Flink, but cannot query them.
CREATE DATABASE flink;
USE flink;

-- Create a Flink dynamic table.
CREATE TABLE students_kafka (
    `offset`     BIGINT METADATA VIRTUAL,                  -- record offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',  -- time the record entered Kafka; usable as event time
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',                                                  -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'earliest-offset',                               -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);
3、hive functions
-- Load the Hive module so Hive functions can be used.
LOAD MODULE hive WITH ('hive-version' = '3.1.2');

-- Call a Hive function.
SELECT split('java,spark', ',');
练习
-- Exercise: real-time road congestion statistics.
--   Every 1 minute, compute over the last 15 minutes the traffic volume and the
--   average speed per road, and save the result to a database.

-- 1. Create the Kafka source table.
CREATE TABLE cars (
    card_id   STRING,   -- license plate number
    road_id   STRING,   -- road id
    city_id   STRING,   -- city id
    car_id    STRING,   -- checkpoint (camera gate) id
    com_id    STRING,   -- camera id
    fx        STRING,   -- direction
    county_id STRING,   -- district/county
    ts        BIGINT,   -- time; presumably seconds since epoch (FROM_UNIXTIME takes seconds) - confirm
    speed     DOUBLE,   -- speed
    event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),   -- convert the epoch timestamp to a timestamp value
    -- event-time attribute and watermark; the watermark lags 5 seconds behind
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'cars',                                                      -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',                                   -- consumer group
    'scan.startup.mode' = 'earliest-offset',                               -- start position: earliest-offset | latest-offset
    'format' = 'csv'                                                       -- record format
);

-- 2. Create the MySQL sink table.
CREATE TABLE road_flow_avg_speed (
    road_id   STRING,
    win_start TIMESTAMP(3),
    win_end   TIMESTAMP(3),
    flow      BIGINT,
    avg_speed DOUBLE,
    PRIMARY KEY (road_id, win_start) NOT ENFORCED   -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'road_flow_avg_speed',
    'username' = 'root',
    'password' = '123456'
);

-- Create the table in MySQL (run this in the MySQL client, not in Flink):
--   CREATE TABLE road_flow_avg_speed (
--       road_id   VARCHAR(10),
--       win_start DATETIME,
--       win_end   DATETIME,
--       flow      BIGINT,
--       avg_speed DOUBLE,
--       PRIMARY KEY (road_id, win_start)
--   );

-- 3. Save the query result to MySQL.
INSERT INTO road_flow_avg_speed
SELECT
    road_id,
    HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE) AS win_start,   -- window start time
    HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)   AS win_end,     -- window end time
    COUNT(*)   AS flow,
    AVG(speed) AS avg_speed
FROM cars
GROUP BY
    road_id,
    -- sliding event-time window: 15 minutes long, sliding every 1 minute
    HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE);
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章