
58. Flink dimension tables in practice: 6 ways to implement dimension table joins


Flink series articles

I. The Flink column

Each article in the Flink column systematically introduces one topic, illustrated with concrete examples.

  • 1. Flink deployment series

    Covers Flink deployment and configuration basics.

  • 2. Flink fundamentals series

    Covers Flink fundamentals: terminology, architecture, the programming model, the programming guide, basic DataStream API usage, the four cornerstones, and so on.

  • 3. Flink Table API and SQL basics series

    Covers basic Table API and SQL usage: creating databases and tables, queries, window functions, catalogs, and so on.

  • 4. Flink Table API and SQL advanced series

    Covers applied Table API and SQL topics that sit closer to real production work, including the more difficult material.

  • 5. Flink monitoring series

    Covers day-to-day operations and monitoring work.

II. The Flink examples column

The Flink examples column accompanies the Flink column. It generally does not explain concepts; instead it provides concrete, directly usable examples. It has no sub-directories; each article's title makes its content clear.

For the entry point to all articles in both columns, see: Flink 系列文章汇总索引


Table of contents

• Flink series articles
• I. Maven dependencies and data structures
  • 1. Maven dependencies
  • 2. Data structures
  • 3. Data sources
  • 4. Verifying the results
• II. Dimension table from initialized static data
  • 1. Overview
  • 2. Example: joining the fact stream with the dimension table
• III. Dimension table from a third-party data source
  • 1. Overview
  • 2. Example: joining the fact stream with the dimension table, with a cache to cut lookup overhead
  • 3. Example: joining the fact stream with the dimension table via Flink async I/O for higher throughput
    • 1) Redis async I/O implementation
    • 2) Joining the fact stream with the dimension stream
• IV. Passing dimension data downstream via broadcast
  • 1. Overview
  • 2. Example: joining the fact stream with the dimension table via Flink Broadcast
    • 1) Broadcast implementation
    • 2) Joining the fact stream with the dimension stream
• V. Dimension table join via temporal table
  • 1. Overview
  • 2. Example: joining the fact stream with the dimension table, ProcessingTime implementation
  • 3. Example: joining the fact stream with the dimension table, EventTime implementation
  • 4. Example: joining the fact stream with the dimension table, EventTime implementation with a Kafka source
    • 1) Bean definitions
    • 2) Serialization
    • 3) Implementation

This article walks through the six ways to implement dimension table joins in Flink: static data, cache, async I/O, broadcast, and temporal tables (shown in three variants).

For more, the author's Flink column carries systematically updated content.

Apart from the Maven dependencies below, the code in this article needs nothing else; the examples do, however, depend on a running environment with Redis, Kafka, netcat, and the like.

I. Maven dependencies and data structures

1. Maven dependencies

All examples in this article build against the pom.xml below. It may pull in more than a particular example needs; trim it to your own situation.

                      
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-uber</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.0-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>32.0.1-jre</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.24.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.12</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime_2.12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.43</version>
    </dependency>
</dependencies>

2. Data structures

The examples all implement a single requirement: join each order's uId to a user's id and emit a Tuple2<Order, String> of the order and the user's name.

• Fact stream: Order

      // fact table
      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      static class Order {
          private Integer id;
          private Integer uId;
          private Double total;
      }

• Dimension stream: User

      // dimension table
      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      static class User {
          private Integer id;
          private String name;
          private Double balance;
          private Integer age;
          private String email;
      }

3. Data sources

The fact stream comes from several sources, shown per example: socket, Redis, Kafka, etc.

The dimension stream likewise has several sources, shown per example: static data, MySQL, socket, Kafka, etc.

Running the examples therefore requires the corresponding environment: MySQL, Redis, Kafka, netcat, etc.

4. Verifying the results

Every example in this article has been verified. The test data sits in comments inside each example, split into fact stream input, dimension stream input, and run output; the individual examples do not repeat the verification steps.

II. Dimension table from initialized static data

1. Overview

Define a class extending RichMapFunction, load the dimension data into memory in open(), and look it up in the fact stream's map() method as each record arrives.

Because the data lives entirely in memory, this only suits small dimension tables that are updated rarely. You can schedule a timer in open() to reload the table periodically (see the sketch below), but updates may still lag and the refresh has its own cost. For small data that rarely changes (or whose changes matter little), it is one of the ideal choices.
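
A minimal sketch of that periodic-refresh idea, reusing this article's Order/User beans; the single-thread executor, the 10-minute interval, and the loadAll() helper are illustrative assumptions, not part of the original example:

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Sketch only: an in-memory dimension map that is reloaded on a schedule.
// loadAll() stands in for whatever source the dimension data is read from.
public abstract class RefreshingDimMapFunction extends RichMapFunction<Order, Tuple2<Order, String>> {
    private transient volatile Map<Integer, User> userDim;
    private transient ScheduledExecutorService refresher;

    @Override
    public void open(Configuration parameters) throws Exception {
        userDim = loadAll(); // initial load, as in the example below
        refresher = Executors.newSingleThreadScheduledExecutor();
        // swap in a freshly loaded map every 10 minutes (the interval is an assumption)
        refresher.scheduleAtFixedRate(() -> userDim = loadAll(), 10, 10, TimeUnit.MINUTES);
    }

    @Override
    public Tuple2<Order, String> map(Order value) throws Exception {
        User u = userDim.get(value.getUId());
        return new Tuple2<>(value, u == null ? "" : u.getName());
    }

    @Override
    public void close() throws Exception {
        if (refresher != null) refresher.shutdownNow();
    }

    // hypothetical helper: fetch the full dimension table from its source
    protected abstract Map<Integer, User> loadAll();
}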

2. Example: joining the fact stream with the dimension table

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: load the dimension data into memory in the open() method of a RichMapFunction
 */
public class TestJoinDimFromStaticDataDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            Map<Integer, User> userDim = null;

            // dimension table: static data, built here inside the anonymous inner class
            @Override
            public void open(Configuration parameters) throws Exception {
                userDim = new HashMap<>();
                userDim.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
                userDim.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
                userDim.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
                userDim.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
                userDim.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2<>(value, userDim.get(value.getUId()).getName());
            }
        });
        result.print();

        // nc input
        // 1,1004,345
        // 2,1001,678

        // console output
        // 2> (TestJoinDimFromStaticData.Order(id=1, uId=1004, total=345.0),alan_chan)
        // 3> (TestJoinDimFromStaticData.Order(id=2, uId=1001, total=678.0),alan)
        env.execute("TestJoinDimFromStaticData");
    }
}

III. Dimension table from a third-party data source

1. Overview

Here the dimension data lives in external storage such as Redis, HBase, or MySQL, and the fact stream queries that store in real time whenever it needs to join.

The dimension data is no longer bounded by memory, so it can be very large; on the other hand, read latency is dictated by the external store. In practice this is one of the most common approaches. The bare per-record lookup is sketched below; the two examples that follow improve on it.
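
A minimal sketch of the bare synchronous lookup, assuming a Jedis client and the same Redis hash layout (key AsyncReadUserById_Redis, value "id,name,balance,age,email") that the async example later in this article uses; it reuses this article's Order bean:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

// Sketch only: one synchronous Redis lookup per fact record, no cache.
public class DirectRedisLookupFunction extends RichMapFunction<Order, Tuple2<Order, String>> {
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        jedis = new Jedis("192.168.10.41", 6379); // address/port are assumptions
    }

    @Override
    public Tuple2<Order, String> map(Order value) throws Exception {
        // fetch the user line for this order's uId
        String userLine = jedis.hget("AsyncReadUserById_Redis", value.getUId() + "");
        // field 1 of the comma-separated value is the user name
        return new Tuple2<>(value, userLine == null ? "" : userLine.split(",")[1]);
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) jedis.close();
    }
}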

2. Example: joining the fact stream with the dimension table, with a cache to cut lookup overhead

Querying the third-party store for every single join is expensive. To reduce that cost, a cache is typically put in front to absorb read pressure, at the price of stale or inconsistent data when the source changes. The cache also holds data in memory, which adds overhead of its own, so whether it pays off depends on the concrete business scenario. This example uses Guava Cache; many cache implementations exist, so choose one that fits your situation.

The data source here is just static data for demonstration; in reality it would be HBase, MySQL, or similar.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: cache dimension lookups with a Guava LoadingCache
 */
public class TestJoinDimFromCacheDataDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user dimension table
        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            // LoadingCache loads an entry automatically when it is missing from the cache
            LoadingCache<Integer, User> userDim;

            @Override
            public void open(Configuration parameters) throws Exception {
                // build the cache with Guava's LoadingCache
                // CacheBuilder's constructor is private; obtain an instance via the static newBuilder()
                userDim = CacheBuilder.newBuilder()
                        // concurrency level = number of threads that may write the cache at once
                        .concurrencyLevel(8)
                        // max entries; beyond this, entries are evicted least-recently-used first
                        .maximumSize(1000)
                        // entries expire 10 minutes after being written
                        .expireAfterWrite(10, TimeUnit.MINUTES)
                        // initial capacity of the cache
                        .initialCapacity(10)
                        // record hit-rate statistics
                        .recordStats()
                        // removal notification
                        .removalListener(new RemovalListener<Integer, User>() {
                            @Override
                            public void onRemoval(RemovalNotification<Integer, User> removalNotification) {
                                System.out.println(removalNotification.getKey() + " removed, value: " + removalNotification.getValue());
                            }
                        })
                        .build(
                                // how to load an entry on a cache miss
                                new CacheLoader<Integer, User>() {
                                    @Override
                                    public User load(Integer uId) throws Exception {
                                        return dataSource(uId);
                                    }
                                });
                System.out.println("userDim:" + userDim.get(1002));
            }

            private User dataSource(Integer uId) {
                // could be any data source; static data is used here purely for demonstration
                Map<Integer, User> users = new HashMap<>();
                users.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
                users.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
                users.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
                users.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
                users.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));
                User user = null;
                if (users.containsKey(uId)) {
                    user = users.get(uId);
                }
                return user;
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2<>(value, userDim.get(value.getUId()).getName());
            }
        });
        result.print();

        // input data
        // 7,1003,111
        // 8,1005,234
        // 9,1002,875

        // console output
        // 5> (TestJoinDimFromCacheDataDemo.Order(id=7, uId=1003, total=111.0),alanchanchn)
        // 6> (TestJoinDimFromCacheDataDemo.Order(id=8, uId=1005, total=234.0),alan_chan_chn)
        // 7> (TestJoinDimFromCacheDataDemo.Order(id=9, uId=1002, total=875.0),alanchan)
        env.execute("TestJoinDimFromCacheDataDemo");
    }
}

3. Example: joining the fact stream with the dimension table via Flink async I/O for higher throughput

When Flink talks to an external system synchronously, it sends a request, waits for the response, and only then sends the next one. Throughput is poor; raising the parallelism helps, but more parallel instances mean more processes and far more resource consumption.

Flink instead supports asynchronous I/O against external systems. This requires that the external system's client support async requests, as Redis and MongoDB clients do.

For more, see:

55、Flink之用于外部数据访问的异步 I/O介绍及示例

1) Redis async I/O implementation

package org.tablesql.join;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.tablesql.join.TestJoinDimFromAsyncDataStreamDemo.Order;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: asynchronous dimension lookup against Redis
 */
public class JoinAyncFunctionByRedis extends RichAsyncFunction<Order, Tuple2<Order, String>> {
    private JedisPoolConfig config = null;
    private static String ADDR = "192.168.10.41";
    private static int PORT = 6379;
    private static int TIMEOUT = 10000;
    private JedisPool jedisPool = null;
    private Jedis jedis = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        config = new JedisPoolConfig();
        jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
        jedis = jedisPool.getResource();
    }

    @Override
    public void asyncInvoke(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        // one record of the order fact stream
        System.out.println("input----:" + input);
        // fire an async request and hand back the result when it completes
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                // value format: 1002,alanchan,19,25,alan.chan.chn@163.com
                String userLine = jedis.hget("AsyncReadUserById_Redis", input.getUId() + "");
                String[] userTemp = userLine.split(",");
                // return the user name
                return userTemp[1];
            }
        }).thenAccept((String dbResult) -> {
            // completion callback: emit the joined result
            List<Tuple2<Order, String>> list = new ArrayList<>();
            list.add(new Tuple2<>(input, dbResult));
            resultFuture.complete(list);
        });
    }

    // called when the async request times out
    @Override
    public void timeout(Order input, ResultFuture<Tuple2<Order, String>> resultFuture)
            throws Exception {
        List<Tuple2<Order, String>> list = new ArrayList<>();
        // the dimension lookup timed out, so the user name is left empty
        list.add(new Tuple2<>(input, ""));
        resultFuture.complete(list);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis.isConnected()) {
            jedis.close();
        }
    }
}

2) Joining the fact stream with the dimension stream

package org.tablesql.join;

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: join the order fact stream with the Redis dimension data via async I/O
 */
public class TestJoinDimFromAsyncDataStreamDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        testJoinAyncFunctionByRedis();
    }

    static void testJoinAyncFunctionByRedis() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // ordered: results are emitted in input order; timeout 1 s, queue capacity 2, backpressure beyond that
        DataStream<Tuple2<Order, String>> result = AsyncDataStream.orderedWait(orderDs, new JoinAyncFunctionByRedis(),
                1000L, TimeUnit.MILLISECONDS, 2);
        result.print("result:");

        // unordered: results may be emitted out of order; timeout 1 s, queue capacity 2, backpressure beyond that
        DataStream<Tuple2<Order, String>> unorderedResult = AsyncDataStream
                .unorderedWait(orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);
        unorderedResult.print("unorderedResult");

        // redis commands and data
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1001 '1001,alan,18,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1002 '1002,alanchan,19,25,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1003 '1003,alanchanchn,20,30,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1004 '1004,alan_chan,27,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1005 '1005,alan_chan_chn,36,10,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hgetall AsyncReadUserById_Redis
        // 1) "1001"
        // 2) "1001,alan,18,20,alan.chan.chn@163.com"
        // 3) "1002"
        // 4) "1002,alanchan,19,25,alan.chan.chn@163.com"
        // 5) "1003"
        // 6) "1003,alanchanchn,20,30,alan.chan.chn@163.com"
        // 7) "1004"
        // 8) "1004,alan_chan,27,20,alan.chan.chn@163.com"
        // 9) "1005"
        // 10) "1005,alan_chan_chn,36,10,alan.chan.chn@163.com"

        // input data
        // 13,1002,811
        // 14,1004,834
        // 15,1005,975

        // console output
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // result::12> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // unorderedResult:9> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
        // result::5> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0)
        // unorderedResult:2> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // result::6> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // unorderedResult:3> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
        env.execute("TestJoinDimFromAsyncDataStreamDemo");
    }
}

IV. Passing dimension data downstream via broadcast

1. Overview

Use Flink's broadcast state to broadcast the dimension stream to the downstream operator and join there. This is easy to implement and fully meets the need; the one drawback is that it leans heavily on memory, since the broadcast data is held in memory on every task.

For more, see:

53、Flink 的Broadcast State 模式介绍及示例

2. Example: joining the fact stream with the dimension table via Flink Broadcast

1) Broadcast implementation

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: join the fact stream against the broadcast dimension state
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;

public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
    // descriptor of the map structure that stores the broadcast dimension data (user id -> User)
    MapStateDescriptor<Integer, User> broadcastDesc;

    JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
        this.broadcastDesc = broadcastDesc;
    }

    // handles each element of the broadcast (dimension) stream
    @Override
    public void processBroadcastElement(User value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        System.out.println("broadcast element received:" + value);
        // write into the broadcast state
        ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
    }

    // handles the non-broadcast (fact) stream and joins in the dimension data
    @Override
    public void processElement(Order value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        // read the broadcast state
        ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
        out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
    }
}

2) Joining the fact stream with the dimension stream

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: broadcast the user dimension stream and join it with the order stream
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestJoinDimFromBroadcastDataStreamDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user dimension stream
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);

        // a map descriptor describing the structure that stores the broadcast data, e.g.:
        // MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
        //         "RulesBroadcastState",
        //         BasicTypeInfo.STRING_TYPE_INFO,
        //         TypeInformation.of(new TypeHint<Rule>() {
        //         }));
        // broadcast the rules and create the broadcast state:
        // BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

        // turn the user (dimension) stream into a broadcast stream
        final MapStateDescriptor<Integer, User> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState",
                Integer.class,
                User.class);
        BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);

        // the non-broadcast stream drives the connect
        DataStream<Tuple2<Order, String>> result = orderDs.connect(broadcastStream)
                .process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));
        result.print();

        // user (dimension) stream data; there is no fault handling here, so broadcast the
        // dimension data first, otherwise a NullPointerException follows
        // 1001,alan,18,20,alan.chan.chn@163.com
        // 1002,alanchan,19,25,alan.chan.chn@163.com
        // 1003,alanchanchn,20,30,alan.chan.chn@163.com
        // 1004,alan_chan,27,20,alan.chan.chn@163.com
        // 1005,alan_chan_chn,36,10,alan.chan.chn@163.com
        // order stream data
        // 16,1002,211
        // 17,1004,234
        // 18,1005,175

        // console output
        // broadcast element received:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
        // ......
        // broadcast element received:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
        // broadcast element received:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
        // ......
        // broadcast element received:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
        // broadcast element received:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
        // ......
        // broadcast element received:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
        // broadcast element received:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
        // ......
        // broadcast element received:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
        // broadcast element received:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
        // ......
        // broadcast element received:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
        // 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
        // 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
        // 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)
        env.execute();
    }
}

V. Dimension table join via temporal table

1. Overview

A temporal table is a view of a continuously changing table as of a specific point in time, and a temporal table function is a table function that, given a time attribute, returns the view of the temporal table as of that time. Map the dimension stream to a temporal table and join the fact stream against it, and each fact record is matched to the corresponding version of the dimension data.

With this approach the dimension data can be large and is updated in real time, there is no dependency on third-party storage, and different versions of the dimension data remain available (to cope with dimension updates). As of Flink 1.17 this only works through the Flink SQL API.

As for the time attribute: Flink has three notions of time (event time, processing time, and ingestion time), of which event time and processing time are the common ones, and this article shows implementations of both. For event time, Kafka behaves differently from other sources, so the Kafka variant is covered separately. The wiring shared by all three examples is sketched right below.
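
As a compact orientation before the full examples, this is the common three-step wiring they all use; the table, column, and function names are the ones the later examples register, and userTable/tenv are assumed to exist as they do there:

// assumes: import static org.apache.flink.table.api.Expressions.$;
// 1) create a temporal table function over the versioned user table,
//    keyed by id and versioned by its time attribute
TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_eventTime"), $("id"));
// 2) register it so SQL can call it
tenv.registerFunction("alan_userDim", userDim);
// 3) join laterally: each order is matched against the user version valid at its time attribute
Table result = tenv.sqlQuery(
        "select o.*, u.name "
      + "from alan_orderTable as o, "
      + "lateral table (alan_userDim(o.order_eventTime)) u "
      + "where o.uId = u.id");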

2. Example: joining the fact stream with the dimension table, ProcessingTime implementation

package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: temporal table based on processing time
 */
public class TestJoinDimByProcessingTimeDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user dimension stream
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]),
                            Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);

        // convert to Tables, each with a processing-time attribute
        Table orderTable = tenv.fromDataStream(orderDs, $("id"), $("uId"), $("total"), $("order_ps").proctime());
        Table userTable = tenv.fromDataStream(userDs, $("id"), $("name"), $("balance"), $("age"), $("email"),
                $("user_ps").proctime());

        // define a TemporalTableFunction over the user table
        TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_ps"), $("id"));
        // register the table function
        tenv.registerFunction("alan_userDim", userDim);

        // the join query
        Table result = tenv
                .sqlQuery("select o.* , u.name from " + orderTable + " as o  , Lateral table (alan_userDim(o.order_ps)) u " +
                        "where o.uId = u.id");

        // print the output
        DataStream<Row> resultDs = tenv.toAppendStream(result, Row.class);
        resultDs.print();

        // user (dimension) stream data
        // 1001,alan,18,20,alan.chan.chn@163.com
        // 1002,alanchan,19,25,alan.chan.chn@163.com
        // 1003,alanchanchn,20,30,alan.chan.chn@163.com
        // 1004,alan_chan,27,20,alan.chan.chn@163.com
        // 1005,alan_chan_chn,36,10,alan.chan.chn@163.com
        // order stream data
        // 26,1002,311
        // 27,1004,334
        // 28,1005,475

        // console output
        // 15> +I[26, 1002, 311.0, 2023-12-20T05:21:12.977Z, alanchan]
        // 11> +I[27, 1004, 334.0, 2023-12-20T05:21:50.898Z, alan_chan]
        // 5> +I[28, 1005, 475.0, 2023-12-20T05:21:57.559Z, alan_chan_chn]
        env.execute();
    }
}

3. Example: joining the fact stream with the dimension table, EventTime implementation

                          /*
                           * @Author: alanchan
                           * @LastEditors: alanchan
                           * @Description: 
                           */
                          package org.tablesql.join;
                          import static org.apache.flink.table.api.Expressions.$;
                          import java.time.Duration;
                          import java.util.Arrays;
                          import java.util.List;
                          import org.apache.flink.api.common.eventtime.WatermarkStrategy;
                          import org.apache.flink.streaming.api.datastream.DataStream;
                          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
                          import org.apache.flink.table.api.Table;
                          import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
                          import org.apache.flink.table.functions.TemporalTableFunction;
                          import org.apache.flink.types.Row;
                          import lombok.AllArgsConstructor;
                          import lombok.Data;
                          import lombok.NoArgsConstructor;
                          public class TestjoinDimByEventTimeDemo {
                              // 维表
                              @Data
                              @NoArgsConstructor
                              @AllArgsConstructor
                              public static class User {
                                  private Integer id;
                                  private String name;
                                  private Double balance;
                                  private Integer age;
                                  private String email;
                                  private Long eventTime;
                              }
                              // 事实表
                              @Data
@NoArgsConstructor
@AllArgsConstructor
public static class Order {
    private Integer id;
    private Integer uId;
    private Double total;
    private Long eventTime;
}

final static List<User> userList = Arrays.asList(
        new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com", 1L),
        new User(1002, "alan", 30d, 19, "alan.chan.chn@163.com", 10L),
        new User(1003, "alan", 29d, 25, "alan.chan.chn@163.com", 1L),
        new User(1004, "alanchan", 22d, 28, "alan.chan.chn@163.com", 5L),
        new User(1005, "alanchan", 50d, 29, "alan.chan.chn@163.com", 1698742362424L));

final static List<Order> orderList = Arrays.asList(
        new Order(11, 1002, 1084d, 1L),
        new Order(12, 1001, 84d, 10L),
        new Order(13, 1005, 369d, 2L));

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    // order stream: the fact table; the socket variant below reads "id,uId,total,eventTime" lines
    // DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
    //         .map(o -> {
    //             String[] lines = o.split(",");
    //             return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]), Long.valueOf(lines[3]));
    //         })
    //         .assignTimestampsAndWatermarks(WatermarkStrategy
    //                 .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    //                 .withTimestampAssigner((order, rTimeStamp) -> order.getEventTime()));

    DataStream<Order> orderDs = env.fromCollection(orderList)
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((order, rTimeStamp) -> order.getEventTime()));

    // user stream: the dimension table; the socket variant below reads "id,name,balance,age,email,eventTime" lines
    // DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
    //         .map(o -> {
    //             String[] lines = o.split(",");
    //             return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4], Long.valueOf(lines[5]));
    //         })
    //         .assignTimestampsAndWatermarks(WatermarkStrategy
    //                 .<User>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    //                 .withTimestampAssigner((user, rTimeStamp) -> user.getEventTime()));

    DataStream<User> userDs = env.fromCollection(userList)
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    .<User>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((user, rTimeStamp) -> user.getEventTime()));

    // convert to Tables, declaring the event-time attributes
    Table orderTable = tenv.fromDataStream(orderDs, $("id"), $("uId"), $("total"), $("order_eventTime").rowtime());
    Table userTable = tenv.fromDataStream(userDs, $("id"), $("name"), $("balance"), $("age"), $("email"), $("user_eventTime").rowtime());
    tenv.createTemporaryView("alan_orderTable", orderTable);
    tenv.createTemporaryView("alan_userTable", userTable);

    // define a TemporalTableFunction versioned by user_eventTime and keyed by id
    TemporalTableFunction userDim = userTable.createTemporalTableFunction($("user_eventTime"), $("id"));
    // register the table function (registerFunction is deprecated; the Kafka example below uses createTemporarySystemFunction)
    tenv.registerFunction("alan_userDim", userDim);

    // String sql = "select o.* from alan_orderTable as o ";
    // String sql = "select u.* from alan_userTable as u ";
    // String sql = "select o.*,u.name from alan_orderTable as o , alan_userTable as u where o.uId = u.id";
    String sql = "select o.*, u.name from alan_orderTable as o, LATERAL TABLE (alan_userDim(o.order_eventTime)) as u where o.uId = u.id";

    // join query
    Table result = tenv.sqlQuery(sql);

    // print the result
    DataStream<Row> resultDs = tenv.toAppendStream(result, Row.class);
    resultDs.print();

    // dimension data: userList; fact data: orderList
    // console output:
    // 3> +I[12, 1001, 84.0, 1970-01-01T00:00:00.010, alan]
    // Only order 12 joins: its event time (10) is at or after the version timestamp of
    // its user 1001 (ts=1). Orders 11 and 13 precede every version of users 1002 and
    // 1005, so no dimension version is valid at their event times and they emit nothing.
    env.execute();
}
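
The LATERAL TABLE query above uses the temporal table function syntax. The same event-time lookup can also be written in SQL-standard form with FOR SYSTEM_TIME AS OF against a versioned table, i.e. a table declared with a primary key and a watermark. The sketch below is not part of the original example: the table name, topic, and connector options are illustrative assumptions (an upsert-kafka source is assumed); only the shape of the DDL and the join matters.

// Hedged sketch (not from the original example): the SQL-standard form of an
// event-time temporal join. All names and connector options here are assumptions.
tenv.executeSql(
        "CREATE TABLE alan_userTable_versioned ( " +          // hypothetical versioned dimension table
        "  id INT, " +
        "  name STRING, " +
        "  user_eventTime TIMESTAMP(3), " +
        "  WATERMARK FOR user_eventTime AS user_eventTime - INTERVAL '10' SECOND, " +
        "  PRIMARY KEY (id) NOT ENFORCED " +                  // primary key + watermark make the table versioned
        ") WITH ( " +
        "  'connector' = 'upsert-kafka', " +                  // assumed upsert source
        "  'topic' = 'alan_user', " +                          // illustrative topic
        "  'properties.bootstrap.servers' = '192.168.10.41:9092', " +
        "  'key.format' = 'json', " +
        "  'value.format' = 'json' " +
        ")");

// FOR SYSTEM_TIME AS OF picks, per order row, the dimension version valid at the
// order's event time; the join condition must include the primary key.
Table versionedJoin = tenv.sqlQuery(
        "SELECT o.*, u.name " +
        "FROM alan_orderTable AS o " +
        "LEFT JOIN alan_userTable_versioned FOR SYSTEM_TIME AS OF o.order_eventTime AS u " +
        "ON o.uId = u.id");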
                          

4、Example: joining the fact stream with the dimension table - EventTime implementation with a Kafka source

1)、Bean definitions

package org.tablesql.join.bean;

import java.io.Serializable;
import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: city dimension bean
 */
@Data
public class CityInfo implements Serializable {
    private Integer cityId;
    private String cityName;
    private Long ts;
}

package org.tablesql.join.bean;

import java.io.Serializable;
import lombok.Data;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: user fact bean
 */
@Data
public class UserInfo implements Serializable {
    private String userName;
    private Integer cityId;
    private Long ts;
}
                          

2)、Deserialization schema definitions

package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: deserializes Kafka JSON messages into CityInfo
 */
public class CityInfoSchema implements DeserializationSchema<CityInfo> {

    @Override
    public CityInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        return JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
    }

    @Override
    public boolean isEndOfStream(CityInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<CityInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<CityInfo>() {});
    }
}

package org.tablesql.join.bean;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: deserializes Kafka JSON messages into UserInfo
 */
public class UserInfoSchema implements DeserializationSchema<UserInfo> {

    @Override
    public UserInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        return JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
    }

    @Override
    public boolean isEndOfStream(UserInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<UserInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<UserInfo>() {});
    }
}
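
Because DeserializationSchema is a plain interface, a schema can be smoke-tested outside of any Flink job. A minimal sketch, using one of the user messages shown in the implementation below (the class name SchemaSmokeTest is illustrative, not part of the original):

package org.tablesql.join.bean;

import java.nio.charset.StandardCharsets;

// Illustrative standalone check of UserInfoSchema, not part of the job itself
public class SchemaSmokeTest {
    public static void main(String[] args) throws Exception {
        byte[] message = "{\"userName\":\"user1\",\"cityId\":1,\"ts\":0}"
                .getBytes(StandardCharsets.UTF_8);
        UserInfo info = new UserInfoSchema().deserialize(message);
        // Lombok's @Data generates toString(): UserInfo(userName=user1, cityId=1, ts=0)
        System.out.println(info);
    }
}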
                          

3)、Implementation

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: event-time temporal table join with both streams read from Kafka
 */
package org.tablesql.join;

import static org.apache.flink.table.api.Expressions.$;

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.tablesql.join.bean.CityInfo;
import org.tablesql.join.bean.CityInfoSchema;
import org.tablesql.join.bean.UserInfo;
import org.tablesql.join.bean.UserInfoSchema;

public class TestJoinDimByKafkaEventTimeDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka broker addresses and consumer group
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");
        props.setProperty("group.id", "kafkatest");

        // user stream (fact table) from the "user" topic
        FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<>("user", new UserInfoSchema(), props);
        userConsumer.setStartFromEarliest();
        userConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((user, rTimeStamp) -> user.getTs()) // without this, the Kafka record timestamp is used as event time
        );

        // city dimension stream from the "city" topic
        FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<>("city", new CityInfoSchema(), props);
        cityConsumer.setStartFromEarliest();
        cityConsumer.assignTimestampsAndWatermarks(WatermarkStrategy
                .<CityInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((city, rTimeStamp) -> city.getTs()) // without this, the Kafka record timestamp is used as event time
        );

        Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), $("userName"), $("cityId"), $("ts").rowtime());
        Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), $("cityId"), $("cityName"), $("ts").rowtime());
        tableEnv.createTemporaryView("userTable", userTable);
        tableEnv.createTemporaryView("cityTable", cityTable);

        // define a TemporalTableFunction versioned by ts and keyed by cityId
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction($("ts"), $("cityId"));
        // register the table function (registerFunction is the deprecated alternative)
        // tableEnv.registerFunction("dimCity", dimCity);
        tableEnv.createTemporarySystemFunction("dimCity", dimCity);

        Table u = tableEnv.sqlQuery("select * from userTable");
        // u.printSchema();
        tableEnv.toAppendStream(u, Row.class).print("user stream received:");
        Table c = tableEnv.sqlQuery("select * from cityTable");
        // c.printSchema();
        tableEnv.toAppendStream(c, Row.class).print("city stream received:");

        // join query
        Table result = tableEnv
                .sqlQuery("select u.userName, u.cityId, d.cityName, u.ts " +
                        "from userTable as u " +
                        ", LATERAL TABLE (dimCity(u.ts)) as d " +
                        "where u.cityId = d.cityId");

        // print the result
        DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print("\tjoin output:");

        // user messages (produce with: kafka-console-producer.sh --broker-list server1:9092 --topic user):
        // {"userName":"user1","cityId":1,"ts":0}
        // {"userName":"user1","cityId":1,"ts":1}
        // {"userName":"user1","cityId":1,"ts":4}
        // {"userName":"user1","cityId":1,"ts":5}
        // {"userName":"user1","cityId":1,"ts":7}
        // {"userName":"user1","cityId":1,"ts":9}
        // {"userName":"user1","cityId":1,"ts":11}
        // city dimension messages (produce with: kafka-console-producer.sh --broker-list server1:9092 --topic city):
        // {"cityId":1,"cityName":"nanjing","ts":15}
        // {"cityId":1,"cityName":"beijing","ts":1}
        // {"cityId":1,"cityName":"shanghai","ts":5}
        // {"cityId":1,"cityName":"shanghai","ts":7}
        // {"cityId":1,"cityName":"wuhan","ts":10}
        // console output:
        // city stream received::6> +I[1, beijing, 1970-01-01T00:00:00.001]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.004]
        // city stream received::6> +I[1, shanghai, 1970-01-01T00:00:00.005]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.005]
        // city stream received::6> +I[1, shanghai, 1970-01-01T00:00:00.007]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.007]
        // city stream received::6> +I[1, wuhan, 1970-01-01T00:00:00.010]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.009]
        // user stream received::6> +I[user1, 1, 1970-01-01T00:00:00.011]
        //         join output::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.001]
        //         join output::12> +I[user1, 1, beijing, 1970-01-01T00:00:00.004]
        //         join output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.005]
        //         join output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.007]
        //         join output::12> +I[user1, 1, shanghai, 1970-01-01T00:00:00.009]

        env.execute("joinDemo");
    }
}
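
FlinkKafkaConsumer is deprecated as of Flink 1.17 in favor of the KafkaSource builder. A minimal sketch of the user side under that assumption, reusing the UserInfoSchema defined above; with fromSource the watermark strategy is supplied at the source rather than set on the consumer (the source name "user-source" is illustrative):

// Hedged sketch: building the same user source with the newer KafkaSource API
// (requires org.apache.flink.connector.kafka.source.KafkaSource and
// org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
// from flink-connector-kafka)
KafkaSource<UserInfo> userSource = KafkaSource.<UserInfo>builder()
        .setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
        .setTopics("user")
        .setGroupId("kafkatest")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new UserInfoSchema())   // reuses the schema defined above
        .build();

// with fromSource the watermark strategy is supplied at the source
DataStream<UserInfo> userStream = env.fromSource(
        userSource,
        WatermarkStrategy.<UserInfo>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((user, rTimeStamp) -> user.getTs()),
        "user-source");
// the Table conversion then proceeds exactly as before:
// Table userTable = tableEnv.fromDataStream(userStream, $("userName"), $("cityId"), $("ts").rowtime());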
                          

To recap, this article walked through six ways to implement dimension-table joins in Flink: initialized static data, a cached third-party source, async I/O, broadcast state, and the temporal table approach in its three variants (processing time, event time, and event time on a Kafka source).
