上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

4 Paimon数据湖之Hive Catalog的使用

guduadmin11天前

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

Paimon提供了两种类型的Catalog:Filesystem Catalog和Hive Catalog。

  • Filesystem Catalog:会把元数据信息存储到文件系统里面。
  • Hive Catalog:则会把元数据信息存储到Hive的Metastore里面,这样就可以直接在Hive中访问Paimon表了。注意:此时也会同时在文件系统中存储一份元数据信息,相当于元数据会存储两份,这个大家需要特别注意一下。

    还有就是我们在使用Hive Catalog的时候,Paimon中的数据库名称、表名称,以及字段名称都要小写,因为这些数据存储到Hive Metastore的时候,会统一存储为小写。

    下面我们来具体演示一下Paimon如何使用Hive Catalog来存储元数据。

    在Flink中操作Paimon的时候想要使用Hive Catalog,需要依赖于Flink Hive connector,以及hive-exec和flink-table-api-scala-bridge。

    flink-table-api-scala-bridge这个依赖我们之前已经添加过了,所以只需要添加另外两个即可:

    
    
        org.apache.flink
        flink-connector-hive_2.12
        1.15.0
        
    
    
        org.apache.hive
        hive-exec
        3.1.2
        
            
                org.apache.logging.log4j
                log4j-slf4j-impl
            
        
        
    
    

    创建package:tech.xuwei.paimon.catalog

    创建object:PaimonHiveCatalog

    代码如下:

    package tech.xuwei.paimon.catalog
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    /**
     * Paimon使用Hive Catalog
     * Created by xuwei
     */
    object PaimonHiveCatalog {
      def main(args: Array[String]): Unit = {
        //创建执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
        val tEnv = StreamTableEnvironment.create(env)
        //创建Paimon类型的Catalog--使用Hive Catalog
        tEnv.executeSql(
          """
            |CREATE CATALOG paimon_hive_catalog WITH(
            |    'type'='paimon',
            |    'metastore' = 'hive',
            |    'uri' = 'thrift://bigdata04:9083',
            |    'warehouse'='hdfs://bigdata01:9000/paimon'
            |)
            |""".stripMargin)
        tEnv.executeSql("USE CATALOG paimon_hive_catalog")
        //创建Paimon表
        tEnv.executeSql(
          """
            |CREATE TABLE IF NOT EXISTS p_h_t1(
            |    name STRING,
            |    age INT,
            |    PRIMARY KEY (name) NOT ENFORCED
            |)
            |""".stripMargin)
        //向表中插入数据
        tEnv.executeSql(
          """
            |INSERT INTO p_h_t1(name,age) VALUES('jack',18),('tom',20)
            |""".stripMargin)
      }
    }
    

    接下来到bigdata04节点上启动hive的metastore服务。

    [root@bigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
    [root@bigdata04 apache-hive-3.1.2-bin]# nohup bin/hive --service metastore -p 9083 2>&1 >/dev/null &
    

    然后运行代码PaimonHiveCatalog

    代码运行之后可以到先到hdfs中确认一下是否能看到元数据信息:

    [root@bigdata04 ~]# hdfs dfs -cat /paimon/default.db/p_h_t1/schema/schema-0
    {
      "id" : 0,
      "fields" : [ {
        "id" : 0,
        "name" : "name",
        "type" : "STRING NOT NULL"
      }, {
        "id" : 1,
        "name" : "age",
        "type" : "INT"
      } ],
      "highestFieldId" : 1,
      "partitionKeys" : [ ],
      "primaryKeys" : [ "name" ],
      "options" : { }
    

    可以发现,在hdfs中依然是可以看到的,因为我们前面说了,使用hive catalog时也会同时在hdfs中存储一份元数据。

    最后我们到hive中确认一下:

    注意:由于目前bigdata04节点的环境变量中有HADOOP_CLASSPATH,所以直接使用hive客户端会看到很多日志信息,所以建议使用hive的beeline客户端。

    此时需要先启动hiveserver2服务。

    [root@bigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
    [root@bigdata04 apache-hive-3.1.2-bin]# bin/hiveserver2
    

    使用beeline客户端进行连接

    [root@bigdata04 apache-hive-3.1.2-bin]# bin/beeline -u  jdbc:hive2://localhost:10000 -n root
    0: jdbc:hive2://localhost:10000> show tables;
    +--------------------+
    |      tab_name      |
    +--------------------+
    | flink_stu          |
    | orders             |
    | p_h_t1             |
    | s1                 |
    | student_favors     |
    | student_favors_2   |
    | student_score      |
    | student_score_bak  |
    | t1                 |
    +--------------------+
    9 rows selected (1.727 seconds)
    0: jdbc:hive2://localhost:10000> select * from p_h_t1;
    Error: Error while compiling statement: FAILED: RuntimeException java.lang.ClassNotFoundException: org.apache.paimon.hive.mapred.PaimonInputFormat (state=42000,code=40000)
    

    此时是可以在hive中查看到p_h_t1这个表的,但是在操作这个表的时候会报错,提示缺少依赖,现在报这个错是正常的,等后面我们会有一个单独的小节来讲Paimon和Hive引擎的集成。

    目前通过hive catalog可以将paimon的元数据同时存储到hive的metastore中,但是还无法在hive中操作paimon的表,其实主要是因为缺少一个依赖,在这大家先知道这个问题即可。

    注意:如果我们此时操作的是分区表,那么分区信息默认是无法同步到Hive Metastore的。

    也就是说默认情况下,Paimon不会将新创建的分区同步到Hive Metastore中。我们在Hive中只能看到一个未分区的普通表。

    如果想解决这个问题,也很简单,只需要在paimon的表属性中设置metastore.partitioned-table=true即可。

    下面开发一个案例:

    创建object:PaimonHiveCatalogPartitionTable,基于PaimonHiveCatalog进行复制。

    完整代码如下:

    package tech.xuwei.paimon.catalog
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    /**
     * Paimon使用Hive Catalog
     * 操作分区表
     * Created by xuwei
     */
    object PaimonHiveCatalogPartitionTable {
      def main(args: Array[String]): Unit = {
        //创建执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
        val tEnv = StreamTableEnvironment.create(env)
        //创建Paimon类型的Catalog--使用Hive Catalog
        tEnv.executeSql(
          """
            |CREATE CATALOG paimon_hive_catalog WITH(
            |    'type'='paimon',
            |    'metastore' = 'hive',
            |    'uri' = 'thrift://bigdata04:9083',
            |    'warehouse'='hdfs://bigdata01:9000/paimon'
            |)
            |""".stripMargin)
        tEnv.executeSql("USE CATALOG paimon_hive_catalog")
        //创建Paimon表
        tEnv.executeSql(
          """
            |CREATE TABLE IF NOT EXISTS p_h_par(
            |    id INT,
            |    name STRING,
            |    dt STRING,
            |    PRIMARY KEY (id, dt) NOT ENFORCED
            |) PARTITIONED BY (dt) WITH(
            |    'metastore.partitioned-table' = 'true'
            |)
            |""".stripMargin)
        //向表中插入数据
        tEnv.executeSql(
          """
            |INSERT INTO p_h_par(id,name,dt)
            |VALUES(1,'jack','20230101'),(2,'tom','20230102')
            |""".stripMargin)
      }
    }
    

    在idea中执行代码。

    然后到hive中进行验证,可以执行show partitions p_h_par;进行验证。

    或者到hive metastore里面进行确认,查看mysql中的partitions表,这个表里面存储的是分区信息,如果能看到分区信息,就说明Paimon表的分区信息同步过来了。

    4 Paimon数据湖之Hive Catalog的使用,在这里插入图片描述,第1张

    这样就说明Paimon表的分区信息同步过来了。

    更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

网友评论

搜索
最新文章
热门文章
热门标签