Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
-
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
-
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
-
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
-
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
- Flink 系列文章
- 一、DataStream 和 Table集成-Changelog Streams变化流示例
- 1、maven依赖
- 2、Changelog Streams集成说明
- 3、fromChangelogStream示例
- 4、toChangelogStream示例
本文介绍了Flink 的Changelog Streams与table 的集成2个示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
更多详细内容参考文章:
21、Flink 的table API与DataStream API 集成(完整版)
一、DataStream 和 Table集成-Changelog Streams变化流示例
1、maven依赖
UTF-8 UTF-8 1.8 1.8 1.8 2.12 1.17.0 org.apache.flink flink-clients ${flink.version} provided org.apache.flink flink-java ${flink.version} provided org.apache.flink flink-table-common ${flink.version} provided org.apache.flink flink-streaming-java ${flink.version} provided org.apache.flink flink-table-api-java-bridge ${flink.version} provided org.apache.flink flink-sql-gateway ${flink.version} provided org.apache.flink flink-csv ${flink.version} provided org.apache.flink flink-json ${flink.version} provided org.apache.flink flink-table-planner_2.12 ${flink.version} provided org.apache.flink flink-table-api-java-uber ${flink.version} provided org.apache.flink flink-table-runtime ${flink.version} provided org.apache.flink flink-connector-jdbc 3.1.0-1.17 mysql mysql-connector-java 5.1.38 org.apache.flink flink-connector-hive_2.12 1.17.0 org.apache.hive hive-exec 3.1.2 org.apache.flink flink-connector-kafka ${flink.version} org.apache.flink flink-sql-connector-kafka ${flink.version} provided org.apache.commons commons-compress 1.24.0 org.projectlombok lombok 1.18.2 2、Changelog Streams集成说明
在内部,Flink的表运行时是一个changelog处理器。
StreamTableEnvironment提供了以下方法来暴露change data capture(CDC)功能:
-
fromChangelogStream(DataStream):将变更日志条目流(stream of changelog entries)解释为表。流记录类型必须为org.apache.flink.types.Row,因为其RowKind标志在运行时评估(evaluated )。默认情况下,不会传播事件时间和水印。该方法期望将包含所有类型更改的changelog(在org.apache.flink.types.RowKind中枚举)作为默认的ChangelogMode。
-
fromChangelogStream(DataStream, Schema):允许为DataStream定义类似于fromDataStream(DataStream ,schema )的schema 。否则,语义等于fromChangelogStream(DataStream)。
-
fromChangelogStream(DataStream, Schema, ChangelogMode):提供关于如何将stream 解释为changelog的完全控制。传递的ChangelogMode有助于planner 区分insert-only, upsert, or retract行为。
-
toChangelogStream(Table):fromChangelogStream(DataStream)的反向操作。它生成一个包含org.apache.flink.types.Row实例的流,并在运行时为每个记录设置RowKind标志。该方法支持各种更新表。如果输入表包含单个rowtime 列(single rowtime column),则它将传播到流记录的时间戳中(stream record’s timestamp)。水印也将被传播。
-
toChangelogStream(Table, Schema):fromChangelogStream(DataStream,Schema)的反向操作。该方法可以丰富生成的列数据类型。如果需要,planner 可以插入隐式转换。可以将rowtime写出为元数据列。
-
toChangelogStream(Table, Schema, ChangelogMode):提供关于如何将表转换为变更日志流(convert a table to a changelog stream)的完全控制。传递的ChangelogMode有助于planner 区分insert-only, upsert, or retract 行为。
从Table API的角度来看,和DataStream API的转换类似于读取或写入在SQL中使用CREATE Table DDL定义的虚拟表连接器。
由于fromChangelogStream的行为类似于fromDataStream。
此虚拟连接器还支持读取和写入流记录的rowtime 元数据。
虚拟表源实现SupportsSourceWatermark。
3、fromChangelogStream示例
下面的代码展示了如何将fromChangelogStream用于不同的场景。
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; /** * @author alanchan * */ public class TestFromChangelogStreamDemo { //the stream as a retract stream //默认ChangelogMode应该足以满足大多数用例,因为它接受所有类型的更改。 public static void test1() throws Exception { // 1、创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); // 2、创建数据源 DataStream
dataStream = env.fromElements( Row.ofKind(RowKind.INSERT, "alan", 12), Row.ofKind(RowKind.INSERT, "alanchan", 5), Row.ofKind(RowKind.UPDATE_BEFORE, "alan", 12), Row.ofKind(RowKind.UPDATE_AFTER, "alan", 100)); // 3、changlogstream转为table Table table = tenv.fromChangelogStream(dataStream); // 4、创建视图 tenv.createTemporaryView("InputTable", table); //5、聚合查询 tenv.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0") .print(); // +----+--------------------------------+-------------+ // | op | name | score | // +----+--------------------------------+-------------+ // | +I | alanchan | 5 | // | +I | alan | 12 | // | -D | alan | 12 | // | +I | alan | 100 | // +----+--------------------------------+-------------+ // 4 rows in set env.execute(); } //the stream as an upsert stream (without a need for UPDATE_BEFORE) //展示了如何通过使用upsert模式将更新消息的数量减少50%来限制传入更改的类型以提高效率。 //通过为toChangelogStream定义主键和upsert changelog模式,可以减少结果消息的数量。 public static void test2() throws Exception { // 1、创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); //2、创建数据源 DataStream
dataStream = env.fromElements( Row.ofKind(RowKind.INSERT, "alan", 12), Row.ofKind(RowKind.INSERT, "alanchan", 5), Row.ofKind(RowKind.UPDATE_AFTER, "alan", 100)); // 3、转为table Table table = tenv.fromChangelogStream( dataStream, Schema.newBuilder().primaryKey("f0").build(), ChangelogMode.upsert()); // 4、创建视图 tenv.createTemporaryView("InputTable", table); // 5、聚合查询 tenv.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0") .print(); // +----+--------------------------------+-------------+ // | op | name | score | // +----+--------------------------------+-------------+ // | +I | alanchan | 5 | // | +I | alan | 12 | // | -U | alan | 12 | // | +U | alan | 100 | // +----+--------------------------------+-------------+ // 4 rows in set env.execute(); } public static void main(String[] args) throws Exception { // test1(); test2(); } }
4、toChangelogStream示例
下面的代码展示了如何将toChangelogStream用于不同的场景。
import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.row; import java.time.Instant; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.StringData; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; /** * @author alanchan * */ public class TestToChangelogStreamDemo { static final String SQL = "CREATE TABLE GeneratedTable " + "(" + " name STRING," + " score INT," + " event_time TIMESTAMP_LTZ(3)," + " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND" + ")" + "WITH ('connector'='datagen')"; //以最简单和最通用的方式转换为DataStream(无事件时间) public static void test1() throws Exception { // 1、创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); // 2、构建数据源并聚合查询 Table simpleTable = tenv .fromValues(row("alan", 12), row("alan", 2), row("alanchan", 12)) .as("name", "score") .groupBy($("name")) .select($("name"), $("score").sum()); // 3、将table转成datastream,并输出 tenv .toChangelogStream(simpleTable) .executeAndCollect() .forEachRemaining(System.out::println); // +I[alanchan, 12] // +I[alan, 12] // -U[alan, 12] // +U[alan, 14] env.execute(); } //以最简单和最通用的方式转换为DataStream(使用事件时间) //由于`event_time`是schema的单个时间属性,因此它默认设置为流记录的时间戳;同时,它仍然是Row的一部分 public static void test2() throws Exception { // 1、创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); // 2、建表并填入数据 tenv.executeSql(SQL); Table table = tenv.from("GeneratedTable"); DataStream
dataStream = tenv.toChangelogStream(table); dataStream.process( new ProcessFunction
() { @Override public void processElement(Row row, Context ctx, Collector
out) { System.out.println(row.getFieldNames(true)); // [name, score, event_time] // timestamp exists twice assert ctx.timestamp() == row. getFieldAs("event_time").toEpochMilli(); } }); env.execute(); } //转换为DataStream,但将time属性写出为元数据列,这意味着它不再是physical schema的一部分 public static void test3() throws Exception { // 1、创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); // 2、建表并填入数据 tenv.executeSql(SQL); Table table = tenv.from("GeneratedTable"); DataStream dataStream = tenv.toChangelogStream( table, Schema.newBuilder() .column("name", "STRING") .column("score", "INT") .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") .build()); // the stream record's timestamp is defined by the metadata; it is not part of the Row dataStream.process( new ProcessFunction
() { @Override public void processElement(Row row, Context ctx, Collector
out) { // prints: [name, score] System.out.println(row.getFieldNames(true)); // timestamp exists once System.out.println(ctx.timestamp()); } }); env.execute(); } //可以使用更多的内部数据结构以提高效率 //这里提到这只是为了完整性,因为使用内部数据结构增加了复杂性和额外的类型处理 //将TIMESTAMP_LTZ列转换为`Long`或将STRING转换为`byte[]`可能很方便,如果需要,结构化类型也可以表示为`Row` public static void test4() throws Exception { // 1、创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); // 2、建表并填入数据 tenv.executeSql(SQL); Table table = tenv.from("GeneratedTable"); DataStream dataStream = tenv.toChangelogStream( table, Schema.newBuilder() .column( "name", DataTypes.STRING().bridgedTo(StringData.class)) .column( "score", DataTypes.INT()) .column( "event_time", DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class)) .build()); dataStream.print(); // 12> +I[1b6717eb5d93058ac3b40458a8a549a5e2fbb3b0fa146b36b7c58b5ebc1606cfc26ff9e4ebc3277832b9a8a0bfa1451d6608, 836085755, 1699941384531] // 9> +I[6169d2f3a4766f5fce51cba66ccd33772ab72a690381563426417c75766f99de8b1fd5c3c7fc5ec48954df9299456f433fa9, -766105729, 1699941384531] // 10> +I[e5a815e53d8fdf91b9382d7b15b6c076c5449e27b7ce505520c4334aba227d9a2fefd3333b2609704334b6fb866c244cf03d, 1552621997, 1699941384531] env.execute(); } public static void main(String[] args) throws Exception { // test1(); // test2(); // test3(); test4(); } }
示例test4()中数据类型支持哪些转换的更多信息,请参阅table API的数据类型页面。
toChangelogStream(Table).executeAndCollect()的行为等于调用Table.execute().collect()。然而,toChangelogStream(表)对于测试可能更有用,因为它允许访问DataStream API中后续ProcessFunction中生成的水印。
以上,本文介绍了Flink 的Changelog Streams与table 的集成2个示例。
-
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章