上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Flink实时电商数仓(二)

guduadmin11天前

GitLab的用户创建和推送

  1. 在root用户-密码界面重新设置密码
  2. 添加Leader用户和自己使用的用户
  3. 使用root用户创建相应的群组
  4. 使用Leader用户创建对应的项目
  5. 设置分支配置为“初始推送后完全保护”
  6. 设置.gitignore文件,项目配置文件等其他非通用代码无需提交
  7. 安装gitlab project 2020插件
  8. 点击share project on gitlab 即可将项目上传到gitlab中

Flink集群的搭建

  • 只需要运行Yarn模式
  • 配置Hadoop的环境变量

    Flink实时电商数仓(二),在这里插入图片描述,第1张

  • 将Flink1.17解压安装到对应为止即可

    Hbase的配置

    1. 依赖zookeeper和hadoop这两个框架
    2. 检查Hadoop是否退出安全模式,如果丢失文件,先退出安全模式,hdfs dfsadmin -safemode leave
    3. 解压Hbase2.4.11的安装包
    4. 添加Hbase的环境变量

      Flink实时电商数仓(二),在这里插入图片描述,第2张

    5. 修改配置文件
      • hbase-env.xml
        • export hbase_manages_zk=false 不使用自带的zookeeper
        • hbase-site.xml
          • hbase.cluster.distributed = true 使用集群模式
          • hbase.zookeeper.quorum = hadoop102… zookeeper连接地址
          • hbase.rootdir = hdfs://hadoop102:8020, hbase在hdfs的存放根路径
          • hbase.wal.provider = filesystem 预写日志
          • regionservers: 添加hbase小弟的主机名称

    Redise的配置

    1. 进入redise目录,执行make指令进行编译
    2. make instanll安装
    3. 将myredis.conf文件复制到~/目录下
    4. 将bind 127.0.0.1 注释掉,并且关闭保护模式
    5. 设置daemon 后台启动模式为yes
    6. redis-server ./my_redis.conf后台启动

    实时数仓ODS层

    • 保证数据模拟器产生的数据是有序的
      • 设置mock.if-realtime:1,重复执行数据模拟器产生数据时,会从当前时间继续产生数据。
      • Kafka数据有序:Flink并发度和Kafka的分区数一致
        • 设置三个kafka节点的分区个数都为4,num.partitions=4
        • Flink的并发度=4
        • 历史维度数据
          • 使用maxwell的bootstrap功能初始化维度信息(json格式),写入到kafka
          • 编写mysql_to_kafka_init.sh脚本
          • maxwell需要检查是否连接mysql的binlog成功,查看日志;如果出错,需要在mysql的maxwell库中删除所有表即可

            实时数仓dim层

            • dim层的设计依据是维度建模理论,并且遵循三范式,使用雪花模型
            • dim层的数据存储在Hbase中
            • 开发时需要切换到dev开发分支
            • 为Flink的开发创建一个基类,名为BaseApp
              • 抽象方法handle(): 每个主程序的业务逻辑
              • 具体方法start():里面实现Flink代码的通用逻辑
              • 不同分组的数据只能消费一次,如果数据需要给多个程序使用,就需要分为不同的group

                Flink-cdc获取维度信息

                1. 数据清洗
                2. 动态拆分维度表功能
                  • 方式1:直接将维度表做成List< String > (维度表名称)保存
                    • 如果将代码写死,后续想要修改,需要重新编译修改
                    • 方式2:将维度表名称设计为单独的一个配置文件,而不是在代码里面写死;后续想要修改,直接改配置文件,重启任务即可生效
                    • 方式3:热修改hotfix, 热加载配置文件,不需要重启;热加载文件一般是以时间周期作为加载逻辑。时间长时会出现时效性问题,时间短的话过于耗费资源。
                    • 方式4:zookeeper的watch的监控,能够存储基础的表名,但是不适合存储完整的表格信息,除了要判断哪些是维度表,还需要记录哪些数据需要写出到Hbase。
                    • 方式5:cdc,变更数据抓取,类似与maxwell。
                    • 注意:运行下面的代码需要再虚拟机的/etc/my.cnf文件中开启对应数据库的binlog日志。注意对照库名是否填写正确。
                public class Test02 {
                    public static void main(String[] args) {
                        //创建env
                        //1.创建运行环境
                        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                        //默认是最大并行度
                        env.setParallelism(4);
                        System.setProperty("HADOOP_USER_NAME", "atguigu");
                        //设置检查点和状态后端
                        // 1.4 状态后端及检查点相关配置
                        // 1.4.1 设置状态后端
                        env.setStateBackend(new HashMapStateBackend());
                         1.4.2 开启 checkpoint
                        //env.enableCheckpointing(5000);
                         1.4.3 设置 checkpoint 模式: 精准一次
                        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                         1.4.4 checkpoint 存储
                        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall2023/stream/" + "test01");
                         1.4.5 checkpoint 并发数
                        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                         1.4.6 checkpoint 之间的最小间隔
                        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
                         1.4.7 checkpoint  的超时时间
                        //env.getCheckpointConfig().setCheckpointTimeout(10000);
                         1.4.8 job 取消时 checkpoint 保留策略
                        //env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);
                        //读取数据
                        //mysql source
                        MySqlSource mySqlSource = MySqlSource.builder()
                                .hostname(Constant.MYSQL_HOST)
                                .port(Constant.MYSQL_PORT)
                                .username(Constant.MYSQL_USER_NAME)
                                .password(Constant.MYSQL_PASSWORD)
                                .databaseList("gmall2023_config")
                                .tableList("gmall2023_config.table_process_dim")
                                .deserializer(new JsonDebeziumDeserializationSchema())
                                .startupOptions(StartupOptions.initial())
                                .build();
                        DataStreamSource ds = env.fromSource(mySqlSource,
                                WatermarkStrategy.noWatermarks(),
                                "kafkasource").setParallelism(1);
                        ds.print();
                        try {
                            env.execute();
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
                

网友评论

搜索
最新文章
热门文章
热门标签