上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【大数据】Hudi HMS Catalog 完全使用指南

guduadmin21天前

Hudi HMS Catalog 完全使用指南

  • 1.Hudi HMS Catalog 基本介绍
  • 2.在 Flink 中写入数据
  • 3.在 Flink SQL 中查看数据
  • 4.在 Spark 中查看数据
  • 5.在 Hive 中查看数据

    1.Hudi HMS Catalog 基本介绍

    功能亮点:当 Flink 和 Spark 同时接入 Hive Metastore(HMS)时,用 Hive Metastore 对 Hudi 的元数据进行管理,无论是使用 Flink 还是 Spark 引擎建表,另外一种引擎或者 Hive 都可以直接查询。

    本文以 HDP 集群为例,其他版本分别为:

    • Flink:1.13.6
    • Spark:3.2.1
    • Hudi:0.12.0

      在 HDP 集群中,Hive 的配置文件路径为 /etc/hive/conf,所以在 Flink SQL Client 中使用 Hive 的配置文件来创建 hudi-hive catalog,从而将 Hudi 元数据存储于 hive metastore 中。

      2.在 Flink 中写入数据

      在 Flink SQL Client 中进行如下操作:

      create catalog hudi with(
      	'type' = 'hudi',
      	'mode' = 'hms',
      	'hive.conf.dir'='/etc/hive/conf'
      );
      --- 创建数据库供hudi使用
      create database hudi.hudidb;
      

      Flink SQL Client 中建表:

      --- order表
      CREATE TABLE hudi.hudidb.orders_hudi(
        uuid INT,
        ts INT,
        num INT,
        PRIMARY KEY(uuid) NOT ENFORCED
      ) WITH (
        'connector' = 'hudi',
        'table.type' = 'MERGE_ON_READ',
        'hive_sync.conf.dir' = '/etc/hive/conf' --如果hive是配置了Kerberos,必须指定
      );
      --- product表
      CREATE TABLE hudi.hudidb.product_hudi(
        uuid INT,
        ts INT,
        name STRING,
        PRIMARY KEY(uuid) NOT ENFORCED
      ) WITH (
        'connector' = 'hudi',
        'table.type' = 'MERGE_ON_READ',
        'hive_sync.conf.dir' = '/etc/hive/conf' 
      );
      --- 宽表
      CREATE TABLE hudi.hudidb.orders_product_hudi(
        uuid INT,
        ts INT,
        name STRING,
        num INT,
        PRIMARY KEY(uuid) NOT ENFORCED
      ) WITH (
        'connector' = 'hudi',
        'table.type' = 'MERGE_ON_READ',
        'hive_sync.conf.dir' = '/etc/hive/conf'
      );
      

      使用 Flink SQL 进行数据写入:

      insert into hudi.hudidb.orders_hudi values
        (1, 1, 2),
        (2, 2, 3),
        (3, 3, 4);
        
      insert into hudi.hudidb.product_hudi values
        (1, 1, 'tony'),
        (2, 2, 'mike'),
        (3, 3, 'funny');
      insert into hudi.hudidb.orders_product_hudi
      select
        hudi.hudidb.orders_hudi.uuid as uuid,
        hudi.hudidb.orders_hudi.ts as ts,
        hudi.hudidb.product_hudi.name as name,
        hudi.hudidb.orders_hudi.num as num
      from hudi.hudidb.orders_hudi
      inner join hudi.hudidb.product_hudi on hudi.hudidb.orders_hudi.uuid = hudi.hudidb.product_hudi.uuid;
      

      3.在 Flink SQL 中查看数据

      select * from hudi.hudidb.orders_hudi;
      

      得到:

      【大数据】Hudi HMS Catalog 完全使用指南,在这里插入图片描述,第1张

      select * from hudi.hudidb.product_hudi;
      

      得到:

      【大数据】Hudi HMS Catalog 完全使用指南,在这里插入图片描述,第2张

      select * from hudi.hudidb.orders_product_hudi;
      

      得到:

      【大数据】Hudi HMS Catalog 完全使用指南,在这里插入图片描述,第3张

      4.在 Spark 中查看数据

      Hive 为了连接集群 Hive Metastore,只需要将 Hive 的配置文件 hive-site.xml 放置到 Spark 的配置文件目录即可。

      通过 beeline 连接 spark thriftserver,查看数据库:

      show databases;
      

      【大数据】Hudi HMS Catalog 完全使用指南,在这里插入图片描述,第4张

      可以看到刚刚在 Flink 中创建的 hudidb 数据库。现在查看里面的表:

      use hudidb;
      show tables;
      

      【大数据】Hudi HMS Catalog 完全使用指南,在这里插入图片描述,第5张

      由于在将数据写入 Hudi 时,默认会新增 _hoodie_commit_time、_hoodie_record_key、precombine 以及 _hoodie_file_name 用于内部使用,如果使用 select * 进行查询时会查出上述字段。

      以查看 orders_hudi 表数据为例:

      select * from orders_hudi;
      

      【大数据】Hudi HMS Catalog 完全使用指南,在这里插入图片描述,第6张

      所以为了正确查询,需要指定字段:

      select uuid, ts, num from orders_hudi;
      

      【大数据】Hudi HMS Catalog 完全使用指南,在这里插入图片描述,第7张

      5.在 Hive 中查看数据

      Hudi MOR 表在将元数据同步到 Hive 时,会同时生成以 _ro 和 _rt 结尾的两张子表。例如有一张表名称为 product_hudi,那么此时,数据库中会有三张和 product_hudi 相关的表,分别为原始 product_hudi 表、product_hudi_ro 和 product_hudi_rt。

      • ro 表表示读优化(read-optimized)查询,只查询 MOR 表中的 parquet 文件部分。
      • rt 表表示快照(real-time,实时)查询,查询全表,该操作需要合并 avro 和 parquet 文件,较为耗时。
      • product_hudi 表用于 Flink 或者 Spark 的元数据管理。

        为了在 Hive 引擎中查看,对于 MERGE_ON_READ 表,至少需要执行过一次压缩,也就是把 avro 文件压缩为 parquet 文件,才能够正常查看数据。由于上述操作为批量操作,默认是不会触发压缩操作的,所以需要手动触发压缩(该操作 master 分支会支持)。

        所以对 product_hudi 表进行手动压缩

        ./bin/flink run -c \
        org.apache.hudi.sink.compact.HoodieFlinkCompactor \
        lib/hudi-flink1.13-bundle_2.12-0.12.0.jar \
        --path hdfs://bigdata:8020/warehouse/tablespace/managed/hive/hudidb.db/product_hudi \
        --schedule
        

        注意:如果是使用 Flink 将数据实时流式写入 Hudi 的话,默认在写入五次时会自动触发压缩,不需要手动执行。

        为了在 Hive 中查看 Hudi 所有数据,需要设置如下参数:

        set hive.input.format = org.apache.hadoop.hive.ql.io.HiveInputFormat;
        

        如果设置该参数出现如下报错:

        Error: Error while processing statement: Cannot modify hive.input.format at runtime. It is not in list of params that are allowed to be modified at runtime (state=42000,code=1)

        那么通过如下方式修改 hive-site.xml 配置,新增如下配置,然后重启 Hive 即可。

        hive.security.authorization.sqlstd.confwhitelist.append = hive.input.format
        

        【大数据】Hudi HMS Catalog 完全使用指南,在这里插入图片描述,第8张

        进入 Hive 客户端

        设置变量:

        set hive.input.format = org.apache.hadoop.hive.ql.io.HiveInputFormat;
        

        查看数据库:

        show databases;
        

        【大数据】Hudi HMS Catalog 完全使用指南,在这里插入图片描述,第9张

        查看 hudidb 库中 product_hudi 表数据:

        select * from hudidb.product_hudi;
        

        【大数据】Hudi HMS Catalog 完全使用指南,在这里插入图片描述,第10张

网友评论

搜索
最新文章
热门文章
热门标签