GitLab的用户创建和推送
- 在root用户-密码界面重新设置密码
- 添加Leader用户和自己使用的用户
- 使用root用户创建相应的群组
- 使用Leader用户创建对应的项目
- 设置分支配置为“初始推送后完全保护”
- 设置.gitignore文件,项目配置文件等其他非通用代码无需提交
- 安装gitlab project 2020插件
- 点击share project on gitlab 即可将项目上传到gitlab中
Flink集群的搭建
- 只需要运行Yarn模式
- 配置Hadoop的环境变量
- 将Flink1.17解压安装到对应为止即可
Hbase的配置
- 依赖zookeeper和hadoop这两个框架
- 检查Hadoop是否退出安全模式,如果丢失文件,先退出安全模式,hdfs dfsadmin -safemode leave
- 解压Hbase2.4.11的安装包
- 添加Hbase的环境变量
- 修改配置文件
- hbase-env.xml
- export hbase_manages_zk=false 不使用自带的zookeeper
- hbase-site.xml
- hbase.cluster.distributed = true 使用集群模式
- hbase.zookeeper.quorum = hadoop102… zookeeper连接地址
- hbase.rootdir = hdfs://hadoop102:8020, hbase在hdfs的存放根路径
- hbase.wal.provider = filesystem 预写日志
- regionservers: 添加hbase小弟的主机名称
- hbase-env.xml
Redise的配置
- 进入redise目录,执行make指令进行编译
- make instanll安装
- 将myredis.conf文件复制到~/目录下
- 将bind 127.0.0.1 注释掉,并且关闭保护模式
- 设置daemon 后台启动模式为yes
- redis-server ./my_redis.conf后台启动
实时数仓ODS层
- 保证数据模拟器产生的数据是有序的
- 设置mock.if-realtime:1,重复执行数据模拟器产生数据时,会从当前时间继续产生数据。
- Kafka数据有序:Flink并发度和Kafka的分区数一致
- 设置三个kafka节点的分区个数都为4,num.partitions=4
- Flink的并发度=4
- 历史维度数据
- 使用maxwell的bootstrap功能初始化维度信息(json格式),写入到kafka
- 编写mysql_to_kafka_init.sh脚本
- maxwell需要检查是否连接mysql的binlog成功,查看日志;如果出错,需要在mysql的maxwell库中删除所有表即可
实时数仓dim层
- dim层的设计依据是维度建模理论,并且遵循三范式,使用雪花模型
- dim层的数据存储在Hbase中
- 开发时需要切换到dev开发分支
- 为Flink的开发创建一个基类,名为BaseApp
- 抽象方法handle(): 每个主程序的业务逻辑
- 具体方法start():里面实现Flink代码的通用逻辑
- 不同分组的数据只能消费一次,如果数据需要给多个程序使用,就需要分为不同的group
Flink-cdc获取维度信息
- 数据清洗
- 动态拆分维度表功能
- 方式1:直接将维度表做成List< String > (维度表名称)保存
- 如果将代码写死,后续想要修改,需要重新编译修改
- 方式2:将维度表名称设计为单独的一个配置文件,而不是在代码里面写死;后续想要修改,直接改配置文件,重启任务即可生效
- 方式3:热修改hotfix, 热加载配置文件,不需要重启;热加载文件一般是以时间周期作为加载逻辑。时间长时会出现时效性问题,时间短的话过于耗费资源。
- 方式4:zookeeper的watch的监控,能够存储基础的表名,但是不适合存储完整的表格信息,除了要判断哪些是维度表,还需要记录哪些数据需要写出到Hbase。
- 方式5:cdc,变更数据抓取,类似与maxwell。
- 注意:运行下面的代码需要再虚拟机的/etc/my.cnf文件中开启对应数据库的binlog日志。注意对照库名是否填写正确。
- 方式1:直接将维度表做成List< String > (维度表名称)保存
public class Test02 { public static void main(String[] args) { //创建env //1.创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //默认是最大并行度 env.setParallelism(4); System.setProperty("HADOOP_USER_NAME", "atguigu"); //设置检查点和状态后端 // 1.4 状态后端及检查点相关配置 // 1.4.1 设置状态后端 env.setStateBackend(new HashMapStateBackend()); 1.4.2 开启 checkpoint //env.enableCheckpointing(5000); 1.4.3 设置 checkpoint 模式: 精准一次 //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 1.4.4 checkpoint 存储 //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall2023/stream/" + "test01"); 1.4.5 checkpoint 并发数 //env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 1.4.6 checkpoint 之间的最小间隔 //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); 1.4.7 checkpoint 的超时时间 //env.getCheckpointConfig().setCheckpointTimeout(10000); 1.4.8 job 取消时 checkpoint 保留策略 //env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION); //读取数据 //mysql source MySqlSource
mySqlSource = MySqlSource. builder() .hostname(Constant.MYSQL_HOST) .port(Constant.MYSQL_PORT) .username(Constant.MYSQL_USER_NAME) .password(Constant.MYSQL_PASSWORD) .databaseList("gmall2023_config") .tableList("gmall2023_config.table_process_dim") .deserializer(new JsonDebeziumDeserializationSchema()) .startupOptions(StartupOptions.initial()) .build(); DataStreamSource ds = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "kafkasource").setParallelism(1); ds.print(); try { env.execute(); } catch (Exception e) { throw new RuntimeException(e); } } }
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章