Flink series articles
I. Flink column
The Flink column introduces individual topics systematically, each illustrated with concrete examples.

1. Flink deployment series
Covers the basics of deploying and configuring Flink.

2. Flink fundamentals series
Covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.

3. Flink Table API and SQL basics series
Covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.

4. Flink Table API and SQL advanced series
Covers applied Table API and SQL topics that are closer to real production use and somewhat harder to develop.

5. Flink monitoring series
Covers topics related to day-to-day operations and monitoring work.
II. Flink examples column
The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-use examples. This column is not split into sub-directories; the title of each link indicates its content.
For the entry point to all articles in both columns, see: Flink series article index
Table of contents
- Flink series articles
- I. Maven dependencies and data structures
- 1. Maven dependencies
- 2. Data structures
- 3. Data sources
- 4. Verifying the results
- II. Dimension table from pre-loaded static data
- 1. Overview
- 2. Example: joining the fact stream with the dimension table
- III. Dimension table from an external data source
- 1. Overview
- 2. Example: joining the fact stream with the dimension table - reducing overhead with a cache
- 3. Example: joining the fact stream with the dimension table - improving throughput with Flink async I/O
- 1) Redis async I/O implementation
- 2) Joining the fact stream with the dimension stream
- IV. Broadcasting dimension data downstream
- 1. Overview
- 2. Example: joining the fact stream with the dimension table - using Flink broadcast state
- 1) Broadcast implementation
- 2) Joining the fact stream with the dimension stream
- V. Joining dimension data with a temporal table
- 1. Overview
- 2. Example: joining the fact stream with the dimension table - processing-time implementation
- 3. Example: joining the fact stream with the dimension table - event-time implementation
- 4. Example: joining the fact stream with the dimension table - event-time implementation with a Kafka source
- 1) Bean definitions
- 2) Deserialization schemas
- 3) Implementation
This article describes in detail the ways to join a dimension table in Flink: static data, a cache over an external store, async I/O, broadcast state, and temporal tables (with processing-time, event-time, and Kafka-source event-time variants).
For more systematic and up-to-date coverage, see the author's Flink column.
Apart from the Maven dependencies, this article requires no other code dependencies.
The examples do, however, rely on an environment with Redis, Kafka, netcat, and the like.
I. Maven dependencies and data structures
1. Maven dependencies
All examples in this article rely on the pom.xml below. It may include more than a given example needs; trim it according to your own situation.
```xml
<properties>
    <!-- Property names reconstructed; only the values survived the original formatting. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-uber</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.0-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>32.0.1-jre</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.24.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.12</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-runtime_2.12</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId></exclusion>
            <exclusion><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId></exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.43</version>
    </dependency>
</dependencies>
```
2. Data structures
The examples implement a single requirement: join the uId of each order with the user id and emit a Tuple2<Order, String> of the order and the user name.
- Fact stream: order
```java
// fact table
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
    private Integer id;
    private Integer uId;
    private Double total;
}
```
- Dimension stream: user
```java
// dimension table
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
    private Integer id;
    private String name;
    private Double balance;
    private Integer age;
    private String email;
}
```
3. Data sources
The fact stream comes from several kinds of sources, depending on the example: socket, Redis, Kafka, and so on.
The dimension stream likewise comes from several kinds of sources, depending on the example: static data, MySQL, socket, Kafka, and so on.
To run the examples you therefore need the corresponding environment ready: MySQL, Redis, Kafka, netcat, etc.
4. Verifying the results
Every example in this article has been verified. The test data is included in each example as comments, split into fact-stream input, dimension-stream input, and run output, so verification is not discussed again in the individual examples.
II. Dimension table from pre-loaded static data
1. Overview
Define a class that implements RichMapFunction, load the dimension data into memory in open(), and look it up in map() when processing the fact stream.
Because the data lives in memory, this only suits small dimension tables that change infrequently. You can register a timer in open() to reload the table periodically (see the sketch below), but updates may still lag, or the periodic refresh may be relatively expensive. For small, mostly static data (or data whose changes barely matter), this is one of the ideal choices.
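The periodic reload mentioned above can be done with a scheduled executor inside open(). The following is only a minimal sketch under assumptions: it reuses the Order/User POJOs from section I.2, loadUsers() is a hypothetical loader standing in for whatever store you read from, and the 10-minute interval is arbitrary.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Sketch: keep the in-memory dimension table fresh by reloading it on a schedule.
public abstract class RefreshingDimMapFunction extends RichMapFunction<Order, Tuple2<Order, String>> {
    private transient volatile Map<Integer, User> userDim;
    private transient ScheduledExecutorService refresher;

    @Override
    public void open(Configuration parameters) throws Exception {
        userDim = loadUsers(); // initial load
        refresher = Executors.newSingleThreadScheduledExecutor();
        // reload every 10 minutes (interval is arbitrary); keep serving the old snapshot if a reload fails
        refresher.scheduleAtFixedRate(() -> {
            try {
                userDim = loadUsers();
            } catch (Exception e) {
                // log and keep the previous snapshot
            }
        }, 10, 10, TimeUnit.MINUTES);
    }

    @Override
    public Tuple2<Order, String> map(Order value) {
        User u = userDim.get(value.getUId());
        return new Tuple2<>(value, u == null ? "" : u.getName());
    }

    @Override
    public void close() {
        if (refresher != null) {
            refresher.shutdown();
        }
    }

    // Hypothetical: read the whole dimension table from wherever it lives (file, MySQL, HBase, ...).
    protected abstract Map<Integer, User> loadUsers() throws Exception;
}
```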
2. Example: joining the fact stream with the dimension table
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: load the dimension data into memory in the open() method of a RichMapFunction
 */
public class TestJoinDimFromStaticDataDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            // dimension table - static data, built here in an anonymous inner class
            Map<Integer, User> userDim = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                userDim = new HashMap<>();
                userDim.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
                userDim.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
                userDim.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
                userDim.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
                userDim.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2<>(value, userDim.get(value.getUId()).getName());
            }
        });

        result.print();

        // nc input
        // 1,1004,345
        // 2,1001,678
        // console output
        // 2> (TestJoinDimFromStaticData.Order(id=1, uId=1004, total=345.0),alan_chan)
        // 3> (TestJoinDimFromStaticData.Order(id=2, uId=1001, total=678.0),alan)
        env.execute("TestJoinDimFromStaticData");
    }
}
```
III. Dimension table from an external data source
1. Overview
In this approach the dimension data lives in external storage such as Redis, HBase, or MySQL, and the fact stream queries that store in real time whenever it needs to enrich a record.
Because the dimension data is not bounded by memory, it can be very large; on the other hand, lookup speed is bounded by the external store's read performance. This is one of the most common approaches in practice.
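For reference, a plain per-record lookup (without the cache introduced in the next example) might look roughly like the sketch below. It assumes the Order POJO from section I.2 and a hypothetical MySQL table t_user; the URL and credentials are placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Sketch: query MySQL once per fact record, without any cache.
public class MySqlDimLookupFunction extends RichMapFunction<Order, Tuple2<Order, String>> {
    private transient Connection conn;
    private transient PreparedStatement stmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        // connection URL, credentials and table name are placeholders
        conn = DriverManager.getConnection("jdbc:mysql://192.168.10.44:3306/test", "root", "123456");
        stmt = conn.prepareStatement("SELECT name FROM t_user WHERE id = ?");
    }

    @Override
    public Tuple2<Order, String> map(Order value) throws Exception {
        stmt.setInt(1, value.getUId());
        try (ResultSet rs = stmt.executeQuery()) {
            // emit an empty name when the dimension row is missing
            return new Tuple2<>(value, rs.next() ? rs.getString("name") : "");
        }
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) stmt.close();
        if (conn != null) conn.close();
    }
}
```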
2. Example: joining the fact stream with the dimension table - reducing overhead with a cache
Hitting the external store for every joined record is expensive. To reduce that cost, a cache is usually added to take pressure off the store, at the price of possible staleness or inconsistency, and of the extra memory needed to hold the cached data. Whether this trade-off is worth it depends on the concrete business scenario. This example uses Guava Cache; there are many cache implementations, so choose according to your own situation.
For simplicity the dimension data in this example is static; in practice it might come from HBase, MySQL, and so on.
```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class TestJoinDimFromCacheDataDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user dimension table
        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            // LoadingCache can load an entry automatically when it is missing from the cache
            LoadingCache<Integer, User> userDim;

            @Override
            public void open(Configuration parameters) throws Exception {
                // use Google Guava LoadingCache as the cache
                // CacheBuilder's constructor is private; instances are obtained via the static newBuilder() method
                userDim = CacheBuilder.newBuilder()
                        // concurrency level 8, i.e. the number of threads that may write the cache concurrently
                        .concurrencyLevel(8)
                        // maximum number of entries; beyond that, entries are evicted by a least-recently-used policy
                        .maximumSize(1000)
                        // entries expire 10 minutes after being written
                        .expireAfterWrite(10, TimeUnit.MINUTES)
                        // initial capacity of the cache container
                        .initialCapacity(10)
                        // record cache hit statistics
                        .recordStats()
                        // removal listener
                        .removalListener(new RemovalListener<Integer, User>() {
                            @Override
                            public void onRemoval(RemovalNotification<Integer, User> removalNotification) {
                                System.out.println(removalNotification.getKey() + " was evicted, value: " + removalNotification.getValue());
                            }
                        })
                        .build(
                                // how to load an entry on a cache miss
                                new CacheLoader<Integer, User>() {
                                    @Override
                                    public User load(Integer uId) throws Exception {
                                        return dataSource(uId);
                                    }
                                });

                System.out.println("userDim:" + userDim.get(1002));
            }

            private User dataSource(Integer uId) {
                // could be any data source; static data is used here purely for illustration
                Map<Integer, User> users = new HashMap<>();
                users.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
                users.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
                users.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
                users.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
                users.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));

                User user = null;
                if (users.containsKey(uId)) {
                    user = users.get(uId);
                }
                return user;
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2<>(value, userDim.get(value.getUId()).getName());
            }
        });

        result.print();

        // input data
        // 7,1003,111
        // 8,1005,234
        // 9,1002,875
        // console output
        // 5> (TestJoinDimFromCacheDataDemo.Order(id=7, uId=1003, total=111.0),alanchanchn)
        // 6> (TestJoinDimFromCacheDataDemo.Order(id=8, uId=1005, total=234.0),alan_chan_chn)
        // 7> (TestJoinDimFromCacheDataDemo.Order(id=9, uId=1002, total=875.0),alanchan)
        env.execute("TestJoinDimFromCacheDataDemo");
    }
}
```
3. Example: joining the fact stream with the dimension table - improving throughput with Flink async I/O
When Flink reads from or writes to an external system synchronously, it sends one request and waits for the response before sending the next, which gives low throughput. Throughput can be raised by increasing parallelism, but more parallel instances mean more tasks and a lot more resources.
Flink can instead use asynchronous I/O to access external systems. This requires the external system's client to support asynchronous access, as the Redis and MongoDB clients do, for example.
For more detail see the article:
55. Flink async I/O for external data access: introduction and examples
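One caveat about the implementation below: the Jedis client it uses is a blocking client, so the "asynchrony" comes entirely from the thread that runs the request. A common variant is to push the blocking call onto a dedicated thread pool instead of the default common ForkJoin pool used by CompletableFuture.supplyAsync. The following is only a sketch under assumptions: it reuses the Order POJO and the AsyncReadUserById_Redis hash from the example below, and the pool size and Redis address are placeholders.

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

// Sketch: run the blocking Jedis lookup on a dedicated thread pool rather than the common ForkJoin pool.
public class DimLookupAsyncFunction extends RichAsyncFunction<Order, Tuple2<Order, String>> {
    private transient JedisPool jedisPool;
    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) throws Exception {
        jedisPool = new JedisPool("192.168.10.41", 6379); // address is a placeholder
        executor = Executors.newFixedThreadPool(10);       // pool size is arbitrary
    }

    @Override
    public void asyncInvoke(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> {
                    // each lookup borrows its own connection, so concurrent lookups are safe
                    try (Jedis jedis = jedisPool.getResource()) {
                        String line = jedis.hget("AsyncReadUserById_Redis", String.valueOf(input.getUId()));
                        return line == null ? "" : line.split(",")[1];
                    }
                }, executor)
                .thenAccept(name -> resultFuture.complete(Collections.singletonList(new Tuple2<>(input, name))));
    }

    @Override
    public void timeout(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) {
        // lookup timed out: emit an empty name instead of failing the job
        resultFuture.complete(Collections.singletonList(new Tuple2<>(input, "")));
    }

    @Override
    public void close() throws Exception {
        if (executor != null) executor.shutdown();
        if (jedisPool != null) jedisPool.close();
    }
}
```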
1) Redis async I/O implementation
```java
package org.tablesql.join;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.tablesql.join.TestJoinDimFromAsyncDataStreamDemo.Order;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class JoinAyncFunctionByRedis extends RichAsyncFunction<Order, Tuple2<Order, String>> {
    private JedisPoolConfig config = null;

    private static String ADDR = "192.168.10.41";
    private static int PORT = 6379;
    private static int TIMEOUT = 10000;

    private JedisPool jedisPool = null;
    private Jedis jedis = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        config = new JedisPoolConfig();
        jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
        jedis = jedisPool.getResource();
    }

    @Override
    public void asyncInvoke(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        // one record of the order fact stream
        System.out.println("input parameter----:" + input);

        // issue an asynchronous request and return the result
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                // data format: 1002,alanchan,19,25,alan.chan.chn@163.com
                String userLine = jedis.hget("AsyncReadUserById_Redis", input.getUId() + "");
                String[] userTemp = userLine.split(",");
                // return the user name
                return userTemp[1];
            }
        }).thenAccept((String dbResult) -> {
            // callback invoked when the request completes: emit the result
            List<Tuple2<Order, String>> list = new ArrayList<>();
            list.add(new Tuple2<>(input, dbResult));
            resultFuture.complete(list);
        });
    }

    // called when the request times out
    @Override
    public void timeout(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        List<Tuple2<Order, String>> list = new ArrayList<>();
        // the dimension lookup timed out, so the user name is left empty
        list.add(new Tuple2<>(input, ""));
        resultFuture.complete(list);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis.isConnected()) {
            jedis.close();
        }
    }
}
```
2) Joining the fact stream with the dimension stream
```java
package org.tablesql.join;

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class TestJoinDimFromAsyncDataStreamDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        testJoinAyncFunctionByRedis();
    }

    static void testJoinAyncFunctionByRedis() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // ordered: results are emitted in input order; timeout 1 second, capacity 2, backpressure beyond that
        DataStream<Tuple2<Order, String>> result = AsyncDataStream.orderedWait(orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2);
        result.print("result:");

        // unordered: results may be emitted out of order; timeout 1 second, capacity 2, backpressure beyond that
        DataStream<Tuple2<Order, String>> unorderedResult = AsyncDataStream
                .unorderedWait(orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);
        unorderedResult.print("unorderedResult");

        // redis commands and data
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1001 '1001,alan,18,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1002 '1002,alanchan,19,25,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1003 '1003,alanchanchn,20,30,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1004 '1004,alan_chan,27,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1005 '1005,alan_chan_chn,36,10,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hgetall AsyncReadUserById_Redis
        // 1) "1001"
        // 2) "1001,alan,18,20,alan.chan.chn@163.com"
        // 3) "1002"
        // 4) "1002,alanchan,19,25,alan.chan.chn@163.com"
        // 5) "1003"
        // 6) "1003,alanchanchn,20,30,alan.chan.chn@163.com"
        // 7) "1004"
        // 8) "1004,alan_chan,27,20,alan.chan.chn@163.com"
        // 9) "1005"
        // 10) "1005,alan_chan_chn,36,10,alan.chan.chn@163.com"

        // input data
        // 13,1002,811
        // 14,1004,834
        // 15,1005,975
        // console output
        // input parameter----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // result::12> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
        // input parameter----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // unorderedResult:9> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
        // result::5> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // input parameter----:TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0)
        // unorderedResult:2> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // input parameter----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // result::6> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
        // input parameter----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // unorderedResult:3> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
        env.execute("TestJoinDimFromAsyncDataStreamDemo");
    }
}
```
IV. Broadcasting dimension data downstream
1. Overview
Use Flink's broadcast state to broadcast the dimension stream to the downstream operator and do the join there. This approach is easy to implement and fully meets the requirement; the drawback is that it leans heavily on memory, because the broadcast data is kept in each task's state.
For more detail see the article:
53. Flink Broadcast State pattern: introduction and examples
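Note that in the example below a fact record arriving before its dimension row has been broadcast causes a NullPointerException (the code does no error handling; see the comments in the example). A more defensive variant of the process function, which simply emits an empty name when the dimension row is missing, might look like the sketch below. It assumes the Order/User POJOs and the Tuple2<Order, String> output used in the example.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: same join as below, but tolerant of fact records whose dimension row has not arrived yet.
public class SafeJoinBroadcastFunction extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
    private final MapStateDescriptor<Integer, User> broadcastDesc;

    public SafeJoinBroadcastFunction(MapStateDescriptor<Integer, User> broadcastDesc) {
        this.broadcastDesc = broadcastDesc;
    }

    @Override
    public void processBroadcastElement(User value, Context ctx, Collector<Tuple2<Order, String>> out) throws Exception {
        ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
    }

    @Override
    public void processElement(Order value, ReadOnlyContext ctx, Collector<Tuple2<Order, String>> out) throws Exception {
        ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
        User user = state.get(value.getUId());
        // emit a placeholder name instead of throwing when the dimension row is missing
        // (alternatively, buffer the record in state and emit it once the dimension data arrives)
        out.collect(new Tuple2<>(value, user == null ? "" : user.getName()));
    }
}
```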
2. Example: joining the fact stream with the dimension table - using Flink broadcast state
1) Broadcast implementation
```java
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;

// used via connect(...).process(final BroadcastProcessFunction<IN1, IN2, OUT> function)
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
    // map state descriptor describing the map structure that stores the dimension rows keyed by user id
    MapStateDescriptor<Integer, User> broadcastDesc;

    JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
        this.broadcastDesc = broadcastDesc;
    }

    // handles the elements of the broadcast (dimension) stream
    @Override
    public void processBroadcastElement(User value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        System.out.println("received broadcast data:" + value);
        // write into the broadcast state
        ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
    }

    // handles the non-broadcast (fact) stream and joins the dimension data
    @Override
    public void processElement(Order value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        // read the broadcast state
        ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
        out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
    }
}
```
2) Joining the fact stream with the dimension stream
```java
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestJoinDimFromBroadcastDataStreamDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user dimension stream
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);

        // a map descriptor describing the map structure that stores the rule name and the rule itself
        // MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
        //         "RulesBroadcastState",
        //         BasicTypeInfo.STRING_TYPE_INFO,
        //         TypeInformation.of(new TypeHint<Rule>() {
        //         }));
        // broadcast stream: broadcast the rules and create the broadcast state
        // BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

        // declare the user stream (dimension table) as a broadcast stream
        final MapStateDescriptor<Integer, User> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState", Integer.class, User.class);
        BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);

        // the connect must be initiated by the non-broadcast stream
        DataStream<Tuple2<Order, String>> result = orderDs.connect(broadcastStream)
                .process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));

        result.print();

        // user stream data (dimension table); no error handling is done, so the dimension data must be
        // broadcast first, otherwise a NullPointerException is thrown
        // 1001,alan,18,20,alan.chan.chn@163.com
        // 1002,alanchan,19,25,alan.chan.chn@163.com
        // 1003,alanchanchn,20,30,alan.chan.chn@163.com
        // 1004,alan_chan,27,20,alan.chan.chn@163.com
        // 1005,alan_chan_chn,36,10,alan.chan.chn@163.com
        // order stream data
        // 16,1002,211
        // 17,1004,234
        // 18,1005,175
        // console output
        // received broadcast data:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
        // ......
        // received broadcast data:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
        // received broadcast data:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
        // ......
        // received broadcast data:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
        // received broadcast data:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
        // ......
        // received broadcast data:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
        // received broadcast data:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
        // ......
        // received broadcast data:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
        // received broadcast data:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
        // ......
        // received broadcast data:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
        // 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
        // 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
        // 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)
        env.execute();
    }
}
```
V. Joining dimension data with a temporal table
1. Overview
A temporal table is a view of a continuously changing table at a specific point in time, and a temporal table function is a table function that takes a time attribute and returns the view of the temporal table as of that time. The dimension stream can be mapped to a temporal table and joined with the fact stream, so each fact record is enriched with the version of the dimension data valid at its time.
With this approach the dimension data can be large, it is updated in real time, it does not depend on third-party storage, and different versions of the dimension data are available (which handles dimension updates). As of Flink 1.17 this approach is only available through the Flink SQL/Table API.
Regarding the time attribute: Flink has three notions of time, namely event time, processing time, and ingestion time. Event time and processing time are the commonly used ones, and both implementations are shown below. For event time, a Kafka source behaves a little differently from other sources, so the Kafka variant is covered separately.
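As a side note, besides the temporal table function used in the examples below, Flink SQL also supports temporal joins written directly with FOR SYSTEM_TIME AS OF against a versioned table. The following is only a rough sketch of that syntax, not the approach used in this article: the orders and users tables are hypothetical, and their DDL (event-time column with a watermark, primary key, suitable connector) is assumed to have been registered already.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Assumed to exist: an 'orders' table with an event-time attribute order_time, and a versioned
        // 'users' table (PRIMARY KEY (id) NOT ENFORCED plus a watermarked event-time column),
        // e.g. registered beforehand via tenv.executeSql(...) DDL over an upsert-capable connector.
        Table joined = tenv.sqlQuery(
                "SELECT o.id, o.uId, o.total, u.name " +
                "FROM orders AS o " +
                "JOIN users FOR SYSTEM_TIME AS OF o.order_time AS u " +
                "ON o.uId = u.id");

        joined.execute().print();
    }
}
```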
2. Example: joining the fact stream with the dimension table - processing-time implementation
```java
package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: temporal table based on processing time
 */
public class TestJoinDimByProcessingTimeDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // order stream (fact table)
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user stream (dimension table)
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);

        // convert to tables
        Table orderTable = tenv.fromDataStream(orderDs, $("id"), $("uId"), $("total"), $("order_ps").proctime());
        Table userTable = tenv.fromDataStream(userDs, $("id"), $("name"), $("balance"), $("age"), $("email"), $("user_ps").proctime());

        // define a TemporalTableFunction
        TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_ps"), $("id"));
        // register the table function
        tenv.registerFunction("alan_userDim", userDim);

        // join query
        Table result = tenv
                .sqlQuery("select o.* , u.name from " + orderTable + " as o , Lateral table (alan_userDim(o.order_ps)) u "
                        + "where o.uId = u.id");

        // print the output
        DataStream<Row> resultDs = tenv.toAppendStream(result, Row.class);
        resultDs.print();

        // user stream data (dimension table)
        // 1001,alan,18,20,alan.chan.chn@163.com
        // 1002,alanchan,19,25,alan.chan.chn@163.com
        // 1003,alanchanchn,20,30,alan.chan.chn@163.com
        // 1004,alan_chan,27,20,alan.chan.chn@163.com
        // 1005,alan_chan_chn,36,10,alan.chan.chn@163.com
        // order stream data
        // 26,1002,311
        // 27,1004,334
        // 28,1005,475
        // console output
        // 15> +I[26, 1002, 311.0, 2023-12-20T05:21:12.977Z, alanchan]
        // 11> +I[27, 1004, 334.0, 2023-12-20T05:21:50.898Z, alan_chan]
        // 5> +I[28, 1005, 475.0, 2023-12-20T05:21:57.559Z, alan_chan_chn]
        env.execute();
    }
}
```
3. Example: joining the fact stream with the dimension table - event-time implementation
```java
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestjoinDimByEventTimeDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
        private Long eventTime;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
        private Long eventTime;
    }

    final static List<User> userList = Arrays.asList(
            new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com", 1L),
            new User(1002, "alan", 30d, 19, "alan.chan.chn@163.com", 10L),
            new User(1003, "alan", 29d, 25, "alan.chan.chn@163.com", 1L),
            new User(1004, "alanchan", 22d, 28, "alan.chan.chn@163.com", 5L),
            new User(1005, "alanchan", 50d, 29, "alan.chan.chn@163.com", 1698742362424L));

    final static List<Order> orderList = Arrays.asList(
            new Order(11, 1002, 1084d, 1L),
            new Order(12, 1001, 84d, 10L),
            new Order(13, 1005, 369d, 2L));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // order stream (fact table)
        // DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
        //         .map(o -> {
        //             String[] lines = o.split(",");
        //             return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]), Long.valueOf(lines[3]));
        //         })
        //         .assignTimestampsAndWatermarks(WatermarkStrategy
        //                 .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        //                 .withTimestampAssigner((order, rTimeStamp) -> order.getEventTime()));
        DataStream<Order> orderDs = env.fromCollection(orderList)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((order, rTimeStamp) -> order.getEventTime()));

        // user stream (dimension table)
        // DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
        //         .map(o -> {
        //             String[] lines = o.split(",");
        //             return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4], Long.valueOf(lines[3]));
        //         })
        //         .assignTimestampsAndWatermarks(WatermarkStrategy
        //                 .<User>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        //                 .withTimestampAssigner((user, rTimeStamp) -> user.getEventTime()));
        DataStream<User> userDs = env.fromCollection(userList)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<User>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((user, rTimeStamp) -> user.getEventTime()));

        // convert to tables
        Table orderTable = tenv.fromDataStream(orderDs, $("id"), $("uId"), $("total"), $("order_eventTime").rowtime());
        Table userTable = tenv.fromDataStream(userDs, $("id"), $("name"), $("balance"), $("age"), $("email"), $("user_eventTime").rowtime());

        tenv.createTemporaryView("alan_orderTable", orderTable);
        tenv.createTemporaryView("alan_userTable", userTable);

        // define a TemporalTableFunction
        TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_eventTime"), $("id"));
        // register the table function
        tenv.registerFunction("alan_userDim", userDim);

        // String sql = "select o.* from alan_orderTable as o ";
        // String sql = "select u.* from alan_userTable as u ";
        // String sql = "select o.*,u.name from alan_orderTable as o , alan_userTable as u where o.uId = u.id";
        String sql = "select o.*,u.name from alan_orderTable as o,Lateral table (alan_userDim(o.order_eventTime)) u where o.uId = u.id";

        // join query
        Table result = tenv.sqlQuery(sql);
        // print the output
        DataStream<Row> resultDs = tenv.toAppendStream(result, Row.class);
        resultDs.print();

        // user stream data (dimension table)
        // userList
        // order stream data
        // orderList
        // console output
        // 3> +I[12, 1001, 84.0, 1970-01-01T00:00:00.010, alan]
        env.execute();
    }
}
```
4. Example: joining the fact stream with the dimension table - event-time implementation with a Kafka source
1) Bean definitions
```java
package org.tablesql.join.bean;

import java.io.Serializable;

import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
@Data
public class CityInfo implements Serializable {
    private Integer cityId;
    private String cityName;
    private Long ts;
}
```
```java
package org.tablesql.join.bean;

import java.io.Serializable;

import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
@Data
public class UserInfo implements Serializable {
    private String userName;
    private Integer cityId;
    private Long ts;
}
```
2) Deserialization schemas
```java
package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class CityInfoSchema implements DeserializationSchema<CityInfo> {

    @Override
    public CityInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        CityInfo data = JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
        return data;
    }

    @Override
    public boolean isEndOfStream(CityInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<CityInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<CityInfo>() {
        });
    }
}
```

```java
package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class UserInfoSchema implements DeserializationSchema<UserInfo> {

    @Override
    public UserInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        UserInfo data = JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
        return data;
    }

    @Override
    public boolean isEndOfStream(UserInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<UserInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<UserInfo>() {
        });
    }
}
```
3) Implementation
```java
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.tablesql.join.bean.CityInfo;
import org.tablesql.join.bean.CityInfoSchema;
import org.tablesql.join.bean.UserInfo;
import org.tablesql.join.bean.UserInfoSchema;

public class TestJoinDimByKafkaEventTimeDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka brokers and the topics to consume
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
        props.setProperty("group.id", "kafkatest");

        // user info from Kafka
        FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(), props);
        userConsumer.setStartFromEarliest();
        userConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((user, rTimeStamp) -> user.getTs()) // without this line, the Kafka record timestamp is used as event time
        );

        // city dimension info from Kafka
        FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
        cityConsumer.setStartFromEarliest();
        cityConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                .<CityInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((city, rTimeStamp) -> city.getTs()) // without this line, the Kafka record timestamp is used as event time
        );

        Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), $("userName"), $("cityId"), $("ts").rowtime());
        Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), $("cityId"), $("cityName"), $("ts").rowtime());

        tableEnv.createTemporaryView("userTable", userTable);
        tableEnv.createTemporaryView("cityTable", cityTable);

        // define a TemporalTableFunction
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction($("ts"), $("cityId"));
        // register the table function
        // tableEnv.registerFunction("dimCity", dimCity);
        tableEnv.createTemporarySystemFunction("dimCity", dimCity);

        Table u = tableEnv.sqlQuery("select * from userTable");
        // u.printSchema();
        tableEnv.toAppendStream(u, Row.class).print("user stream received:");

        Table c = tableEnv.sqlQuery("select * from cityTable");
        // c.printSchema();
        tableEnv.toAppendStream(c, Row.class).print("city stream received:");

        // join query
        Table result = tableEnv
                .sqlQuery("select u.userName,u.cityId,d.cityName,u.ts " +
                        "from userTable as u " +
                        ", Lateral table (dimCity(u.ts)) d " +
                        "where u.cityId=d.cityId");

        // print the output
        DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print("\tjoined output:");

        // user info format:
        // {"userName":"user1","cityId":1,"ts":0}
        // {"userName":"user1","cityId":1,"ts":1}
        // {"userName":"user1","cityId":1,"ts":4}
        // {"userName":"user1","cityId":1,"ts":5}
        // {"userName":"user1","cityId":1,"ts":7}
        // {"userName":"user1","cityId":1,"ts":9}
        // {"userName":"user1","cityId":1,"ts":11}
        // kafka-console-producer.sh --broker-list server1:9092 --topic user
        // city dimension format:
        // {"cityId":1,"cityName":"nanjing","ts":15}
        // {"cityId":1,"cityName":"beijing","ts":1}
        // {"cityId":1,"cityName":"shanghai","ts":5}
        // {"cityId":1,"cityName":"shanghai","ts":7}
        // {"cityId":1,"cityName":"wuhan","ts":10}
        // kafka-console-producer.sh --broker-list server1:9092 --topic city
        // output
        // city stream received::6> +I[1, beijing, 1970-01-01T00:00:00.001]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.004]
        // city stream received::6> +I[1, shanghai, 1970-01-01T00:00:00.005]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.005]
        // city stream received::6> +I[1, shanghai, 1970-01-01T00:00:00.007]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.007]
        // city stream received::6> +I[1, wuhan, 1970-01-01T00:00:00.010]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.009]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.011]
        // joined output::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.001]
        // joined output::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.004]
        // joined output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.005]
        // joined output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.007]
        // joined output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.009]
        env.execute("joinDemo");
    }
}
```
That concludes this article, which has walked through the dimension-table join approaches in Flink in detail: static data, a cache, async I/O, broadcast state, and temporal tables (processing-time, event-time, and Kafka-source event-time variants).