上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

guduadmin11天前

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列

    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列

    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列

    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列

    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列

    本部分和实际的运维、监控工作相关。

    二、Flink 示例专栏

    Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

    两专栏的所有文章入口点击:Flink 系列文章汇总索引


    文章目录

    • Flink 系列文章
    • 一、maven依赖
    • 二、表的列操作

      本文给出针对表字段的各种操作及验证。

      如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

      本文除了maven依赖外,没有其他依赖。

      本文需要有kafka的运行环境。

      本文更详细的内容可参考文章:

      17、Flink 之Table API: Table API 支持的操作(1)

      17、Flink 之Table API: Table API 支持的操作(2)

      本专题分为以下几篇文章:

      【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

      【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图

      【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询

      【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作

      【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

      【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作

      【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)

      【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)

      【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作

      【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作

      【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作

      【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作

      【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作

      【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)

      【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

      【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

      一、maven依赖

      本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

      二、表的列操作

      针对表的字段进行操作,具体示例如下,运行结果在源文件中。

      import static org.apache.flink.table.api.Expressions.$;
      import static org.apache.flink.table.api.Expressions.row;
      import static org.apache.flink.table.api.Expressions.and;
      import static org.apache.flink.table.api.Expressions.concat;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.types.Row;
      /**
       * @author alanchan
       *
       */
      public class TestTableAPIOperationDemo {
      	static String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" 
      			+ "  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" 
      			+ "  `partition` BIGINT METADATA VIRTUAL,\r\n"
      			+ "  `offset` BIGINT METADATA VIRTUAL,\r\n" 
      			+ "  `user_id` BIGINT,\r\n" 
      			+ "  `item_id` BIGINT,\r\n" 
      			+ "  `behavior` STRING\r\n" 
      			+ ") WITH (\r\n"
      			+ "  'connector' = 'kafka',\r\n" 
      			+ "  'topic' = 'user_behavior',\r\n"
      			+ "  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" 
      			+ "  'properties.group.id' = 'testGroup',\r\n"
      			+ "  'scan.startup.mode' = 'earliest-offset',\r\n" 
      			+ "  'format' = 'csv'\r\n" 
      			+ ");";
      	/**
      	 * @param args
      	 * @throws Exception
      	 */
      	public static void main(String[] args) throws Exception {
      //		test1();
      //		test2();
      		test3();
      		
      	}
      	
      	static void test3() throws Exception {
      		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
      		
      		// 建表
      		tenv.executeSql(sourceSql);
      		
      		Table table1 = tenv.from("Alan_KafkaTable");
      		
      		// 重命名字段。
      		Table result = table1.as("a","b","c","d","e","f");
      		DataStream> resultDS = tenv.toRetractStream(result, Row.class);
      		resultDS.print();
      		//11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
      		
      		//和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。
      		Table table2 = result.where($("f").isEqual("login"));
      		DataStream> result2DS = tenv.toRetractStream(table2, Row.class);
      		result2DS.print();
      		//11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
      		
      		Table table3 = result.where($("f").isNotEqual("login"));
      		DataStream> result3DS = tenv.toRetractStream(table3, Row.class);
      		result3DS.print();
      		// 没有匹配条件的记录,无输出
      		
      		Table table4 = result
      									.filter(
      											and(
      													$("f").isNotNull(),
      //													$("d").isGreater(1)
      													$("e").isNotNull()
      													)
      											);
      		DataStream> result4DS = tenv.toRetractStream(table4, Row.class);
      		result4DS.print("test filter:");
      		//test filter::11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
      		
      		env.execute();
      	}
      	
      	/**
      	 * 和 SQL 查询中的 VALUES 子句类似。 基于提供的行生成一张内联表。
      	 * 
      	 * 你可以使用 row(...) 表达式创建复合行:
      	 * 
      	 * @throws Exception
      	 */
      	static void test2() throws Exception {
      		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
      		Table table = tenv.fromValues(row(1, "ABC"), row(2L, "ABCDE"));
      		table.printSchema();
      //		(
      //				  `f0` BIGINT NOT NULL,
      //				  `f1` VARCHAR(5) NOT NULL
      //		)
      		DataStream> resultDS = tenv.toRetractStream(table, Row.class);
      		resultDS.print();
      //		1> (true,+I[2, ABCDE])
      //		2> (true,+I[1, ABC])
      		Table table2 = tenv.fromValues(
      			    DataTypes.ROW(
      			        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
      			        DataTypes.FIELD("name", DataTypes.STRING())
      			    ),
      			    row(1, "ABCD"),
      			    row(2L, "ABCDEF")
      			);
      		table2.printSchema();
      //		(
      //				  `id` DECIMAL(10, 2),
      //				  `name` STRING
      //		)
      		DataStream> result2DS = tenv.toRetractStream(table2, Row.class);
      		result2DS.print();
      //		15> (true,+I[2.00, ABCDEF])
      //		14> (true,+I[1.00, ABCD])
      		env.execute();
      	}
      	/**
      	 * 和 SQL 查询的 FROM 子句类似。 执行一个注册过的表的扫描。
      	 * 
      	 * @throws Exception
      	 */
      	static void test1() throws Exception {
      		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
      		// 建表
      		tenv.executeSql(sourceSql);
      		// 查询
      //		tenv.from("Alan_KafkaTable").execute().print();
      		// kafka输入数据
      		// 1,1002,login
      		// 应用程序控制台输出如下
      //		+----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
      //		| op |              event_time |            partition |               offset |              user_id |              item_id |                       behavior |
      //		+----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+
      //		| +I | 2023-11-01 11:00:30.183 |                    0 |                    2 |                    1 |                 1002 |                          login |
      		Table temp = tenv.from("Alan_KafkaTable");
      		//和 SQL 的 SELECT 子句类似。 执行一个 select 操作
      		Table result1 = temp.select($("user_id"), $("item_id").as("behavior"), $("event_time"));
      		DataStream> result1DS = tenv.toRetractStream(result1, Row.class);
      //		result1DS.print();
      // 11> (true,+I[1, 1002, 2023-11-01T11:00:30.183])
      		
      		//选择星号(*)作为通配符,select 表中的所有列。
      		Table result2 = temp.select($("*"));
      		DataStream> result2DS = tenv.toRetractStream(result2, Row.class);
      		result2DS.print();
      // 11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
      		env.execute();
      	}
      	static void test5() throws Exception {
      		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
      		
      		// 建表
      		tenv.executeSql(sourceSql);
      		Table table = tenv.from("Alan_KafkaTable");
      		//和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。
      		Table result = table.groupBy($("user_id")).select($("user_id"), $("user_id").count().as("count(user_id)"));
      		
      		DataStream> resultDS = tenv.toRetractStream(result, Row.class);
      		resultDS.print();
      //		12> (true,+I[1, 1])
      		
      		env.execute();
      	}
      	
      	static void test4() throws Exception {
      		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
      		// 建表
      		tenv.executeSql(sourceSql);
      		Table table = tenv.from("Alan_KafkaTable");
      		//执行字段添加操作。 如果所添加的字段已经存在,将抛出异常。
      		Table result2 = table.addColumns($("behavior").plus(1).as("t_col1"));
      		result2.printSchema();
      //		(
      //				  `event_time` TIMESTAMP(3),
      //				  `partition` BIGINT,
      //				  `offset` BIGINT,
      //				  `user_id` BIGINT,
      //				  `item_id` BIGINT,
      //				  `behavior` STRING,
      //				  `t_col1` STRING
      //				)
      		
      		Table result = table.addColumns($("behavior").plus(1).as("t_col3"), concat($("behavior"), "alanchan").as("t_col4"));
      		result.printSchema();
      //		(
      //				  `event_time` TIMESTAMP(3),
      //				  `partition` BIGINT,
      //				  `offset` BIGINT,
      //				  `user_id` BIGINT,
      //				  `item_id` BIGINT,
      //				  `behavior` STRING,
      //				  `t_col3` STRING,
      //				  `t_col4` STRING
      //				)
      		
      		Table result3 = table.addColumns(concat($("behavior"), "alanchan").as("t_col4"));
      		result3.printSchema();
      //		(
      //				  `event_time` TIMESTAMP(3),
      //				  `partition` BIGINT,
      //				  `offset` BIGINT,
      //				  `user_id` BIGINT,
      //				  `item_id` BIGINT,
      //				  `behavior` STRING,
      //				  `t_col4` STRING
      //				)
      		//执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段。
      		Table result4 = result3.addOrReplaceColumns(concat($("t_col4"), "alanchan").as("t_col"));
      		result4.printSchema();
      //		(
      //				  `event_time` TIMESTAMP(3),
      //				  `partition` BIGINT,
      //				  `offset` BIGINT,
      //				  `user_id` BIGINT,
      //				  `item_id` BIGINT,
      //				  `behavior` STRING,
      //				  `t_col4` STRING,
      //				  `t_col` STRING
      //				)
      		
      		Table result5 = result4.dropColumns($("t_col4"), $("t_col"));
      		result5.printSchema();
      //		(
      //				  `event_time` TIMESTAMP(3),
      //				  `partition` BIGINT,
      //				  `offset` BIGINT,
      //				  `user_id` BIGINT,
      //				  `item_id` BIGINT,
      //				  `behavior` STRING
      //				)
      		
      		//执行字段重命名操作。 字段表达式应该是别名表达式,并且仅当字段已存在时才能被重命名。
      		Table result6 = result4.renameColumns($("t_col4").as("col1"), $("t_col").as("col2"));
      		result6.printSchema();
      //		(
      //				  `event_time` TIMESTAMP(3),
      //				  `partition` BIGINT,
      //				  `offset` BIGINT,
      //				  `user_id` BIGINT,
      //				  `item_id` BIGINT,
      //				  `behavior` STRING,
      //				  `col1` STRING,
      //				  `col2` STRING
      //				)
      		
      		DataStream> resultDS = tenv.toRetractStream(table, Row.class);
      		resultDS.print();
      //		11> (true,+I[2023-11-01T11:00:30.183, 0, 2, 1, 1002, login])
      		
      		env.execute();
      	}
      }
      

      以上,本文给出针对表字段的各种操作及验证。

      如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

      本文更详细的内容可参考文章:

      17、Flink 之Table API: Table API 支持的操作(1)

      17、Flink 之Table API: Table API 支持的操作(2)

      本专题分为以下几篇文章:

      【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

      【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图

      【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询

      【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作

      【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

      【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作

      【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)

      【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)

      【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作

      【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作

      【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作

      【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作

      【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作

      【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)

      【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

      【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

网友评论

搜索
最新文章
热门文章
热门标签