Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
-
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
-
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
-
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
-
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
- Flink 系列文章
- 一、maven依赖及数据结构
- 1、maven依赖
- 2、数据结构
- 3、数据源
- 4、验证结果
- 二、维表来源于初始化的静态数据
- 1、说明
- 2、示例:将事实流与维表进行关联
本文介绍了Flink 维表的第一种方式,通过初始化的静态数据实现。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
本专题分为以下几篇文章:
【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据
【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源
【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游
【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)
一、maven依赖及数据结构
1、maven依赖
本文的所有示例均依赖本部分的pom.xml内容,可能针对下文中的某些示例存在过多的引入,根据自己的情况进行删减。
UTF-8 UTF-8 1.8 1.8 1.8 2.12 1.17.0 org.apache.flink flink-clients ${flink.version} provided org.apache.flink flink-java ${flink.version} provided org.apache.flink flink-table-common ${flink.version} provided org.apache.flink flink-streaming-java ${flink.version} org.apache.flink flink-table-api-java-bridge ${flink.version} provided org.apache.flink flink-csv ${flink.version} provided org.apache.flink flink-json ${flink.version} provided org.apache.flink flink-table-planner_2.12 ${flink.version} provided org.apache.flink flink-table-api-java-uber ${flink.version} provided org.apache.flink flink-table-runtime ${flink.version} provided org.apache.flink flink-connector-jdbc 3.1.0-1.17 mysql mysql-connector-java 5.1.38 com.google.guava guava 32.0.1-jre org.apache.flink flink-connector-kafka ${flink.version} org.apache.flink flink-sql-connector-kafka ${flink.version} provided org.apache.commons commons-compress 1.24.0 org.projectlombok lombok 1.18.2 org.apache.bahir flink-connector-redis_2.12 1.1.0 flink-streaming-java_2.12 org.apache.flink flink-runtime_2.12 org.apache.flink flink-core org.apache.flink flink-java org.apache.flink org.apache.flink flink-table-api-java org.apache.flink flink-table-api-java-bridge_2.12 org.apache.flink flink-table-common org.apache.flink flink-table-planner_2.12 com.alibaba fastjson 2.0.43 2、数据结构
本示例仅仅为实现需求:将订单中uId与用户id进行关联,然后输出Tuple2
。 - 事实流 order
// 事实表 @Data @NoArgsConstructor @AllArgsConstructor static class Order { private Integer id; private Integer uId; private Double total; }
- 维度流 user
// 维表 @Data @NoArgsConstructor @AllArgsConstructor static class User { private Integer id; private String name; private Double balance; private Integer age; private String email; }
3、数据源
事实流数据有几种,具体见示例部分,比如socket、redis、kafka等
维度表流有几种,具体见示例部分,比如静态数据、mysql、socket、kafka等。
如此,实现本文中的示例就需要准备好相应的环境,即mysql、redis、kafka、netcat等。
4、验证结果
本文提供的所有示例均为验证通过的示例,测试的数据均在每个示例中,分为事实流、维度流和运行结果进行注释,在具体的示例中关于验证不再赘述。
二、维表来源于初始化的静态数据
1、说明
通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在事实流map()方法中与维表数据进行关联。
由于数据存储于内存中,所以只适合小数据量并且维表数据更新频率不高的情况下使用。虽然可以在open中定义一个定时器定时更新维表,但是还是存在维表更新不及时的情况或资源开销较大的情况。一般如果数据量较小且不大会变(或变化影响也不大)的情况下,理想选择之一。
2、示例:将事实流与维表进行关联
import java.util.HashMap; import java.util.Map; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /* * @Author: alanchan * @LastEditors: alanchan * @Description: 采用在RichMapfunction类的open方法中将维表数据加载到内存 */ public class TestJoinDimFromStaticDataDemo { // 维表 @Data @NoArgsConstructor @AllArgsConstructor static class User { private Integer id; private String name; private Double balance; private Integer age; private String email; } // 事实表 @Data @NoArgsConstructor @AllArgsConstructor static class Order { private Integer id; private Integer uId; private Double total; } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // order 事实流 DataStream
orderDs = env.socketTextStream("192.168.10.42", 9999) .map(o -> { String[] lines = o.split(","); return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2])); }); DataStream > result = orderDs.map(new RichMapFunction >() { Map userDim = null; // 维表-静态数据,本处使用的是匿名内部类实现的 @Override public void open(Configuration parameters) throws Exception { userDim = new HashMap<>(); userDim.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com")); userDim.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com")); userDim.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com")); userDim.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com")); userDim.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com")); } @Override public Tuple2 map(Order value) throws Exception { return new Tuple2(value, userDim.get(value.getUId()).getName()); } }); result.print(); // nc 输入 // 1,1004,345 // 2,1001,678 // 控制台输出 // 2> (TestJoinDimFromStaticData.Order(id=1, uId=1004, total=345.0),alan_chan) // 3> (TestJoinDimFromStaticData.Order(id=2, uId=1001, total=678.0),alan) env.execute("TestJoinDimFromStaticData"); } } 以上,本文介绍了Flink 维表的第一种方式,通过初始化的静态数据实现。
本专题分为以下几篇文章:
【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据
【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源
【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游
【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)
- 维度流 user
- 事实流 order
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章