上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据

guduadmin11天前

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列

    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列

    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列

    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列

    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列

    本部分和实际的运维、监控工作相关。

    二、Flink 示例专栏

    Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

    两专栏的所有文章入口点击:Flink 系列文章汇总索引


    文章目录

    • Flink 系列文章
    • 一、maven依赖及数据结构
      • 1、maven依赖
      • 2、数据结构
      • 3、数据源
      • 4、验证结果
      • 二、维表来源于初始化的静态数据
        • 1、说明
        • 2、示例:将事实流与维表进行关联

          本文介绍了Flink 维表的第一种方式,通过初始化的静态数据实现。

          如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

          本文除了maven依赖外,没有其他依赖。

          本专题分为以下几篇文章:

          【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据

          【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源

          【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游

          【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join

          【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)

          【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)

          一、maven依赖及数据结构

          1、maven依赖

          本文的所有示例均依赖本部分的pom.xml内容,可能针对下文中的某些示例存在过多的引入,根据自己的情况进行删减。

          
          	UTF-8
          	UTF-8
          	1.8
          	1.8
          	1.8
          	2.12
          	1.17.0
          
          
          	
          	
          		org.apache.flink
          		flink-clients
          		${flink.version}
          		provided
          	
          	
          		org.apache.flink
          		flink-java
          		${flink.version}
          		provided
          	
          	
          		org.apache.flink
          		flink-table-common
          		${flink.version}
          		provided
          	
          	
          		org.apache.flink
          		flink-streaming-java
          		${flink.version}
          	
          	
          		org.apache.flink
          		flink-table-api-java-bridge
          		${flink.version}
          		provided
          	
          	
          		org.apache.flink
          		flink-csv
          		${flink.version}
          		provided
          	
          	
          		org.apache.flink
          		flink-json
          		${flink.version}
          		provided
          	
          	
          	
          		org.apache.flink
          		flink-table-planner_2.12
          		${flink.version}
          		provided
          	
          	
          	
          		org.apache.flink
          		flink-table-api-java-uber
          		${flink.version}
          		provided
          	
          	
          	
          		org.apache.flink
          		flink-table-runtime
          		${flink.version}
          		provided
          	
          	
          		org.apache.flink
          		flink-connector-jdbc
          		3.1.0-1.17
          	
          	
          		mysql
          		mysql-connector-java
          		5.1.38
          	
          	
          		com.google.guava
          		guava
          		32.0.1-jre
          	
          	
          	
          	
          		org.apache.flink
          		flink-connector-kafka
          		${flink.version}
          	
          	
          	
          		org.apache.flink
          		flink-sql-connector-kafka
          		${flink.version}
          		provided
          	
          	
          	
          		org.apache.commons
          		commons-compress
          		1.24.0
          	
          	
          		org.projectlombok
          		lombok
          		1.18.2
          	
          	
          		org.apache.bahir
          		flink-connector-redis_2.12
          		1.1.0
          		
          			
          				flink-streaming-java_2.12
          				org.apache.flink
          			
          			
          				flink-runtime_2.12
          				org.apache.flink
          			
          			
          				flink-core
          				org.apache.flink
          			
          			
          				flink-java
          				org.apache.flink
          			
          			
          				org.apache.flink
          				flink-table-api-java
          			
          			
          				org.apache.flink
          				flink-table-api-java-bridge_2.12
          			
          			
          				org.apache.flink
          				flink-table-common
          			
          			
          				org.apache.flink
          				flink-table-planner_2.12
          			
          		
          	
          	
          	
          		com.alibaba
          		fastjson
          		2.0.43
          	
          
          

          2、数据结构

          本示例仅仅为实现需求:将订单中uId与用户id进行关联,然后输出Tuple2

          • 事实流 order
                // 事实表
                @Data
                @NoArgsConstructor
                @AllArgsConstructor
                static class Order {
                    private Integer id;
                    private Integer uId;
                    private Double total;
                }
            
            • 维度流 user
                  // 维表
                  @Data
                  @NoArgsConstructor
                  @AllArgsConstructor
                  static class User {
                      private Integer id;
                      private String name;
                      private Double balance;
                      private Integer age;
                      private String email;
                  }
              

              3、数据源

              事实流数据有几种,具体见示例部分,比如socket、redis、kafka等

              维度表流有几种,具体见示例部分,比如静态数据、mysql、socket、kafka等。

              如此,实现本文中的示例就需要准备好相应的环境,即mysql、redis、kafka、netcat等。

              4、验证结果

              本文提供的所有示例均为验证通过的示例,测试的数据均在每个示例中,分为事实流、维度流和运行结果进行注释,在具体的示例中关于验证不再赘述。

              二、维表来源于初始化的静态数据

              1、说明

              通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在事实流map()方法中与维表数据进行关联。

              由于数据存储于内存中,所以只适合小数据量并且维表数据更新频率不高的情况下使用。虽然可以在open中定义一个定时器定时更新维表,但是还是存在维表更新不及时的情况或资源开销较大的情况。一般如果数据量较小且不大会变(或变化影响也不大)的情况下,理想选择之一。

              2、示例:将事实流与维表进行关联

              import java.util.HashMap;
              import java.util.Map;
              import org.apache.flink.api.common.functions.RichMapFunction;
              import org.apache.flink.api.java.tuple.Tuple2;
              import org.apache.flink.configuration.Configuration;
              import org.apache.flink.streaming.api.datastream.DataStream;
              import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
              import lombok.AllArgsConstructor;
              import lombok.Data;
              import lombok.NoArgsConstructor;
              /*
               * @Author: alanchan
               * @LastEditors: alanchan
               * @Description: 采用在RichMapfunction类的open方法中将维表数据加载到内存
               */
              public class TestJoinDimFromStaticDataDemo {
                  // 维表
                  @Data
                  @NoArgsConstructor
                  @AllArgsConstructor
                  static class User {
                      private Integer id;
                      private String name;
                      private Double balance;
                      private Integer age;
                      private String email;
                  }
                  // 事实表
                  @Data
                  @NoArgsConstructor
                  @AllArgsConstructor
                  static class Order {
                      private Integer id;
                      private Integer uId;
                      private Double total;
                  }
                  public static void main(String[] args) throws Exception {
                      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                      // order 事实流
                      DataStream orderDs = env.socketTextStream("192.168.10.42", 9999)
                              .map(o -> {
                                  String[] lines = o.split(",");
                                  return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                              });
                      DataStream> result = orderDs.map(new RichMapFunction>() {
                          Map userDim = null;
                          // 维表-静态数据,本处使用的是匿名内部类实现的
                          @Override
                          public void open(Configuration parameters) throws Exception {
                              userDim = new HashMap<>();
                              userDim.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
                              userDim.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
                              userDim.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
                              userDim.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
                              userDim.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));
                          }
                          @Override
                          public Tuple2 map(Order value) throws Exception {
                              return new Tuple2(value, userDim.get(value.getUId()).getName());
                          }
                      });
                      result.print();
                      // nc 输入
                      // 1,1004,345
                      // 2,1001,678
                      
                      // 控制台输出
                      // 2> (TestJoinDimFromStaticData.Order(id=1, uId=1004, total=345.0),alan_chan)
                      // 3> (TestJoinDimFromStaticData.Order(id=2, uId=1001, total=678.0),alan)
                      env.execute("TestJoinDimFromStaticData");
                  }
              }
              

              以上,本文介绍了Flink 维表的第一种方式,通过初始化的静态数据实现。

              本专题分为以下几篇文章:

              【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据

              【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源

              【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游

              【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join

              【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)

              【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)

网友评论

搜索
最新文章
热门文章
热门标签