上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Flink(十一)【状态管理】

guduadmin14小时前

Flink 状态管理

        我们一直称 Flink 为运行在数据流上的有状态计算框架和处理引擎。在之前的章节中也已经多次提到了“状态”(state),不论是简单聚合、窗口聚合,还是处理函数的应用,都会有状态的身影出现。状态就如同事务处理时数据库中保存的信息一样,是用来辅助进行任务计算的数据。而在 Flink 这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时正确地恢复。这就需要一套完整的管理机制来处理所有的状态。

1、Flink 中的状态

        在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。

1.1、概述

        在 Flink 中,算子任务可以分为有状态和无状态两种情况。

1.1.1、无状态算子任务

        无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。比如我们之前学的 map、flatMap、filter 等,计算时不依赖其它数据,就属于无状态的算子。

Flink(十一)【状态管理】,第1张

1.1.2、有状态算子任务

        而有状态的算子任务则除了当前数据外,还需要一些其他数据来得到计算结果。这里的“其他数据”就是所谓的状态(State)。比如我们之前学的 聚合算子、窗口算子都属于有状态的算子。

Flink(十一)【状态管理】,第2张

        比如我们之前窗口函数中学的增量聚合函数,每来一条数据它都会把处理后的结果保存到一个中间值,当窗口内再来一条数据就更新这个中间值,这个中间值就是所谓的状态。

        再比如我们的全窗口函数,它会把来的所有数据都保存到一个中间值当中,直到窗口关闭时才会触发计算,同样,这个中间值就是所谓的状态。

        此外还有我们的一些聚合算子比如 sum、min、max 等,它肯定是要把中间结果存储起来的,所以这都叫有状态算子。

1.2、状态的分类

1.2.1、托管状态(Managed State)和原始状态(Raw State)

        Flink 的状态有两种:托管状态(Managed State)和原始状态(Raw State)。托管状态就是由 Flink 统一管理的,也就是管理状态的存储、访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

        通常我们都是采用 Flink 托管状态来实现需求。只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;一般情况下不推荐使用。

1.2.2、算子状态和按键分区状态

        我们知道在 Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。

        基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。

(1)算子状态(Operator State)

        状态作用范围限定为当前的算子任务的各个子任务,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的。

Flink(十一)【状态管理】,第3张

        算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,我们需要进一步实现 CheckpointedFunction 接口 。

        新版本的 Flink 重构了 Source (也就是使用 fromSource 的写法),所以新版本则需要继承 SourceReaderBase 抽象类。

(2)按键分区状态

状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就 keyBy 之后才可以使用。

Flink(十一)【状态管理】,第4张

        按键分区状态应用非常广泛。之前讲到的聚合算子必须在 keyBy 之后才能使用,就是因为聚合的结果是以 Keyed State 的形式保存的。另外,也可以通过富函数类(Rich Function)来自定义 Keyed State,所以只要提供了富函数类接口的算子,也都可以使用 Keyed State。所以即使是 map、filter 这样无状态的基本转换算子,我们也可以通过富函数类给它们 “追加” Keyed State,或者实现 CheckpointedFunction 接口来定义 Operator State;从这个角度讲,Flink 中所有的算子都可以是有状态的,所以说 Flink 是“有状态的流处理”。

        无论是 Keyed State 还是 Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着自己的状态,算子的子任务之间状态不共享。

2、按键分区状态(Keyed State)

        在实际应用中,我们一般都需要将数据按照某个 key 进行分区,然后再进行计算处理;所以最为常见的状态类型就是 Keyed State。之前介绍到 keyBy 之后的聚合、窗口计算,算子所持有的状态,都是 Keyed State。

        另外,我们还可以通过富函数类(Rich Function)对转换算子进行扩展、实现自定义功能,比如 RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是 Keyed State。

        按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以 key 为作用范围进行隔离。我们知道,在进行按键分区(keyBy)之后,具有相同键的所有数据,都会分配到同一个并行子任务中;所以如果当前任务定义了状态,Flink 就会在当前并行子任务实例中,为每个键值维护一个状态的实例。于是当前任务就会为分配来的所有数据,按照 key 维护和处理对应的状态。

因为一个并行子任务可能会处理多个 key 的数据,所以 Flink 需要对 Keyed State 进行一些特殊优化。在底层,Keyed State 类似于一个分布式的映射(map)数据结构,所有的状态会根据 key 保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的 key,从 map 存储中读取出对应的状态值。所以具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的。这种将状态绑定到 key 上的方式,相当于使得状态和流的逻辑分区一一对应了:不会有别的 key 的数据来访问当前状态;而当前状态对应 key 的数据也只会访问这一个状态,不会分发到其他分区去。这就保证了对状态的操作都是本地进行的,对数据流和状态的处理做到了分区一致性。

        另外,在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同。需要注意,使用 Keyed State 必须基于 KeyedStream。没有进行 keyBy 分区的 DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问 Keyed State。

        需要注意:使用 Keyed State 必须基于 KeyedStream。

实际应用中,需要保存为状态的数据会有各种各样的类型,有时还需要复杂的集合类型,比如列表(List)和映射(Map)。对于这些常见的用法,Flink 的按键分区状态(Keyed State)提供了足够的支持。接下来我们就来了解一下 Keyed State 所支持的结构类型:

2.1、值状态(ValueState)

顾名思义,状态中只保存一个“值”(value)。ValueState本身是一个接口,源码中定义如下:

public interface ValueState extends State {
    T value() throws IOException;
    void update(T value) throws IOException;
}

这里的 T 是泛型,表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态,那么类型就是 ValueState。

我们可以在代码中读写值状态,实现对于状态的访问和更新。

  • T value():获取当前状态的值;
  • update(T value):对状态进行更新,传入的参数 value 就是要覆写的状态值。

    在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState 的状态描述器构造方法如下:

    public ValueStateDescriptor(String name, Class typeClass) {
         super(name, typeClass, null);
    }
    

    这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。有了这个描述器,运行时环境就可以获取到状态的控制句柄(handler)了。

    接下来演示一个案例:对连续两个水位值超过10的传感器输出报警信息

    /**
     * 检测每种传感器的水为值,如果连续两个水位差值超过10就输出结果
     */
    public class KeyedValueState {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            SingleOutputStreamOperator sensorDS = env
                    .socketTextStream("localhost", 9999)
                    .map(new WaterSensorFunction())
                    // todo 指定 watermark 策略,我们直接使用实现好的
                    .assignTimestampsAndWatermarks(WatermarkStrategy
                            // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                            .forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                            // 指定如何从数据中提取事件时间
                            .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                                return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                            }));
            sensorDS.keyBy(WaterSensor::getId)
                    // process方法的参数类型: KIO
                    .process(new KeyedProcessFunction() {
                        // todo 1.定义状态
                        ValueState lastVcState;    // 初始化必须在生命周期中定义,因为初始化需要运行时环境,这里环境还没启动会初始化失败
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            // todo 2. 在open方法初始化状态
                            // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型
                            lastVcState = getRuntimeContext().getState(new ValueStateDescriptor("lastVcState", Types.INT));
                        }
                        @Override
                        public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
                            // 要和上一条水位值进行比较
                            // todo 1. 取出上一条数据的水位值
                            // Integer的初始值为null 这里遇到第一条水位时需要注意判断null
                            int last_value = lastVcState.value()==null?0:lastVcState.value();
                            // todo 2. 判断两条水位是否都超过 10
                            if (Math.abs(value.getVc()-last_value)>10)
                                System.out.println("传感器"+value.getId()+"当前水位值="+value.getVc()+",与上一条水位值="+last_value+"相差超过10!!!");
                            // todo 3. 更新上一条水位值
                            lastVcState.update(value.getVc());
                        }
                    }).print();
            env.execute();
        }
    }
    

    代码解析:

    • 我们说过,如果没有合适的函数能解决我们的需求,那就直接用 process 就行了。
    • ValueState 应该在 open 方法中去初始化,因为直接在程序执行前去尝试调用 getRuntimeContext 获取执行环境是获取不到的。
    • 要用 ValueState 而不是 int 也不是 hashMap
      • 用 int 就不能区分不同的传感器了
      • 用 hashMap 的效率要比 ValueState 低而且存在一些别的问题
    • 注意初始值 Integer 的初始值为 null 

      2.2、列表状态(ListState)

      将需要保存的数据,以列表(List)的形式组织起来。在 ListState接口中同样有一个类型参数 T,表示列表中数据的类型。ListState 也提供了一系列的方法来操作状态,使用方式与一般的 List 非常相似。

      • Iterable get():获取当前的列表状态,返回的是一个可迭代类型 Iterable;
      • update(List values):传入一个列表 values,直接对状态进行覆盖;
      • add(T value):在状态列表中添加一个元素 value;
      • addAll(List values):向列表中添加多个元素,以列表 values 形式传入。

        类似地,ListState 的状态描述器就叫作 ListStateDescriptor,用法跟 ValueStateDescriptor完全一致。

        同样我们演示一个案例:针对每种传感器输出最高的3个水位值:

         

        /**
         * 获取每个水位器的 top3 的水位值
         */
        public class KeyedListState {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
                SingleOutputStreamOperator sensorDS = env
                        .socketTextStream("localhost", 9999)
                        .map(new WaterSensorFunction())
                        // todo 指定 watermark 策略,我们直接使用实现好的
                        .assignTimestampsAndWatermarks(WatermarkStrategy
                                // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                                .forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                                // 指定如何从数据中提取事件时间
                                .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                                    return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                                }));
                sensorDS.keyBy(WaterSensor::getId)
                        // process方法的参数类型: KIO
                        .process(new KeyedProcessFunction() {
                            // todo 1.定义状态
                            ListState maxState;
                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                // todo 2. 在open方法初始化状态
                                // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型
                                maxState = getRuntimeContext().getListState(new ListStateDescriptor("maxState",Types.INT));
                            }
                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
                                // todo 1. 添加水位值
                                // Integer的初始值为null 这里遇到第一条水位时需要注意判断null
                                maxState.add(value.getVc());
                                // todo 2. 排序
                                List list = new ArrayList<>();
                                for (int vc : maxState.get()) {
                                    list.add(vc);
                                }
                                list.sort((o1, o2) -> o2 - o1);
                                if (list.size()>3)  // 一超过3立即清理,防止数据量大的排序开销
                                    list.remove(3);
                                // todo 3. 输出top3
                                out.collect("传感器="+value.getId()+"最大的3个水为值为"+list.toString());
                                
                            }
                        }).print();
                env.execute();
            }
        }
        

        代码解析:

        • 这里同样不用普通的 List,因为它不能按 key 分组
        • 这里用了一个简单的优化,if(list.size()>3) list.remove(3) 使得list永远只有3个值,减少了大数据场景下每次的遍历开销
        • list.sort((o2,o1)->o2-o1) lambda基础知识

          2.3、Map 状态(MapState)

          把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组 key-value 映射的列表。对应的 MapState接口中,就会有 UK、UV 两个泛型,分别表示保存的 key和 value 的类型。同样,MapState 提供了操作映射状态的方法,与 Map 的使用非常类似。

          • UV get(UK key):传入一个 key 作为参数,查询对应的 value 值;
          • put(UK key, UV value):传入一个键值对,更新 key 对应的 value 值;
          • putAll(Map map):将传入的映射 map 中所有的键值对,全部添加到映射状态中;
          • remove(UK key):将指定 key 对应的键值对删除;
          • boolean contains(UK key):判断是否存在指定的 key,返回一个 boolean 值。另外,MapState 也提供了获取整个映射相关信息的方法:
          • Iterable> entries():获取映射状态中所有的键值对;
          • Iterable keys():获取映射状态中所有的键(key),返回一个可迭代 Iterable 类型;
          • Iterable values():获取映射状态中所有的值(value),返回一个可迭代 Iterable类型;
          • boolean isEmpty():判断映射是否为空,返回一个 boolean 值。

            同样通过一个案例来了解-统计每种传感器每种水位出现的次数:

            /**
             * 输出每种传感器每种水位值出现的次数
             */
            public class KeyedMapState {
                public static void main(String[] args) throws Exception {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    env.setParallelism(1);
                    SingleOutputStreamOperator sensorDS = env
                            .socketTextStream("localhost", 9999)
                            .map(new WaterSensorFunction())
                            // todo 指定 watermark 策略,我们直接使用实现好的
                            .assignTimestampsAndWatermarks(WatermarkStrategy
                                    // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                                    .forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                                    // 指定如何从数据中提取事件时间
                                    .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                                        return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                                    }));
                    sensorDS.keyBy(WaterSensor::getId)
                            // process方法的参数类型: KIO
                            .process(new KeyedProcessFunction() {
                                // todo 1.定义状态
                                MapState countState;
                                @Override
                                public void open(Configuration parameters) throws Exception {
                                    super.open(parameters);
                                    // todo 2. 在open方法初始化状态
                                    // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型
                                    countState = getRuntimeContext().getMapState(new MapStateDescriptor("countState",Types.INT,Types.INT));
                                }
                                @Override
                                public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
                                    // todo 1. 添加水位值
                                    if (countState.contains(value.getVc())){
                                        countState.put(value.getVc(),countState.get(value.getVc())+1);
                                    }else {
                                        countState.put(value.getVc(),1);
                                    }
                                    // todo 2. 输出每种水位出现的次数
                                    for (Map.Entry entry : countState.entries()) {
                                        out.collect("传感器" + value.getId() + " 的水位值 " + entry.getKey() + " 出现了 " + entry.getValue() + "次");
                                    }
                                }
                            }).print();
                    env.execute();
                }
            }
            

            用法和 Map 差不多,就是没有 getOrDefault 方法。

            2.4、规约状态(ReducingState)

                    类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducintState这个接口调用的方法类似于 ListState,只不过它保存的只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。

                    归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍 reduce 聚合算子时讲到的 ReduceFunction,所以状态类型跟输入的数据类型是一样的。

            public ReducingStateDescriptor(
                 String name, ReduceFunction reduceFunction, Class typeClass) 
            {...}
            

            这里的描述器有三个参数,其中第二个参数就是定义了归约聚合逻辑的 ReduceFunction,另外两个参数则是状态的名称和类型。

            案例-计算每种传感器的水位和

            /**
             * 检测每种传感器的水为值,如果连续两个水位差值超过10就输出结果
             */
            public class KeyedReducingState {
                public static void main(String[] args) throws Exception {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    env.setParallelism(1);
                    SingleOutputStreamOperator sensorDS = env
                            .socketTextStream("localhost", 9999)
                            .map(new WaterSensorFunction())
                            // todo 指定 watermark 策略,我们直接使用实现好的
                            .assignTimestampsAndWatermarks(WatermarkStrategy
                                    // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                                    .forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                                    // 指定如何从数据中提取事件时间
                                    .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                                        return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                                    }));
                    sensorDS.keyBy(WaterSensor::getId)
                            // process方法的参数类型: KIO
                            .process(new KeyedProcessFunction() {
                                // todo 1.定义状态
                                ReducingState sumState;
                                @Override
                                public void open(Configuration parameters) throws Exception {
                                    super.open(parameters);
                                    // todo 2. 在open方法初始化状态
                                    // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型
                                    sumState = getRuntimeContext().getReducingState(new ReducingStateDescriptor("reduceState", new ReduceFunction() {
                                        @Override
                                        public Integer reduce(Integer value1, Integer value2) throws Exception {
                                            return value1+value2;
                                        }
                                    }, Types.INT));
                                }
                                @Override
                                public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
                                    // todo 1. 添加水位值
                                    sumState.add(value.getVc());
                                    // todo 2. 输出每种水位出现的次数
                                    out.collect("传感器"+value.getId()+"的水位和为 "+sumState.get());
                                }
                            }).print();
                    env.execute();
                }
            }
            

            2.5、聚合状态(AggregationState)

                    与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与 ReducingState 不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的 AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以和输入数据的类型不同,使用更加灵活。

                    同样地,AggregatingState 接口调用方法也与 ReducingState 相同,调用.add()方法添加元素时,会直接使用指定的 AggregateFunction 进行聚合并更新状态。

            案例-统计每个传感器的水位平均值

            /**
             * 统计每种传感器的平均水位值
             */
            public class KeyedAggreatingState {
                public static void main(String[] args) throws Exception {
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    env.setParallelism(1);
                    SingleOutputStreamOperator sensorDS = env
                            .socketTextStream("localhost", 9999)
                            .map(new WaterSensorFunction())
                            // todo 指定 watermark 策略,我们直接使用实现好的
                            .assignTimestampsAndWatermarks(WatermarkStrategy
                                    // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                                    .forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                                    // 指定如何从数据中提取事件时间
                                    .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                                        return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                                    }));
                    sensorDS.keyBy(WaterSensor::getId)
                            // process方法的参数类型: KIO
                            .process(new KeyedProcessFunction() {
                                // todo 1.定义状态
                                AggregatingState avgState;
                                @Override
                                public void open(Configuration parameters) throws Exception {
                                    super.open(parameters);
                                    // todo 2. 在open方法初始化状态
                                    // 状态描述器的两个参数:1.起个名字(不重复就行),2.存储的类型
                                    avgState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor, Double>("avgState", new AggregateFunction, Double>() {
                                        @Override
                                        public Tuple2 createAccumulator() {
                                            return Tuple2.of(0,0);
                                        }
                                        @Override
                                        public Tuple2 add(Integer value, Tuple2 accumulator) {
                                            return Tuple2.of(accumulator.f0+value,accumulator.f1+1);
                                        }
                                        @Override
                                        public Double getResult(Tuple2 accumulator) {
                                            return accumulator.f0*1D/accumulator.f1;    // 两个相除要变成double要在分子*1D
                                        }
                                        @Override
                                        public Tuple2 merge(Tuple2 a, Tuple2 b) {
                                            // 只有在会话窗口才需要写 merge 方法
                                            //return Tuple2.of(a.f0+b.f0,a.f1+b.f1);
                                            return null;
                                        }},Types.TUPLE(Types.INT,Types.INT)));
                                }
                                @Override
                                public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
                                    // todo 1. 添加水位值
                                    avgState.add(value.getVc());
                                    // todo 2. 输出每种水位出现的次数
                                    out.collect("传感器"+value.getId()+"的平均水位为 "+avgState.get());
                                }
                            }).print();
                    env.execute();
                }
            }
            

            代码解析:

            • 这里我们用了累加器,我们把每个水位值转为(水为值,出现次数)的形式,最后统一对统一水为值进行平均值计算
            • merge 方法只有在会话窗口中才需要去自定义
            • int 除以 int 需要在分子上 *1D 才能使结果为 double

              2.6、状态生存时间(TTL)

                      在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。

                      具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。

                      配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器的 .enableTimeToLive() 方法启动 TTL 功能。

              StateTtlConfig ttlConfig = StateTtlConfig
                   .newBuilder(Time.seconds(10))
                   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                   .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                   .build();
              ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("mystate", String.class);
              stateDescriptor.enableTimeToLive(ttlConfig);
              

              这里用到了几个配置项:

              • .newBuilder()。状态 TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。
              • .setUpdateType()。设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的 OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型 OnReadAndWrite 则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为 OnCreateAndWrite。
              • .setStateVisibility()。设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里默认设置的是 NeverReturnExpired ,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是 ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,也返回它的值。
              • 除此之外,TTL 配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对 RocksDB 状态后端使用压缩过滤器(compaction filter)进行后台清理。

                这里需要注意,目前的 TTL 设置只支持处理时间。另外,所有集合类型的状态(例如ListState、MapState)在设置 TTL 时,都是针对每一项(per-entry)元素的。也就是说,一个列表状态中的每一个元素,都会以自己的失效时间来进行清理,而不是整个列表一起清理。

                注意:我们知道,我们上面创建状态时都是在 open 方法中进行初始化的,而 TTL 的配置也是通过 状态来调用的 ,所以 TTL 的定义也应该在 open 方法中进行。

                案例-

                public class KeyedStateTTL {
                    public static void main(String[] args) throws Exception {
                        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(1);
                        SingleOutputStreamOperator sensorDS = env
                                .socketTextStream("localhost", 9999)
                                .map(new WaterSensorFunction())
                                // todo 指定 watermark 策略,我们直接使用实现好的
                                .assignTimestampsAndWatermarks(WatermarkStrategy
                                        // 指定watermark的生成: 泛型方法,需要指定数据类型,乱序的watermark 需要设置等待时间
                                        .forBoundedOutOfOrderness(Duration.ofSeconds(3))   // 等待3s
                                        // 指定如何从数据中提取事件时间
                                        .withTimestampAssigner((WaterSensor sensor, long recordTimestamp) -> {
                                            return sensor.getTs() * 1000L; // 返回的时间戳单位是 ms
                                        }));
                        sensorDS.keyBy(WaterSensor::getId)
                                // process方法的参数类型: KIO
                                .process(new KeyedProcessFunction() {
                                    ValueState lastVcState;    // 初始化必须在生命周期中定义,因为初始化需要运行时环境,这里环境还没启动会初始化失败
                                    @Override
                                    public void open(Configuration parameters) throws Exception {
                                        super.open(parameters);
                                        // todo 1. 创建 StateTtlConfig
                                        StateTtlConfig stateTtlConfig = StateTtlConfig
                                                .newBuilder(Time.seconds(5))    // 过期时间
                                                // 设置只有在创建或写的时候才会刷新失效时间(往后推5s)
                                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                //                                .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                                .build();
                                        // todo 2. 状态描述器启用 TTL
                                        ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);
                                        stateDescriptor.enableTimeToLive(stateTtlConfig);
                                        this.lastVcState = getRuntimeContext().getState(stateDescriptor);
                                    }
                                    @Override
                                    public void processElement(WaterSensor value, Context ctx, Collector out) throws Exception {
                                        // 获取状态值-读
                                        Integer last_vc = lastVcState.value();
                                        out.collect("key="+value.getId()+",状态值= "+last_vc);
                                        // 更新状态值-写
                                        lastVcState.update(value.getVc());
                                    }
                                }).print();
                        env.execute();
                    }
                }
                

                 运行效果:

                s1,1,1        =>    null
                s1,1,2        =>    1
                s1,1,1        =>    2
                // 等待5s后
                s1,1,1        =>    null

                3、算子状态(Operator State)

                        算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。Flink(十一)【状态管理】,第3张

                        算子状态的实际应用场景不如 Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上(所以一般都不用我们去写,因为 Flink 已经帮我们把各种连接器写好了),或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用。

                        当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

                        算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState 和 BroadcastState。可以看到,算子状态中并没有什么 Map、Value啊,这是因为算子状态是我们一个子任务里大家共享一起使用的。

                3.1、列表状态(ListState)

                        与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。

                        与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

                        当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。

                        算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。

                案例-在 map 算子中计算数据的个数:

                public class OperatorListState {
                    public static void main(String[] args) throws Exception {
                        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(2);
                        env.socketTextStream("localhost",9999)
                                .map(new MyCountMapFunction())
                                .print();
                        env.execute();
                    }
                    // TODO 1.实现 CheckpointedFunction 接口
                    public static class MyCountMapFunction implements MapFunction, CheckpointedFunction {
                        private Long count = 0L;
                        private ListState countState;
                        @Override
                        public Long map(String value) throws Exception {
                            return ++count; //++count 不可以是count++
                        }
                        /**
                         * TODO 2. 我们的本地变量 count 要持久化到算子状态,这里需要对算子状态做快照
                         * @param context
                         * @throws Exception
                         */
                        @Override
                        public void snapshotState(FunctionSnapshotContext context) throws Exception {
                            // 2.1 清空算子状态
                            countState.clear();
                            // 2.2 将本地变量添加到算子状态中
                            countState.add(count);
                        }
                        /**
                         * TODO 3. 初始化本地变量,当我们的任务失败要恢复状态时,flink的checkpoint机制会从状态中把数据添加到本地变量,每个子任务调用一次
                         * @param context
                         * @throws Exception
                         */
                        @Override
                        public void initializeState(FunctionInitializationContext context) throws Exception {
                            // 3.1 从上下文初始化算子状态
                            countState = context.getOperatorStateStore()
                                    .getListState(new ListStateDescriptor("countState", Types.LONG));
                            // 3.2 从算子状态中把数据拷贝到本地变量
                            if (context.isRestored()) { // 如果初始化状态成功
                                for (Long c : countState.get()) {
                                    count += c;
                                }
                            }
                        }
                    }
                }

                这里我们设置并行度为 2 ,我们选择 Socket 作为 Source 算子时,Source 并行度只能为 1 ,但是这里我们的 Map 算子并行度为 2 ,运行结果:

                输入    输出
                a       1> 1
                b       2> 1
                c       1> 2
                d       2> 2
                e       1> 3
                f       2> 3
                

                可以看到,数据被均匀分到两个算子中去了。

                3.2、联合列表状态(UnionListState)

                        与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

                并行度 = 2
                算子1    算子2
                 1         2
                 3         4
                 5         6
                并行度 2->3
                //普通ListState:
                算子1    算子2    算子3
                 1        2        3
                 4        5        6
                //UnionListState:
                算子1    算子2    算子3
                 1        1        1        
                 2        2        2
                 3        3        3
                 4        4        4
                 5        5        5
                 6        6        6
                

                可以看到,当我们的分区进行重新调整时,ListState 会把数据先搜集在一起,再重新以轮询的方式进行数据的分配,而 UnionListState 同样会把数据先搜集在一起,再把全量数据以广播的形式分配给每个并行算子。 

                只需要修改上面的代码为:

                @Override
                        public void initializeState(FunctionInitializationContext context) throws Exception {
                            // 3.1 从上下文初始化算子状态
                            countState = context.getOperatorStateStore()
                                    .getUnionListState(new ListStateDescriptor("union-state", Types.LONG));
                            // 3.2 从算子状态中把数据拷贝到本地变量
                            if (context.isRestored()) { // 如果初始化状态成功
                                for (Long c : countState.get()) {
                                    count += c;
                                }
                            }
                        }

                3.3、广播状态(BroadcastState)

                        有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。

                        因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。

                        在底层,广播状态是以类似映射结构(map)的键值对(key-value)来保存的,必须基于一个“广播流”(BroadcastStream)来创建。

                案例-水位超过阈值发送警告,阈值可以动态修改

                public class OperatorBroadcastState {
                    public static void main(String[] args) throws Exception {
                        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(2);
                        // 数据流
                        SingleOutputStreamOperator sensorDS = env
                                .socketTextStream("localhost", 9999)
                                .map(new WaterSensorFunction());
                        // 广播流 - 用来广播配置-动态修改水位报警阈值
                        DataStreamSource thresholdDS = env.socketTextStream("localhost", 8888);
                        // TODO 1. 将 配置流 广播
                        // 返回一个带有广播状态的广播流
                        MapStateDescriptor broadcastMapDescriptor = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
                        BroadcastStream configDS = thresholdDS.broadcast(broadcastMapDescriptor);
                        // TODO 2. 把 数据流 和 配置流 connect
                        BroadcastConnectedStream sensorCS = sensorDS.connect(configDS);
                        // TODO 3. 调用process
                        sensorCS.process(new BroadcastProcessFunction() {
                            // 处理数据流
                            @Override
                            public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector out) throws Exception {
                                // TODO 5. 通过上下文获取广播状态,取出状态中的值
                                // 广播状态对数据流只能读
                                ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(broadcastMapDescriptor);
                                Integer threshold = broadcastState.get("threshold");
                                // 防止数据流已经来数据了,但广播流开始未指定阈值
                                threshold = threshold==null?0:threshold;
                                if (value.getVc()>threshold){
                                    out.collect("传感器 "+value.getId()+" 的水位超过指定的阈值: "+threshold+" !!!");
                                }
                            }
                            // 处理广播流
                            @Override
                            public void processBroadcastElement(String value, Context ctx, Collector out) throws Exception {
                                // TODO 4. 通过上下文获取广播状态, 往状态里面写数据
                                BroadcastState broadcastState = ctx.getBroadcastState(broadcastMapDescriptor);
                                broadcastState.put("threshold",Integer.valueOf(value));
                            }
                        }).print();
                        env.execute();
                    }
                }
                

                测试:

                netcat(9999端口) |  console
                -------------------------------------------------
                s1,1,1        1> 传感器 s1 的水位超过指定的阈值: 0 !!!
                5    // 8888端口修改阈值为5
                s1,1,1
                s1,1,4
                s1,1,5
                s1,1,6        1> 传感器 s1 的水位超过指定的阈值: 5 !!!

                4、状态后端(State Backends)

                        在 Flink 的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保存,这样就可以在发生故障后进行重启恢复。Flink 对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。

                        状态后端主要负责管理本地状态的存储方式和位置。

                4.1、状态后端的分类

                        状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。

                Flink 中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是 HashMapStateBackend。

                (1)哈希状态后端(HashMapStateBackend)

                        这种方式会把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。

                        对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。HashMapStateBackend 是将本地状态全部放入内存的,这样可以获得最快的读写速度,使计算性能达到最佳;代价则是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的。

                (2)内嵌 RocksDB 状态后端(RocksDB)

                        RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB默认存储在 TaskManager 的本地数据目录里。

                        与 HashMapStateBackend 直接在堆内存中存储对象不同,这种方式下状态主要是放在RocksDB 中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key 的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。

                        对于检查点,同样会写入到远程的持久化文件系统中。EmbeddedRocksDBStateBackend 始终执行的是异步快照,也就是不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。

                        由于它会把状态数据落盘,而且支持增量化的检查点,所以在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效。

                4.2、如何选择正确的状态后端

                        HashMap 和 RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是 RocksDB。在实际应用中,选择那种状态后端,主要是需要根据业务需求在处理性能和应用的扩展性上做一个选择。

                        HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

                        而 RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比 HashMapStateBackend 慢一个数量级。

                        我们可以发现,实际应用就是权衡利弊后的取舍。最理想的当然是处理速度快且内存不受限制可以处理海量状态,那就需要非常大的内存资源了,这会导致成本超出项目预算。比起花更多的钱,稍慢的处理速度或者稍小的处理规模,老板可能更容易接受一点。

                4.3、状态后端的配置

                        在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件 flink-conf.yaml 中指定的,配置的键名称为 state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。

                (1)修改默认的状态后端配置
                //默认状态后端
                state.backend: hashmap
                // 或者修改为 rocksdb
                state.backend: rocksdb
                //存放检查点的文件路径
                state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
                
                (2)为每个作业(Per-job/Application模式)单独配置状态后端

                指定状态后端为 HashMapStateBackend:

                env.setStateBackend(new HashMapStateBackend());
                

                指定状态后端为 EmbeddedRocksDBStateBackend:

                env.setStateBackend(new EmbeddedRocksDBStateBackend());
                

                需要注意,如果想在 IDE 中使用 EmbeddedRocksDBStateBackend,需要为 Flink 项目添加依赖:

                
                 org.apache.flink
                 flink-statebackend-rocksdb_${scala.binary.version}
                 ${flink.version}
                
                

                而由于 Flink 发行版中(lib/flink-dist-1.17.0.jar)默认就包含了 RocksDB,所以我们打包项目的时候不需要打包这个依赖。

                (3)提交参数时指定
                flink run-application -t yarn-application
                -p    3
                -Dstate.backend.type=rocksdb
                -c 全类名
                jar包

                总结

                        到今天10点,终于是把专业课的期末考完了,不由得要吐槽一下这试卷实在劣质,上午考的 Spark 考得一堆角落里没有的边角料,Spark 正二八经的核心重点几乎是没考,比如RDD 血缘关系、运行架构 ... 反倒是考了一堆细枝末节(Scala 的 for 花样循环守卫 ...),无语无语。还有一堆事情让人焦虑 ...

                        最近确实是累,断更 7 天了,这不下午两点都没睡就又来学习了。宿舍实在是人间炼狱,来了自习室以为会好点,就这功夫,左边的哥们又开始抖腿了,前边坐着的情侣实在碍眼,来自习室的路上看到了一对对情侣,想想咱也读了次大学,混的连个对象都没有,这不能怪咱不争取,属实是这大环境不好,毕业都眼瞅着吃土了还哪有那心思呢。

                        眼下每一次偷懒和懈怠都是罪恶的,愈发觉得时间的珍贵,现在努力学习保持单身才是对自己、对自己未来老婆孩子的负责哇。

                 

网友评论

搜索
最新文章
热门文章
热门标签