上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Flink SQL

guduadmin24小时前

Flink SQL

1、Sql命令行

1、使用方式

-- 1. Start a Flink cluster (standalone or, as here, a YARN session; -d = detached).
yarn-session.sh -d
-- 2. Start the SQL CLI.
sql-client.sh
-- 3. Define a table on top of a stream.
-- A Flink table is like a view: it stores no data; the source is read only when the table is queried.
CREATE TABLE abc (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'abc',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
-- 4. Continuous query over the table defined above (`abc`); counts update as rows arrive.
select clazz,count(1) as c from 
abc
group by clazz;
-- 5. In another shell, produce test rows into the topic (CSV: sid,name,age,sex,clazz).
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic abc
1500100001,施笑槐,22,女,文科十班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班
1500100004,葛德曜,24,男,理科三班
1500100005,宣谷芹,22,女,理科五班

2、输出结果模式

-- SQL client result display modes (run the one you want).
-- table (default): results are materialized in memory and shown as a paginated table.
SET 'sql-client.execution.result-mode' = 'table';

-- changelog: results are not materialized; the continuous query is shown as a stream
-- of inserts (+) and retractions (-).
SET 'sql-client.execution.result-mode' = 'changelog';

-- tableau: closer to a classic database client; results are printed straight to the
-- screen as a table. What is shown depends on the runtime mode (execution.runtime-mode).
SET 'sql-client.execution.result-mode' = 'tableau';

2、SQL流批一体

1、流处理

1、流处理模式可以用于处理有界流和无界流

2、流处理模式输出连续结果

3、流处理模式底层是持续流模型,上游task和下游task同时启动,等待数据到达

-- Streaming runtime mode: handles bounded and unbounded input, emits continuously updated results.
set 'execution.runtime-mode' = 'streaming';

2、批处理

1、只能用于处理有界流

2、输出最终结果

3、批处理模式底层是MapReduce模型,先执行上游task再执行下游task,会在map端对数据进行预聚合

-- Batch runtime mode: bounded input only; only the final result is emitted.
SET 'execution.runtime-mode' = 'batch';
-- Table over a bounded source (a file on HDFS).
CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
  'connector' = 'filesystem',           -- required: connector type
  'path' = 'hdfs://master:9000/data/spark/stu/students.txt',  -- required: file or directory path
  'format' = 'csv'                     -- required: the filesystem connector needs an explicit format
);
-- Students per class; the statement must end with ';' for the SQL client to run it.
select clazz,count(1) as c from 
students_hdfs
group by clazz;

3、Flink SQL连接器

1、kafka

1、kafka source
-- Kafka source table.
CREATE TABLE students_kafka (
    `offset`     BIGINT       METADATA VIRTUAL,          -- Kafka record offset (metadata column)
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', -- time the record entered Kafka; usable as event time
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'students',                          -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id'          = 'testGroup',                         -- consumer group
    'scan.startup.mode'            = 'earliest-offset',                   -- start position: earliest-offset / latest-offset
    'format'                       = 'csv'                                -- record format
);
2、kafka sink
-- Kafka sink table: rows written by INSERT INTO are produced to topic `students_sink`.
CREATE TABLE students_kafka_sink (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students_sink', -- target topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group; only relevant if this table is also read
  'scan.startup.mode' = 'earliest-offset', -- scan (read) option; only relevant if this table is also read
  'format' = 'csv' -- serialization format of the records
);
-- Save the query result to Kafka.
insert into students_kafka_sink
select * from students_hdfs;
-- Check the written records from a shell.
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink
3、将更新的流写入kafka
-- Sink for an updating (changelog) result: per-class counts change as new students arrive.
CREATE TABLE clazz_num_kafka (
    clazz STRING,
    num BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'clazz_num', -- target topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group; only relevant if this table is also read
  'scan.startup.mode' = 'earliest-offset', -- scan (read) option; only relevant if this table is also read
  'format' = 'canal-json' -- changelog format: every record carries its operation type
);
-- Writing an updating stream to Kafka needs a changelog format such as canal-json.
-- Example records in the topic:
-- {"data":[{"clazz":"文科一班","num":71}],"type":"INSERT"}
-- {"data":[{"clazz":"理科三班","num":67}],"type":"DELETE"}
-- Reads the Kafka source table `students_kafka` (topic `students`); no `students` table is defined in this script.
insert into clazz_num_kafka
select clazz,count(1) as num from 
students_kafka
group by clazz;
-- Check the changelog records from a shell.
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num

2、hdfs

1、hdfs source

flink读取文件可以使用有界流方式,也可以使用无界流方式

-- Bounded read: the directory is read once and the query finishes.
CREATE TABLE students_hdfs_batch (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',                      -- required: connector type
    'path'      = 'hdfs://master:9000/data/student', -- required: file or directory path
    'format'    = 'csv'                              -- required: format for the filesystem connector
);
SELECT * FROM students_hdfs_batch;

-- Unbounded read: the directory is rescanned periodically, so newly added files keep
-- arriving as a stream. Data is picked up one whole file at a time, so latency is
-- higher than with Kafka.
CREATE TABLE students_hdfs_stream (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector'               = 'filesystem',
    'path'                    = 'hdfs://master:9000/data/student',
    'format'                  = 'csv',
    'source.monitor-interval' = '5000'  -- rescan interval (no unit, i.e. milliseconds); makes the source unbounded
);
SELECT * FROM students_hdfs_stream;
2、hdfs sink
-- (1) Batch mode: usage and underlying mechanism are similar to Hive.
SET 'execution.runtime-mode' = 'batch';

-- Target table on HDFS.
CREATE TABLE clazz_num_hdfs (
    clazz STRING,
    num   BIGINT
) WITH (
    'connector' = 'filesystem',                        -- required: connector type
    'path'      = 'hdfs://master:9000/data/clazz_num', -- required: output path
    'format'    = 'csv'                                -- required: format for the filesystem connector
);

-- Store the per-class counts in the table.
INSERT INTO clazz_num_hdfs
SELECT
    clazz,
    count(1) AS num
FROM students_hdfs_batch
GROUP BY clazz;

-- (2) Streaming mode.
SET 'execution.runtime-mode' = 'streaming';

-- The query emits an updating (changelog) stream, so the sink uses the canal-json format.
CREATE TABLE clazz_num_hdfs_canal_json (
    clazz STRING,
    num   BIGINT
) WITH (
    'connector' = 'filesystem',
    'path'      = 'hdfs://master:9000/data/clazz_num_canal_json',
    'format'    = 'canal-json'
);

INSERT INTO clazz_num_hdfs_canal_json
SELECT
    clazz,
    count(1) AS num
FROM students_hdfs_stream
GROUP BY clazz;

3、MySQL

1、整合
# 1. Copy these jars into Flink's lib directory (/usr/local/soft/flink-1.15.2/lib):
flink-connector-jdbc-1.15.2.jar
mysql-connector-java-5.1.49.jar
# 2. Restart the Flink cluster: kill the running YARN session, then start a new one.
yarn application -kill [appid]
yarn-session.sh -d
# 3. Re-enter the SQL CLI.
sql-client.sh
2、mysql source
-- Bounded source: a MySQL table read through the JDBC connector.
-- Column names and types must match the MySQL table.
CREATE TABLE students_jdbc (
    id     BIGINT,
    name   STRING,
    age    BIGINT,
    gender STRING,
    clazz  STRING,
    PRIMARY KEY (id) NOT ENFORCED  -- primary key
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://master:3306/student',
    'table-name' = 'students',
    'username'   = 'root',
    'password'   = '123456'
);

SELECT * FROM students_jdbc LIMIT 10;
3、mysql sink
-- Kafka source table (topic `students`).
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- Kafka record offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', -- time the record entered Kafka; usable as event time
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- topic to read
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- start position: earliest-offset / latest-offset
  'format' = 'csv' -- record format
);
-- MySQL sink table (Flink side), writing to MySQL table `clazz_num`.
CREATE TABLE clazz_num_mysql (
    clazz STRING,
    num BIGINT,
    PRIMARY KEY (clazz) NOT ENFORCED -- primary key; rows are written by this key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'clazz_num',
    'username' ='root',
    'password' ='123456'
);
--- Run in MySQL (not in Flink): create the table that receives the results.
CREATE TABLE clazz_num (
    clazz varchar(10),
    num BIGINT,
    PRIMARY KEY (clazz) -- primary key
) ;
-- Continuously write the query result to MySQL.
-- The result is an updating stream; Flink updates the MySQL rows by primary key.
insert into clazz_num_mysql
select 
clazz,
count(1) as num from 
students_kafka
group by clazz;
-- Produce a test row from a shell.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班

4、DataGen

用于生成随机数据,一般用在高性能测试上

-- datagen table: generates random rows; usable only as a source.
CREATE TABLE students_datagen (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector'           = 'datagen',
    'rows-per-second'     = '5',    -- rows generated per second
    'fields.age.min'      = '1',    -- age range 1..100
    'fields.age.max'      = '100',
    'fields.sid.length'   = '10',   -- lengths of the random strings
    'fields.name.length'  = '2',
    'fields.sex.length'   = '1',
    'fields.clazz.length' = '4'
);

5、print

用于调试和功能测试:会把收到的每一行数据打印到TaskManager的标准输出,不适合用于高吞吐的性能测试

只能用于sink表

-- print table: prints each row it receives; usable only as a sink.
CREATE TABLE print_table (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'print'
);

INSERT INTO print_table
SELECT * FROM students_datagen;

6、BlackHole

用于高性能测试

-- blackhole table: discards every row; useful for measuring upstream throughput.
CREATE TABLE blackhole_table (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'blackhole'
);

INSERT INTO blackhole_table
SELECT * FROM students_datagen;

4、SQL语法

1、Hints

提示执行,在flink中可以用于动态修改表的属性,在spark中可以用于广播表

-- Kafka source that starts from the latest offset by default.
CREATE TABLE students_kafka (
    `offset`     BIGINT       METADATA VIRTUAL,          -- Kafka record offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', -- time the record entered Kafka; usable as event time
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'students',                          -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id'          = 'testGroup',                         -- consumer group
    'scan.startup.mode'            = 'latest-offset',                     -- start position: earliest-offset / latest-offset
    'format'                       = 'csv'                                -- record format
);

-- An OPTIONS hint overrides table options for this query only: read from the earliest
-- offset without recreating the table.
SELECT * FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

-- Bounded file source.
CREATE TABLE students_hdfs (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',                      -- required: connector type
    'path'      = 'hdfs://master:9000/data/student', -- required: file or directory path
    'format'    = 'csv'                              -- required: format for the filesystem connector
);

-- The same mechanism turns the bounded file read into an unbounded, periodically rescanned stream.
SELECT * FROM students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000') */;

2、WITH

-- CTE: `tmp` names the subquery so the rest of the statement can reference it more than once.
WITH tmp AS (
    SELECT *
    FROM students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000') */
    WHERE clazz = '文科一班'
)
SELECT * FROM tmp
UNION ALL
SELECT * FROM tmp;

3、DISTINCT

在flink 的流处理中,使用distinct,flink需要将之前的数据保存在状态中,如果数据一直增加,状态会越来越大

状态越来越大,checkpoint时间会增加,最终会导致flink任务出问题

-- count(distinct ...) keeps every sid seen so far in state; with unbounded input that state keeps growing.
select 
count(distinct sid) 
from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;
-- Equivalent form: deduplicate first, then count.
-- Only `sid` is selected: `distinct *` would include the per-record `offset` metadata
-- column of students_kafka, making every row unique so nothing would be deduplicated.
select 
    count(sid)  
from (
    select 
    distinct sid
    from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */
);

4、窗口函数(TVFs)

1、创建表
-- Kafka source of bids; `bidtime` is the event-time attribute.
CREATE TABLE bid (
    bidtime  TIMESTAMP(3),
    price  DECIMAL(10, 2) ,
    item  STRING,
    WATERMARK FOR bidtime AS bidtime -- watermark equals bidtime (no delay for out-of-order rows)
) WITH (
  'connector' = 'kafka',
  'topic' = 'bid', -- topic to read
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'latest-offset', -- start position: earliest-offset / latest-offset
  'format' = 'csv' -- record format
);
-- Produce test rows from a shell (CSV: bidtime,price,item).
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid
2020-04-15 08:05:00,4.00,C
2020-04-15 08:07:00,2.00,A
2020-04-15 08:09:00,5.00,D
2020-04-15 08:11:00,3.00,B
2020-04-15 08:13:00,1.00,E
2020-04-15 08:17:00,6.00,F
2、滚动窗口
1、事件时间
-- TUMBLE: tumbling-window table function. It returns the input rows plus the columns
-- window_start, window_end and window_time.
-- TABLE(...): turns the function's result into a dynamic table.
SELECT *
FROM TABLE(
    TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
);

-- Aggregate on the window columns added by the function:
-- total amount per item, computed every 10 minutes (event-time tumbling window).
SELECT
    item,
    window_start,
    window_end,
    SUM(price) AS sum_price
FROM TABLE(
    TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
)
GROUP BY item, window_start, window_end;
2、处理时间
-- Kafka source with a processing-time attribute.
CREATE TABLE words (
    word  STRING,
    proctime as PROCTIME() -- processing-time attribute; PROCTIME() returns the processing time
) WITH (
  'connector' = 'kafka',
  'topic' = 'words', -- topic to read
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'latest-offset', -- start position: earliest-offset / latest-offset
  'format' = 'csv' -- record format
);
-- Produce test words from a shell.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
java
spark
-- Window TVFs use the same syntax for processing time and event time.
SELECT * FROM 
TABLE(
   TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
);
-- Word count per 5-second processing-time tumbling window.
SELECT 
    word,window_start,window_end,
    count(1) as c
FROM 
TABLE(
   TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
)
group by 
    word,window_start,window_end;
3、滑动窗口
-- HOP: sliding (hopping) window table function; arguments are slide (5 min), then size (10 min).
-- Because slide < size, a row can fall into several windows.
SELECT * FROM 
TABLE(
   HOP(TABLE bid, DESCRIPTOR(bidtime),INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
);
-- Every 5 minutes, the total amount of all items over the last 10 minutes.
SELECT 
     window_start,
     window_end,
     sum(price) as sum_price
FROM 
TABLE(
   HOP(TABLE bid, DESCRIPTOR(bidtime),INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
)
group by 
    window_start,window_end;
4、会话窗口
-- Same `words` table as in the processing-time example; in the same SQL session it must be
-- dropped first, otherwise CREATE TABLE fails because the table already exists.
CREATE TABLE words (
    word  STRING,
    proctime as PROCTIME() -- processing-time attribute; PROCTIME() returns the processing time
) WITH (
  'connector' = 'kafka',
  'topic' = 'words', -- topic to read
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'latest-offset', -- start position: earliest-offset / latest-offset
  'format' = 'csv' -- record format
);
-- Produce test words from a shell.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
java
spark
-- Session window (group-window syntax): per word, a window closes once no new row for
-- that word has arrived for 5 seconds; emits the window bounds and the count.
select 
    word,
    SESSION_START(proctime,INTERVAL '5' SECOND) as window_start,
    SESSION_END(proctime,INTERVAL '5' SECOND) as window_end,
    count(1) as c
from 
    words
group by 
    word,SESSION(proctime,INTERVAL '5' SECOND);

5、OVER聚合

1、批处理

在flink批处理模式下,over函数和hive是一样的

SET 'execution.runtime-mode' = 'batch';
-- Bounded source.
CREATE TABLE students_hdfs_batch (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
  'connector' = 'filesystem',           -- required: connector type
  'path' = 'hdfs://master:9000/data/student',  -- required: file or directory path
  'format' = 'csv'                     -- required: format for the filesystem connector
);
-- Functions usable with OVER: row_number, sum, count, avg, lag, lead, max, min.
-- Top 2 oldest students in each class.
select * 
from(
    select 
    *,
    row_number() over(partition by clazz order by age desc) as r
    from 
    students_hdfs_batch
) as a
where r <=2;
2、流处理

flink流处理中over聚合使用限制

1、order by 字段必须是时间字段且按升序排序;或者使用row_number时,可以在外层增加过滤条件(如 r <= N,即Top-N)来限制计算代价

SET 'execution.runtime-mode' = 'streaming'; 
-- Kafka source with a processing-time attribute `proctime`.
CREATE TABLE students_kafka (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    proctime as PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- topic to read
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- start position: earliest-offset / latest-offset
  'format' = 'csv' -- record format
);
-- In streaming mode an OVER window can only be ordered by a time attribute, ascending.
-- Ordering by an ordinary column would force re-ranking earlier rows on every new row, which is too expensive.
-- Exception: row_number() with a filter on its result (Top-N) bounds that cost.
select * from (
select 
    *,
    row_number() over(partition by clazz order by age desc) as r
from 
    students_kafka
)
where r <= 2;
-- Running sum of age per class, ordered by the time attribute (ascending).
select 
*,
sum(age) over(partition by clazz order by proctime)
from 
students_kafka;
-- Time-based frame:
-- RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
select 
*,
sum(age) over(
    partition by clazz
    order by proctime
    -- sum over the rows of the last 10 seconds
    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
)
from 
students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;
-- Row-count-based frame:
-- ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
select 
*,
sum(age) over(
    partition by clazz
    order by proctime
    -- sum over the current row and the 2 preceding rows of the same class
   ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
)
from 
students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;
-- Produce a test row from a shell.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100003,单乐蕊,22,女,理科六班

6、ORDER BY

-- Streaming ORDER BY: the sort keys must start with a time attribute in ascending order.
SELECT *
FROM students_kafka
ORDER BY proctime, age;

-- Alternatively, LIMIT bounds the cost of sorting on a non-time column.
SELECT *
FROM students_kafka
ORDER BY age
LIMIT 10;

7、row_number去重

-- Kafka source with a processing-time attribute `proctime`.
CREATE TABLE students_kafka (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    proctime as PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- topic to read
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- start position: earliest-offset / latest-offset
  'format' = 'csv' -- record format
);
-- Produce a (duplicate) test row from a shell.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100003,单乐蕊,22,女,理科六班
-- Deduplication: keep only the first row per sid (ordered by processing time ascending, r = 1).
select * from (
select 
sid,name,age,
row_number() over(partition by sid order by proctime) as r
from students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */
) 
where r = 1;

8、JOIN

Regular Joins: 主要用于批处理,如果在流处理上使用,状态会越来越大

Interval Join: 主要用于双流join

Temporal Joins:用于流表关联时态表(不同时间状态不一样,比如汇率表)

Lookup Join:用于流表关联维表(不怎么变化的表)

1、Regular Joins

常规join,和hive spark sql的join是一样的

1、批处理模式
-- Bounded student table on HDFS.
CREATE TABLE students_hdfs_batch (
    sid   STRING,
    name  STRING,
    age   INT,
    sex   STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',                      -- required: connector type
    'path'      = 'hdfs://master:9000/data/student', -- required: file or directory path
    'format'    = 'csv'                              -- required: format for the filesystem connector
);

-- Bounded score table on HDFS.
CREATE TABLE score_hdfs_batch (
    sid   STRING,
    cid   STRING,
    score INT
) WITH (
    'connector' = 'filesystem',
    'path'      = 'hdfs://master:9000/data/score',
    'format'    = 'csv'
);

SET 'execution.runtime-mode' = 'batch';

-- Inner join: only students that have scores.
SELECT stu.sid, stu.name, sc.score
FROM students_hdfs_batch AS stu
INNER JOIN score_hdfs_batch AS sc
    ON stu.sid = sc.sid;

-- Left join: every student; score is NULL when there is no matching score row.
SELECT stu.sid, stu.name, sc.score
FROM students_hdfs_batch AS stu
LEFT JOIN score_hdfs_batch AS sc
    ON stu.sid = sc.sid;

-- Full join: rows from both sides; columns of the missing side are NULL.
SELECT stu.sid, stu.name, sc.score
FROM students_hdfs_batch AS stu
FULL JOIN score_hdfs_batch AS sc
    ON stu.sid = sc.sid;
2、流处理模式
-- Kafka student stream.
CREATE TABLE students_kafka (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'kafka',
    'topic' = 'students', -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'latest-offset', -- start position: earliest-offset / latest-offset
    'format' = 'csv', -- record format
    'csv.ignore-parse-errors' = 'true' -- skip rows that fail to parse instead of failing the job
);
-- Produce test students from a shell.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班
-- Kafka score stream.
CREATE TABLE score_kafka (
    sid STRING,
    cid STRING,
    score INT
)WITH (
    'connector' = 'kafka',
    'topic' = 'scores', -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'latest-offset', -- start position: earliest-offset / latest-offset
    'format' = 'csv', -- record format
    'csv.ignore-parse-errors' = 'true' -- skip rows that fail to parse
);
-- Produce test scores from a shell.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137
SET 'execution.runtime-mode' = 'streaming'; 
-- With a regular join on streams, Flink keeps both inputs in state indefinitely, so state keeps growing.
-- A state TTL bounds it: '5000' has no unit (milliseconds). Rows idle longer than that are
-- dropped from state, so a late-arriving match can then be missed.
SET 'table.exec.state.ttl' = '5000';
-- full join
select a.sid,b.sid,a.name,b.score from 
students_kafka as a
full join
score_kafka as b
on a.sid=b.sid;
2、Interval Join

两个表在join时只关联一段时间内的数据,之前的数据就不需要保存在状态中,可以避免状态无限增大

-- Student stream with event time `ts`; the watermark lags 5 seconds behind the event time.
CREATE TABLE students_kafka_time (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)WITH (
    'connector' = 'kafka',
    'topic' = 'students', -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'latest-offset', -- start position: earliest-offset / latest-offset
    'format' = 'csv', -- record format
    'csv.ignore-parse-errors' = 'true' -- skip rows that fail to parse instead of failing the job
);
-- Produce test students from a shell (last column: event time).
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班,2023-11-10 17:10:10
1500100001,吕金鹏,24,男,文科六班,2023-11-10 17:10:11
1500100001,单乐蕊,22,女,理科六班,2023-11-10 17:10:12
-- Score stream with event time `ts`; the watermark lags 5 seconds behind the event time.
CREATE TABLE score_kafka_time (
    sid STRING,
    cid STRING,
    score INT,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)WITH (
    'connector' = 'kafka',
    'topic' = 'scores', -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'latest-offset', -- start position: earliest-offset / latest-offset
    'format' = 'csv', -- record format
    'csv.ignore-parse-errors' = 'true' -- skip rows that fail to parse
);
-- Produce test scores from a shell (last column: event time).
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98,2023-11-10 17:10:09
1500100001,1000002,5,2023-11-10 17:10:11
1500100001,1000003,137,2023-11-10 17:10:12
-- Interval join: a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts,
-- i.e. a student row matches a score row only if a.ts lies in [b.ts - 5 s, b.ts].
-- Older rows can no longer match and need not be kept in state, so state stays bounded.
SELECT a.sid,b.sid,a.name,b.score
FROM students_kafka_time AS a
INNER JOIN score_kafka_time AS b
    ON a.sid = b.sid
    AND a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts;
3、Temporal Joins

用于流表关联时态表,比如订单表和汇率表的关联

-- Orders table (event time: order_time).
CREATE TABLE orders (
    order_id    STRING, -- order id
    price       DECIMAL(32,2), --order amount
    currency    STRING, -- currency / rate id (join key into currency_rates)
    order_time  TIMESTAMP(3), -- order time
    WATERMARK FOR order_time AS order_time -- watermark
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders', -- topic to read
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'latest-offset', -- start position: earliest-offset / latest-offset
  'format' = 'csv' -- record format
);
-- Produce test orders from a shell (CSV: order_id,price,currency,order_time).
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
001,100,CN,2023-11-11 09:48:10
002,200,CN,2023-11-11 09:48:11
003,300,CN,2023-11-11 09:48:14
004,400,CN,2023-11-11 09:48:16
005,500,CN,2023-11-11 09:48:18
-- Exchange-rate table: primary key plus event time, read as a changelog (canal-json),
-- so each key has several versions over time.
CREATE TABLE currency_rates (
    currency STRING, -- currency / rate id
    conversion_rate DECIMAL(32, 2), -- exchange rate
    update_time TIMESTAMP(3),  -- time the rate was updated
    WATERMARK FOR update_time AS update_time, -- watermark
    PRIMARY KEY(currency) NOT ENFORCED -- primary key
) WITH (
  'connector' = 'kafka',
  'topic' = 'currency_rates', -- topic to read
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- start position: earliest-offset / latest-offset
  'format' = 'canal-json' -- changelog format
);
-- Write several versions of the CN rate into the topic.
insert into currency_rates 
values
('CN',7.2,TIMESTAMP'2023-11-11 09:48:05'),
('CN',7.1,TIMESTAMP'2023-11-11 09:48:10'),
('CN',6.9,TIMESTAMP'2023-11-11 09:48:15'),
('CN',7.4,TIMESTAMP'2023-11-11 09:48:20');
-- Inspect the written changelog records from a shell.
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates
-- A regular join uses the latest rate, not the rate valid at the order's time.
select a.order_id,b.* from 
orders as a
left join
currency_rates as b
on a.currency=b.currency;
-- Temporal (event-time) join:
-- FOR SYSTEM_TIME AS OF orders.order_time looks up the rate version valid at each order's time.
SELECT 
     order_id,
     price,
     conversion_rate,
     order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
4、lookup join
-- Student dimension table in MySQL, with a lookup cache.
CREATE TABLE students_jdbc (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING,
    PRIMARY KEY (id) NOT ENFORCED -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'students',
    'username' ='root',
    'password' ='123456',
    'lookup.cache.max-rows' = '1000', -- maximum number of cached lookup rows
    'lookup.cache.ttl' = '20000' -- how long a cached row stays valid (no unit: milliseconds)
);
-- Score stream with a processing-time attribute used for the lookup join.
CREATE TABLE score_kafka (
    sid BIGINT,
    cid STRING,
    score INT,
    proc_time as PROCTIME()
)WITH (
    'connector' = 'kafka',
    'topic' = 'scores', -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id' = 'testGroup', -- consumer group
    'scan.startup.mode' = 'latest-offset', -- start position: earliest-offset / latest-offset
    'format' = 'csv', -- record format
    'csv.ignore-parse-errors' = 'true' -- skip rows that fail to parse
);
-- Produce test scores from a shell.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137
-- Regular join against the dimension table:
-- the MySQL table is read once when the job starts and kept in Flink state; later updates
-- in MySQL are not seen, so the join cannot pick up the newest data.
select 
b.id,b.name,a.score
from 
score_kafka as a
left join 
students_jdbc as b
on a.sid=b.id; 
-- Lookup join:
-- FOR SYSTEM_TIME AS OF a.proc_time looks up the join key in the dimension table for each stream row.
-- Pro: picks up current MySQL data (within the lookup cache TTL configured on students_jdbc).
-- Con: every cache miss is a MySQL query, which costs performance.
select 
b.id,b.name,a.score
from 
score_kafka as a
left join 
students_jdbc FOR SYSTEM_TIME AS OF a.proc_time  as b
on a.sid=b.id; 

5、Flink SQL Checkpoint

checkpoint可以定时将flink任务的状态持久化到hdfs中,任务执行失败重启时可以从checkpoint恢复,保证中间结果不丢失

1、开启checkpoint

# Edit the Flink configuration file
vim flink-conf.yaml
# Interval between checkpoints
execution.checkpointing.interval: 1min
# Keep the checkpoint when the job is cancelled manually
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# At most 1 checkpoint in progress at a time
execution.checkpointing.max-concurrent-checkpoints: 1
# Minimum pause between consecutive checkpoints (0 = none)
execution.checkpointing.min-pause: 0
# Processing semantics
execution.checkpointing.mode: EXACTLY_ONCE
# Checkpoint timeout
execution.checkpointing.timeout: 10min
# Number of tolerated checkpoint failures (0 = the job fails on the first one)
execution.checkpointing.tolerable-failed-checkpoints: 0
# Use aligned checkpoints
execution.checkpointing.unaligned: false
# State backend, i.e. where state is kept (hashmap = in memory)
state.backend: hashmap
# Checkpoint directory
state.checkpoints.dir: hdfs://master:9000/flink/checkpoint

2、编写一个flink sql脚本

vim word_count.sql

-- Continuously read words from Kafka, count each word, and save the counts to MySQL.

-- Step 1: source table.
CREATE TABLE words (
    word STRING
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'words',                             -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id'          = 'testGroup',                         -- consumer group
    'scan.startup.mode'            = 'earliest-offset',                   -- start position: earliest-offset / latest-offset
    'format'                       = 'csv'                                -- record format
);

-- Step 2: sink table.
CREATE TABLE word_count (
    word STRING,
    num  BIGINT,
    PRIMARY KEY (word) NOT ENFORCED  -- primary key
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://master:3306/student',
    'table-name' = 'word_count',  -- must be created manually in MySQL first
    'username'   = 'root',
    'password'   = '123456'
);

-- Step 3: aggregate and write the result into the sink table.
INSERT INTO word_count
SELECT
    word,
    count(1) AS num
FROM words
GROUP BY word;

3、使用sql-client -f 启动任务

# Submit the statements in word_count.sql as a job (non-interactive).
sql-client.sh -f word_count.sql

4、任务失败重启

-- 1. Find the latest completed checkpoint of the failed job. It is under the configured
--    state.checkpoints.dir (hdfs://master:9000/flink/checkpoint in flink-conf.yaml),
--    laid out as <job-id>/chk-<n>, e.g.:
--    hdfs://master:9000/flink/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23
-- 2. Add the following SET to the SQL script, before its INSERT INTO statement,
--    so the job is restored from that checkpoint.
SET 'execution.savepoint.path' = 'hdfs://master:9000/flink/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23';
-- 3. Resubmit the SQL job.
sql-client.sh -f word_count.sql

6、反压

1、测试反压

反压:下游消费数据的速度比上游生成数据的速度小时会出现反压,下游导致上游task反压

-- NOTE: `words` and `blackhole_table` are also created earlier with different schemas;
-- in the same SQL session, drop them before running this example.
-- Source: datagen producing random 4-character words.
CREATE TABLE words (
    word STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='100000', -- rows generated per second
    'fields.word.length'='4'
);
-- Sink: blackhole discards every row, so the sink itself adds no load.
CREATE TABLE blackhole_table (
    word STRING,
    num BIGINT
) WITH (
  'connector' = 'blackhole'
);
-- The hint raises the generation rate and word length for this query only,
-- increasing input volume and the number of distinct keys to provoke backpressure.
insert into blackhole_table
select 
word,
count(1) as num from 
words /*+ OPTIONS('rows-per-second'='1000000','fields.word.length'='5') */
group by word;

2、增加资源

--1. Increase parallelism; each parallel instance takes one slot.
SET 'parallelism.default' = '2';
--2. Increase memory.
-- Backpressure caused by state too large to fit in memory can be relieved with more memory.
-- -tm : TaskManager memory
-- -jm : JobManager memory
yarn-session.sh -tm 4G -d

3、微批和预聚合

开启微批处理和预聚合,可以减少shuffle过程中传输的数据量,减轻下游算子计算的压力

-- Mini-batch: buffer incoming rows and process them in batches.
SET 'table.exec.mini-batch.enabled' = 'true';
-- Maximum time a batch may be buffered.
SET 'table.exec.mini-batch.allow-latency' = '5 s';
-- Maximum number of rows per batch.
SET 'table.exec.mini-batch.size' = '5000';
-- Two-phase aggregation: local pre-aggregation before the shuffle, then global aggregation.
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';

7、Flink整合hive

1、整合

# 1. Copy the Hive connector jar into Flink's lib directory:
flink-sql-connector-hive-3.1.2_2.12-1.15.2.jar
# 2. Restart the Flink cluster: find the YARN session's application id, kill it, start a new session.
yarn application -list
yarn application -kill application_1699579932721_0003
yarn-session.sh -d
# 3. Re-enter the SQL CLI.
sql-client.sh 

2、hive catalog

catalog(元数据) —> database —> table —> 数据 — > 列

-- 1. Start the Hive metastore service (shell command).
nohup hive --service metastore &
-- 2. Create a Hive catalog.
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/usr/local/soft/hive-3.1.2/conf'
);
-- List all catalogs.
-- default_catalog: the built-in catalog, which keeps metadata in memory.
show catalogs;
--3. Switch to the Hive catalog.
use catalog myhive;
--4. Tables already created in Hive can now be queried from Flink.
select * from student;
-- A table can also be addressed by its full path: catalog.database.table.
select * from myhive.`default`.student;
-- Flink table definitions are saved in the Hive catalog.
-- Hive can see Flink's streaming tables but cannot query them.
create database flink;
use flink;
-- Create a Flink dynamic table (its definition is stored in the Hive catalog).
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- Kafka record offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', -- time the record entered Kafka; usable as event time
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- topic to read
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
  'properties.group.id' = 'testGroup', -- consumer group
  'scan.startup.mode' = 'earliest-offset', -- start position: earliest-offset / latest-offset
  'format' = 'csv' -- record format
);

3、hive functions

-- Load the Hive module so Hive's built-in functions are available in Flink SQL.
LOAD MODULE hive WITH ('hive-version' = '3.1.2');

-- Example: Hive's split function.
SELECT split('java,spark', ',');

练习

-- Exercise: real-time road congestion statistics.
-- Every 1 minute, over the last 15 minutes, compute per road the traffic flow
-- (number of cars) and the average speed, and store the results in a database.

-- Step 1: Kafka source table.
CREATE TABLE cars (
    card_id   STRING,  -- licence plate number
    road_id   STRING,  -- road id
    city_id   STRING,  -- city id
    car_id    STRING,  -- traffic monitoring point (卡口) id
    com_id    STRING,  -- camera id
    fx        STRING,  -- direction
    county_id STRING,  -- district / county id
    ts        BIGINT,  -- timestamp; presumably epoch seconds, as FROM_UNIXTIME expects — confirm
    speed     DOUBLE,  -- speed
    -- Convert the numeric timestamp into a TIMESTAMP value.
    event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
    -- Event-time attribute; the watermark lags 5 seconds behind.
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'cars',                              -- topic to read
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker list
    'properties.group.id'          = 'testGroup',                         -- consumer group
    'scan.startup.mode'            = 'earliest-offset',                   -- start position: earliest-offset / latest-offset
    'format'                       = 'csv'                                -- record format
);

-- Step 2: MySQL sink table (Flink side).
CREATE TABLE road_flow_avg_speed (
    road_id   STRING,
    win_start TIMESTAMP(3),
    win_end   TIMESTAMP(3),
    flow      BIGINT,
    avg_speed DOUBLE,
    PRIMARY KEY (road_id, win_start) NOT ENFORCED  -- primary key
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://master:3306/student',
    'table-name' = 'road_flow_avg_speed',
    'username'   = 'root',
    'password'   = '123456'
);

-- Step 3: run this DDL in MySQL (not in Flink) to create the target table.
CREATE TABLE road_flow_avg_speed (
    road_id   VARCHAR(10),
    win_start DATETIME,
    win_end   DATETIME,
    flow      BIGINT,
    avg_speed DOUBLE,
    PRIMARY KEY (road_id, win_start)  -- primary key
);

-- Step 4: sliding event-time window (slide 1 minute, size 15 minutes), written to MySQL.
INSERT INTO road_flow_avg_speed
SELECT
    road_id,
    HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE) AS win_start,  -- window start
    HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE)   AS win_end,    -- window end
    COUNT(1)   AS flow,
    AVG(speed) AS avg_speed
FROM cars
GROUP BY
    road_id,
    HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE);

网友评论

搜索
最新文章
热门文章
热门标签