上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(1) - 介绍

guduadmin21天前

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列

    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列

    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列

    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列

    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列

    本部分和实际的运维、监控工作相关。

    二、Flink 示例专栏

    Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

    两专栏的所有文章入口点击:Flink 系列文章汇总索引


    文章目录

    • Flink 系列文章
    • 一、watermark介绍
      • 1、watermark介绍
      • 2、Watermark 策略简介
      • 3、使用 Watermark 策略
      • 4、处理空闲数据源
      • 5、自定义 WatermarkGenerator
        • 1)、自定义周期性 Watermark 生成器
        • 2)、自定义标记 Watermark 生成器
        • 6、Watermark 策略与 Kafka 连接器
        • 7、算子处理 Watermark 的方式

          本文介绍了Flink WaterMark的基本信息,即水印介绍、策略使用、处理空闲数据、自定义水印生成器、kafka的水印及算子处理水印的方式

          如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

          本专题分为以下几篇文章:

          【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(1) - 介绍

          【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(2) - 基本使用和超过最大延迟数据处理

          【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(3) - kafka的水印

          【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例 - 完整版

          关于时间和水印的更多介绍参考文章:

          7、Flink四大基石之Time和watermark详解与详细示例(watermark基本使用、kafka作为数据源的watermark使用示例以及超出最大允许延迟数据的接收实现)

          一、watermark介绍

          1、watermark介绍

          watermark就是给数据再额外的加的一个时间列,watermark是个时间戳。

          watermark = 数据的事件时间 - 最大允许的延迟时间或乱序时间

          watermark = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间

          这样可以保证watermark水位线会一直上升(变大),不会下降

          窗口计算的触发条件为

          • 1、窗口中有数据
          • 2、watermark>= 窗口的结束时间

            2、Watermark 策略简介

            使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。

            WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。

            WatermarkStrategy 接口如下:

            public interface WatermarkStrategy extends TimestampAssignerSupplier, WatermarkGeneratorSupplier {
                // ------------------------------------------------------------------------
                //  实现者需要实现的方法.
                // ------------------------------------------------------------------------
                /** 实例化根据此策略生成水印的WatermarkGenerator. */
                @Override
                WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
                /**
                 * 实例化{@link TimestampAssigner},用于根据此策略分配时间戳.
                 */
                @Override
                default TimestampAssigner createTimestampAssigner(
                        TimestampAssignerSupplier.Context context) {
            		//默认情况下,这是{@link RecordTimestampAssigner},用于记录来自具有有效时间戳的源的情况,例如来自Kafka。
                    return new RecordTimestampAssigner<>();
                }
                @Experimental
                default WatermarkAlignmentParams getAlignmentParameters() {
                    return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;
                }
                // ------------------------------------------------------------------------
                //  用于丰富基础水印策略的生成器方法
                // ------------------------------------------------------------------------
                /**
                 * 创建一个新的{@code WatermarkStrategy},该策略包装此策略,但使用给定的{@link TimestampAssigner}(通过{@linkTimestampassignerSupplier})
                 *
                 * 
                 * {@code WatermarkStrategy wmStrategy = WatermarkStrategy
                 *   .forMonotonousTimestamps()
                 *   .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
                 * }
                 */
                default WatermarkStrategy withTimestampAssigner(
                        TimestampAssignerSupplier timestampAssigner) {
                    checkNotNull(timestampAssigner, "timestampAssigner");
                    return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);
                }
                /**
                 * 创建一个新的{@code WatermarkStrategy},该策略包装此策略,但使用给定的{@link SerializableTimestampAssigner}。
                 *
                 * 
                 * {@code WatermarkStrategy wmStrategy = WatermarkStrategy
                 *   .forMonotonousTimestamps()
                 *   .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
                 * }
            */ default WatermarkStrategy withTimestampAssigner( SerializableTimestampAssigner timestampAssigner) { checkNotNull(timestampAssigner, "timestampAssigner"); return new WatermarkStrategyWithTimestampAssigner<>( this, TimestampAssignerSupplier.of(timestampAssigner)); } /** *创建一个新的丰富的{@link WatermarkStrategy},该策略还可以在创建的{@linkWatermarkGenerator}中进行空闲检测。 * *

            Idleness can be important if some partitions have little data and might not have events * during some periods. Without idleness, these streams can stall the overall event time * progress of the application. */ default WatermarkStrategy withIdleness(Duration idleTimeout) { checkNotNull(idleTimeout, "idleTimeout"); checkArgument( !(idleTimeout.isZero() || idleTimeout.isNegative()), "idleTimeout must be greater than zero"); return new WatermarkStrategyWithIdleness<>(this, idleTimeout); } /** * 创建一个新的{@link WatermarkStrategy},用于配置来自同一水印组中其他源/任务/分区的最大水印漂移。 * 该组可能包含完全独立的来源(例如File和Kafka)。 * * @param watermarkGroup A group of sources to align watermarks * @param maxAllowedWatermarkDrift Maximal drift, before we pause consuming from the * source/task/partition */ @Experimental default WatermarkStrategy withWatermarkAlignment( String watermarkGroup, Duration maxAllowedWatermarkDrift) { return withWatermarkAlignment( watermarkGroup, maxAllowedWatermarkDrift, WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL); } /** * 创建一个新的{@link WatermarkStrategy},用于配置来自同一水印组中其他源/任务/分区的最大水印漂移。 * 该组可能包含完全独立的来源(例如File和Kafka)。 * * @param watermarkGroup A group of sources to align watermarks * @param maxAllowedWatermarkDrift Maximal drift, before we pause consuming from the * source/task/partition * @param updateInterval How often tasks should notify coordinator about the current watermark * and how often the coordinator should announce the maximal aligned watermark. */ @Experimental default WatermarkStrategy withWatermarkAlignment( String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) { return new WatermarksWithWatermarkAlignment( this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval); } // ------------------------------------------------------------------------ // 常用水印策略的方便方法 // ------------------------------------------------------------------------ /** * 为时间戳单调递增的情况创建水印策略。 * @see AscendingTimestampsWatermarks */ static WatermarkStrategy forMonotonousTimestamps() { return (ctx) -> new AscendingTimestampsWatermarks<>(); } /** * 为记录无序的情况创建水印策略,但可以设置事件无序程度的上限 * @see BoundedOutOfOrdernessWatermarks */ static WatermarkStrategy forBoundedOutOfOrderness(Duration maxOutOfOrderness) { return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness); } /** 基于现有的{@link WatermarkGeneratorSupplier}创建水印策略. */ static WatermarkStrategy forGenerator(WatermarkGeneratorSupplier generatorSupplier) { return generatorSupplier::createWatermarkGenerator; } /** * 创建一个根本不生成水印的水印策略。这在进行纯处理基于时间的流处理的场景中可能很有用。 */ static WatermarkStrategy noWatermarks() { return (ctx) -> new NoWatermarksGenerator<>(); } }

            通常情况下,不用实现此接口,而是可以使用 WatermarkStrategy 工具类中通用的 watermark 策略,或者可以使用这个工具类将自定义的 TimestampAssigner 与 WatermarkGenerator 进行绑定。

            例如,想要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:

            WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                    .withTimestampAssigner((event, timestamp) -> event.f0);
            //其中 TimestampAssigner 的设置与否是可选的,大多数情况下,可以不用去特别指定。例如,当使用 Kafka 或 Kinesis 数据源时,你可以直接从 Kafka/Kinesis 数据源记录中获取到时间戳。        
            

            3、使用 Watermark 策略

            WatermarkStrategy 可以在 Flink 应用程序中的两处使用:

            • 第一种是直接在数据源上使用,相比第二种会更好。因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息。使用这种方式,数据源通常可以更精准地跟踪 watermark,整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy 意味着必须使用特定数据源接口,参考下文的kafka部分,以及有关每个分区的 watermark 是如何生成以及工作的。

            • 第二种是直接在非数据源的操作之后使用,仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy)

              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              DataStream stream = env.readFile(
                      myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
                      FilePathFilter.createDefaultFilter(), typeInfo);
              DataStream withTimestampsAndWatermarks = stream
                      .filter( event -> event.severity() == WARNING )
                      .assignTimestampsAndWatermarks();
              withTimestampsAndWatermarks
                      .keyBy( (event) -> event.getGroup() )
                      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                      .reduce( (a, b) -> a.add(b) )
                      .addSink(...);
              

              使用 WatermarkStrategy 去获取流并生成带有时间戳的元素和 watermark 的新流时,如果原始流已经具有时间戳或 watermark,则新指定的时间戳分配器将覆盖原有的时间戳和 watermark。

              4、处理空闲数据源

              如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。称这类数据源为空闲输入或空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。

              为了解决这个问题,可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。

              WatermarkStrategy 为此提供了一个工具接口:

              WatermarkStrategy
                      .>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                      .withIdleness(Duration.ofMinutes(1));
              

              5、自定义 WatermarkGenerator

              可以针对每个事件去生成 watermark。但是由于每个 watermark 都会在下游做一些计算,因此过多的 watermark 会降低程序性能。

              TimestampAssigner 是一个可以从事件数据中提取时间戳字段的简单函数,但是 WatermarkGenerator 的编写相对就要复杂一些了,将在接下来的两小节中介绍如何实现此接口。

              WatermarkGenerator 接口代码如下:

              /**
               * {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
               *
               * 

              注意: WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks} * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。 */ @Public public interface WatermarkGenerator { /** * 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark。 */ void onEvent(T event, long eventTimestamp, WatermarkOutput output); /** * 周期性的调用,也许会生成新的 watermark,也许不会。 * *

              调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。 */ void onPeriodicEmit(WatermarkOutput output); }

              watermark 的生成方式本质上是有两种:周期性生成和标记生成。