Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
-
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
-
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
-
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
-
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
- Flink 系列文章
- 一、通过 Table API 和 SQL Client 操作 HiveCatalog
- 1、注册 Catalog
- 1)、方式一:java实现
- 2)、方式二:yaml配置
- 2、修改当前的 Catalog 和数据库
- 1)、java实现
- 2)、sql
- 3、列出可用的 Catalog
- 1)、java实现
- 2)、sql
- 4、列出可用的数据库
- 1)、java实现
- 2)、sql
- 5、列出可用的表
- 1)、java实现
- 2)、sql
本文以示例展示了sql 和 table api 操作hivecatalog。
一、通过 Table API 和 SQL Client 操作 HiveCatalog
1、注册 Catalog
用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中。
以下通过api 和 配置文件注册catalog及配置。
1)、方式一:java实现
public class TestCreateHiveTable { public static final String tableName = "alan_hivecatalog_hivedb_testTable"; public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n" + " id INT,\n" + " name STRING,\n" + " age INT" + ") " + "TBLPROPERTIES (\n" + " 'sink.partition-commit.delay'='5 s',\n" + " 'sink.partition-commit.trigger'='partition-time',\n" + " 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")"; /** * @param args * @throws DatabaseAlreadyExistException * @throws CatalogException */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf"; String name = "alan_hive"; // default 数据库名称 String defaultDatabase = "default"; HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir); tenv.registerCatalog("alan_hive", hiveCatalog); tenv.useCatalog("alan_hive"); String newDatabaseName = "alan_hivecatalog_hivedb"; tenv.useDatabase(newDatabaseName); // 创建表 tenv.getConfig().setSqlDialect(SqlDialect.HIVE); tenv.executeSql(hive_create_table_sql); // 插入数据 String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)"; tenv.executeSql(insertSQL); // 查询数据 String selectSQL = "select * from alan_hivecatalog_hivedb_testTable" ; Table table = tenv.sqlQuery(selectSQL); table.printSchema(); DataStream
> result = tenv.toRetractStream(table, Row.class); result.print(); env.execute(); } } 2)、方式二:yaml配置
# 定义 catalogs catalogs: - name: alan_hivecatalog type: hive property-version: 1 hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf # 须包含 hive-site.xml # 改变表程序基本的执行行为属性。 execution: planner: blink # 可选: 'blink' (默认)或 'old' type: streaming # 必选:执行模式为 'batch' 或 'streaming' result-mode: table # 必选:'table' 或 'changelog' max-table-result-rows: 1000000 # 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制) time-characteristic: event-time # 可选: 'processing-time' 或 'event-time' (默认) parallelism: 1 # 可选:Flink 的并行数量(默认为 1) periodic-watermarks-interval: 200 # 可选:周期性 watermarks 的间隔时间(默认 200 ms) max-parallelism: 16 # 可选:Flink 的最大并行数量(默认 128) min-idle-state-retention: 0 # 可选:表程序的最小空闲状态时间 max-idle-state-retention: 0 # 可选:表程序的最大空闲状态时间 current-catalog: alan_hivecatalog # 可选:当前会话 catalog 的名称(默认为 'default_catalog') current-database: viewtest_db # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库) restart-strategy: # 可选:重启策略(restart-strategy) type: fallback # 默认情况下“回退”到全局重启策略
2、修改当前的 Catalog 和数据库
Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF。
1)、java实现
代码片段,只列出了关键的代码。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); String catalogName = "alan_hive"; String defaultDatabase = "default"; String databaseName = "viewtest_db"; String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf"; HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir); tenv.registerCatalog(catalogName, hiveCatalog); tenv.useCatalog(catalogName); hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) { }, true); // tenv.executeSql("create database "+databaseName); tenv.useDatabase(databaseName);
2)、sql
Flink SQL> USE CATALOG alan_hive; Flink SQL> USE viewtest_db;
通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息。
- java
tenv.from("not_the_current_catalog.not_the_current_db.my_table");
- sql
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
3、列出可用的 Catalog
1)、java实现
tenv.listCatalogs();
2)、sql
show catalogs;
4、列出可用的数据库
1)、java实现
tenv.listDatabases();
2)、sql
show databases;
5、列出可用的表
1)、java实现
tenv.listTables();
2)、sql
show tables;
以上,本文以示例展示了sql 和 table api 操作hivecatalog。
- sql
- java
猜你喜欢
- 2小时前华为OD机试真题2023 C & D卷(Python&C++)
- 2小时前华为OD机试真题 2023 B + 2023 C&D 卷(JAVA&JS&Python&C++)
- 2小时前有什么提高编程能力的书籍推荐吗?
- 2小时前【大数据处理技术实践】期末考查题目:集群搭建、合并文件与数据统计可视化
- 2小时前清华大学计算机学科推荐学术会议和期刊列表——网络与信息安全
- 2小时前idea就改完内存启动不了怎么办
- 2小时前x-cmd pkg | pdfcpu - 强大的 PDF 处理工具
- 2小时前记CMS FGC 的一次调优
- 2小时前数据分析实战丨基于pygal与requests分析GitHub最受欢迎的Python库
- 2小时前当律师要考什么大学(当律师要考什么大学及分数线)
网友评论
- 搜索
- 最新文章
- 热门文章