上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析

guduadmin211小时前

文章目录

    • 08:离线分析:Hbase表设计及构建
    • 09:离线分析:Kafka消费者构建
    • 10:离线分析:Hbase连接构建
    • 11:离线分析:Rowkey的构建
    • 12:离线分析:Put数据列构建
    • 13:离线分析:存储运行测试
    • 14:离线分析:Hive关联测试
    • 15:离线分析:Phoenix关联测试

      08:离线分析:Hbase表设计及构建

      • 目标:掌握Hbase表的设计及创建表的实现

      • 路径

        • step1:基础设计
        • step2:Rowkey设计
        • step3:分区设计
        • step4:建表
        • 实施

          • 基础设计

            • Namespace:MOMO_CHAT

            • Table:MOMO_MSG

            • Family:C1

            • Qualifier:与数据中字段名保持一致

              基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析,image-20210905200550740,第1张

            • Rowkey设计

              • 查询需求:根据发件人id + 收件人id + 消息日期 查询聊天记录

                • 发件人账号
                • 收件人账号
                • 时间
                • 设计规则:业务、唯一、长度、散列、组合

                • 设计实现

                  • 加盐方案:CRC、Hash、MD5、MUR
                  • => 8位、16位、32位
                    MD5Hash【发件人账号_收件人账号_消息时间 =》 8位】_发件人账号_收件人账号_消息时间
                    
                  • 分区设计

                    • Rowkey前缀:MD5编码,由字母和数字构成
                    • 数据并发量:高
                    • 分区设计:使用HexSplit16进制划分多个分区
                    • 建表

                      • 启动Hbase:start-hbase.sh
                      • 进入客户端:hbase shell
                        #创建NS
                        create_namespace 'MOMO_CHAT'
                        #建表
                        create 'MOMO_CHAT:MOMO_MSG', {NAME => "C1", COMPRESSION => "GZ"}, { NUMREGIONS => 6, SPLITALGO => 'HexStringSplit'}
                        

                        基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析,image-20210905192807020,第2张

                      • 小结

                        • 掌握Hbase表的设计及创建表的实现

                          09:离线分析:Kafka消费者构建

                          • 目标:实现离线消费者的开发

                          • 路径

                            • 整体实现的路径

                              //入口:调用实现消费Kafka,将数据写入Hbase
                              public void main(){
                                  //step1:消费Kafka
                                  consumerKafka();
                                  
                              }
                              //用于消费Kafka数据
                              public void consumerKafka(){
                                  prop = new Properties()
                              	KafkaConsumer consumer = new KafkaConsumer(prop)
                                  consumer.subscribe("MOMO_MSG")
                                  ConsumerRecords  records = consumer.poll
                                  //基于每个分区来消费和处理
                                      record :Topic、Partition、Offset、Key、Value
                                  	//step2:写入Hbase
                                      writeToHbase(value)
                                  //提交这个分区的offset
                                   commitSycn(offset+1)
                              }
                              //用于将value的数据写入Hbase方法
                              public void writeToHbase(){
                                  //step1:构建连接
                                  //step2:构建Table对象
                                  //step3:构建Put对象
                                  //获取rowkey
                                 rowkey = getRowkey(value)
                                  Put put = new Put(rowkey)
                                  put.添加每一列
                                  table.put()
                              }
                              public String getRowkey(){
                                  value.getSender
                                  value.getReceiver
                                  value.getTime
                                      rowkey = MD5+sender+receiverId +time
                                      return rowkey
                              }
                              
                            • 实施

                                  /**
                                   * 用于消费Kafka的数据,将合法数据写入Hbase
                                   */
                                  private static void consumerKafkaToHbase() throws Exception {
                                      //构建配置对象
                                      Properties props = new Properties();
                                      //指定服务端地址
                                      props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
                                      //指定消费者组的id
                                      props.setProperty("group.id", "momo");
                                      //关闭自动提交
                                      props.setProperty("enable.auto.commit", "false");
                                      //指定K和V反序列化的类型
                                      props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                                      props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                                      //构建消费者的连接
                                      KafkaConsumer consumer = new KafkaConsumer<>(props);
                                      //指定订阅哪些Topic
                                      consumer.subscribe(Arrays.asList("MOMO_MSG"));
                                      //持续拉取数据
                                      while (true) {
                                          //向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间
                                          //拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合
                                          ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
                                          //todo:3-处理拉取到的数据:打印
                                          //取出每个分区的数据进行处理
                                          Set partitions = records.partitions();//获取本次数据中所有分区
                                          //对每个分区的数据做处理
                                          for (TopicPartition partition : partitions) {
                                              List> partRecords = records.records(partition);//取出这个分区的所有数据
                                              //处理这个分区的数据
                                              long offset = 0;
                                              for (ConsumerRecord record : partRecords) {
                                                  //获取Topic
                                                  String topic = record.topic();
                                                  //获取分区
                                                  int part = record.partition();
                                                  //获取offset
                                                  offset = record.offset();
                                                  //获取Key
                                                  String key = record.key();
                                                  //获取Value
                                                  String value = record.value();
                                                  System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                                                  //将Value数据写入Hbase
                                                  if(value != null && !"".equals(value) && value.split("\001").length == 20 ){
                                                      writeToHbase(value);
                                                  }
                                              }
                                              //手动提交分区的commit offset
                                              Map offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));
                                              consumer.commitSync(offsets);
                                          }
                                      }
                                  }
                              
                            • 小结

                              • 实现离线消费者的开发

                                10:离线分析:Hbase连接构建

                                • 目标:实现Hbase连接的构建

                                • 实施

                                      private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                  	private static Connection conn;
                                      private static Table table;
                                      private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名
                                      private static byte[] family = Bytes.toBytes("C1");//列族
                                      // 静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能
                                      static{
                                          try {
                                              //构建配置对象
                                              Configuration conf = HBaseConfiguration.create();
                                              conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
                                              //构建连接
                                              conn = ConnectionFactory.createConnection(conf);
                                              //获取表对象
                                              table = conn.getTable(tableName);
                                          } catch (IOException e) {
                                              e.printStackTrace();
                                          }
                                      }
                                  
                                • 小结

                                  • 实现Hbase连接的构建

                                    11:离线分析:Rowkey的构建

                                    • 目标:实现Rowkey的构建

                                    • 实施

                                      private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
                                              //转换时间戳
                                              long time = format.parse(stime).getTime();
                                              String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;
                                              //构建MD5
                                              String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);
                                              //合并返回
                                              return prefix+"_"+suffix;
                                          }
                                      
                                    • 小结

                                      • 实现Rowkey的构建

                                        12:离线分析:Put数据列构建

                                        • 目标:实现Put数据列的构建

                                        • 实施

                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
                                          put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));
                                          
                                        • 小结

                                          • 实现Put数据列的构建

                                            13:离线分析:存储运行测试

                                            • 目标:测试运行消费Kafka数据动态写入Hbase

                                            • 实施

                                              • 启动消费者程序

                                              • 启动Flume程序

                                                cd /export/server/flume-1.9.0-bin
                                                bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
                                                
                                              • 启动模拟数据

                                                java -jar /export/data/momo_init/MoMo_DataGen.jar \
                                                /export/data/momo_init/MoMo_Data.xlsx \
                                                /export/data/momo_data/ \
                                                10
                                                
                                              • 观察Hbase结果

                                                基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析,image-20210905213457245,第3张

                                              • 小结

                                                • 测试运行消费Kafka数据动态写入Hbase

                                                  14:离线分析:Hive关联测试

                                                  • 目标:使用Hive关联Hbase实现离线分析

                                                  • 路径

                                                    • step1:关联
                                                    • step2:查询
                                                    • 实施

                                                      • 启动Hive和yarn

                                                        start-yarn.sh
                                                        hive-daemon.sh metastore
                                                        hive-daemon.sh hiveserver2
                                                        start-beeline.sh
                                                        
                                                      • 关联

                                                        create database MOMO_CHAT;
                                                        use MOMO_CHAT;
                                                        create external table if not exists MOMO_CHAT.MOMO_MSG (
                                                          id string,
                                                          msg_time string ,
                                                          sender_nickyname string , 
                                                          sender_account string , 
                                                          sender_sex string , 
                                                          sender_ip string ,
                                                          sender_os string , 
                                                          sender_phone_type string ,
                                                          sender_network string , 
                                                          sender_gps string , 
                                                          receiver_nickyname string ,
                                                          receiver_ip string ,
                                                          receiver_account string ,
                                                          receiver_os string ,
                                                          receiver_phone_type string ,
                                                          receiver_network string ,
                                                          receiver_gps string ,
                                                          receiver_sex string ,
                                                          msg_type string ,
                                                          distance string ,
                                                          message string 
                                                        ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
                                                        with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname, 
                                                        C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,
                                                        C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,
                                                        C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,
                                                        C1:msg_type,C1:distance,C1:message ') tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');
                                                        
                                                      • 分析查询

                                                        --基础查询
                                                        select 
                                                          msg_time,sender_nickyname,receiver_nickyname,distance 
                                                        from momo_msg limit 10;
                                                        --查询聊天记录:发送人id + 接收人id + 日期:1f300e5d_13280256412_15260978785_1632888342000
                                                        select 
                                                          * 
                                                        from momo_msg 
                                                        where sender_account='13280256412' 
                                                        and receiver_account='15260978785' 
                                                        and substr(msg_time,0,10) = '2021-09-29';
                                                        --统计每个小时的消息数
                                                        select
                                                          substr(msg_time,0,13) as hour,
                                                          count(*) as cnt
                                                        from momo_msg
                                                        group by substr(msg_time,0,13);
                                                        
                                                      • 小结

                                                        • 使用Hive关联Hbase实现离线分析

                                                          15:离线分析:Phoenix关联测试

                                                          • 目标:使用Phoenix关联Hbase实现即时查询

                                                          • 路径

                                                            • step1:关联
                                                            • step2:查询
                                                            • 实施

                                                              • 启动

                                                                cd /export/server/phoenix-5.0.0-HBase-2.0-bin/
                                                                bin/sqlline.py node1:2181
                                                                
                                                              • 关联

                                                                create view if not exists MOMO_CHAT.MOMO_MSG (
                                                                  "id" varchar primary key,
                                                                  C1."msg_time" varchar ,
                                                                  C1."sender_nickyname" varchar , 
                                                                  C1."sender_account" varchar , 
                                                                  C1."sender_sex" varchar , 
                                                                  C1."sender_ip" varchar ,
                                                                  C1."sender_os" varchar , 
                                                                  C1."sender_phone_type" varchar ,
                                                                  C1."sender_network" varchar , 
                                                                  C1."sender_gps" varchar , 
                                                                  C1."receiver_nickyname" varchar ,
                                                                  C1."receiver_ip" varchar ,
                                                                  C1."receiver_account" varchar ,
                                                                  C1."receiver_os" varchar ,
                                                                  C1."receiver_phone_type" varchar ,
                                                                  C1."receiver_network" varchar ,
                                                                  C1."receiver_gps" varchar ,
                                                                  C1."receiver_sex" varchar ,
                                                                  C1."msg_type" varchar ,
                                                                  C1."distance" varchar ,
                                                                  C1."message" varchar
                                                                );
                                                                
                                                              • 即时查询

                                                                --基础查询
                                                                select 
                                                                  "id",c1."sender_account",c1."receiver_account" 
                                                                from momo_chat.momo_msg 
                                                                limit 10;
                                                                --查询每个发送人发送的消息数
                                                                select 
                                                                  c1."sender_account" ,
                                                                  count(*) as cnt 
                                                                from momo_chat.momo_msg 
                                                                group by c1."sender_account";
                                                                --查询每个发送人聊天的人数
                                                                select 
                                                                  c1."sender_account" ,
                                                                  count(distinct c1."receiver_account") as cnt 
                                                                from momo_chat.momo_msg 
                                                                group by c1."sender_account" 
                                                                order by cnt desc;
                                                                
                                                              • 小结

                                                                • 使用Phoenix关联Hbase实现即时查询

网友评论

搜索
最新文章
热门文章
热门标签