【Flink-1.17-教程】-【四】Flink DataStream API(2)转换算子(Transformation)【基本转换算子、聚合算子】
- 1)基本转换算子(map / filter / flatMap)
- 1.1.映射(map)
- 1.2.过滤(filter)
- 1.3.扁平映射(flatMap)
- 2)聚合算子(Aggregation)
- 2.1.按键分区(keyBy)
- 2.2.简单聚合(sum / min / max / minBy / maxBy)
- 2.3.归约聚合(reduce)
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream。
1)基本转换算子(map / filter / flatMap)
1.1.映射(map)
map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一 一映射”,消费一个元素就产出一个元素。
我们只需要基于 DataStream 调用 map() 方法就可以进行转换处理。方法需要传入的参数是接口 MapFunction 的实现;返回值类型还是 DataStream,不过泛型(流中的元素类型)可能改变。
下面的代码用不同的方式,实现了提取 WaterSensor 中的 id 字段的功能。
public class MapDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource
sensorDS = env.fromElements( new WaterSensor("s1", 1L, 1), new WaterSensor("s2", 2L, 2), new WaterSensor("s3", 3L, 3) ); // TODO map算子: 一进一出 // TODO 方式一: 匿名实现类 // SingleOutputStreamOperator map = sensorDS.map(new MapFunction () { // @Override // public String map(WaterSensor value) throws Exception { // return value.getId(); // } // }); // TODO 方式二: lambda表达式 // SingleOutputStreamOperator map = sensorDS.map(sensor -> sensor.getId()); // TODO 方式三: 定义一个类来实现MapFunction // SingleOutputStreamOperator map = sensorDS.map(new MyMapFunction()); SingleOutputStreamOperator map = sensorDS.map(new MapFunctionImpl()); map.print(); env.execute(); } public static class MyMapFunction implements MapFunction { @Override public String map(WaterSensor value) throws Exception { return value.getId(); } } } MapFunctionImpl :
public class MapFunctionImpl implements MapFunction
{ @Override public String map(WaterSensor value) throws Exception { return value.getId(); } } 上面代码中,MapFunction 实现类的泛型类型,与输入数据类型和输出数据的类型有关。在实现 MapFunction 接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个 map() 方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。
1.2.过滤(filter)
filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。
进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。filter 转换需要传入的参数需要实现 FilterFunction 接口,而 FilterFunction 内要实现 filter() 方法,就相当于一个返回布尔类型的条件表达式。
案例需求:下面的代码会将数据流中传感器 id 为 sensor_1 的数据过滤出来。
public class FilterDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource
sensorDS = env.fromElements( new WaterSensor("s1", 1L, 1), new WaterSensor("s1", 11L, 11), new WaterSensor("s2", 2L, 2), new WaterSensor("s3", 3L, 3) ); // TODO filter: true保留,false过滤掉 // SingleOutputStreamOperator filter = sensorDS.filter(new FilterFunction () { // @Override // public boolean filter(WaterSensor value) throws Exception { // return "s1".equals(value.getId()); // } // }); SingleOutputStreamOperator filter = sensorDS.filter(new FilterFunctionImpl("s1")); filter.print(); env.execute(); } } FilterFunctionImpl:
public class FilterFunctionImpl implements FilterFunction
{ public String id; public FilterFunctionImpl(String id) { this.id = id; } @Override public boolean filter(WaterSensor value) throws Exception { return this.id.equals(value.getId()); } } 1.3.扁平映射(flatMap)
flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。
案例需求:如果输入的数据是 sensor_1,只打印 vc;如果输入的数据是 sensor_2,既打印 ts 又打印 vc。
实现代码如下:
public class FlatmapDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource
sensorDS = env.fromElements( new WaterSensor("s1", 1L, 1), new WaterSensor("s1", 11L, 11), new WaterSensor("s2", 2L, 2), new WaterSensor("s3", 3L, 3) ); /** * TODO flatmap: 一进多出(包含0出) * 对于s1的数据,一进一出 * 对于s2的数据,一进2出 * 对于s3的数据,一进0出(类似于过滤的效果) * * map怎么控制一进一出: * =》 使用 return * * flatmap怎么控制的一进多出 * =》 通过 Collector来输出, 调用几次就输出几条 * * */ SingleOutputStreamOperator flatmap = sensorDS.flatMap(new FlatMapFunction () { @Override public void flatMap(WaterSensor value, Collector out) throws Exception { if ("s1".equals(value.getId())) { // 如果是 s1,输出 vc out.collect(value.getVc().toString()); } else if ("s2".equals(value.getId())) { // 如果是 s2,分别输出ts和vc out.collect(value.getTs().toString()); out.collect(value.getVc().toString()); } } }); flatmap.print(); env.execute(); } } 要点:
1、flatmap:一进多出(包含0出)一进一出、一进2出、一进0出(类似于过滤的效果)。
2、map 怎么控制一进一出:使用 return。
3、flatmap 怎么控制的一进多出:通过 Collector 来输出,调用几次就输出几条。
2)聚合算子(Aggregation)
计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于 MapReduce 中的 reduce 操作。
2.1.按键分区(keyBy)
对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的。
keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。
基于不同的 key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的 key 的数据,都将被发往同一个分区。
在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 hashCode() 方法。
keyBy() 方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合;对于 POJO 类型,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。
我们可以以 id 作为 key 做一个分区操作,代码实现如下:
public class KeybyDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); DataStreamSource
sensorDS = env.fromElements( new WaterSensor("s1", 1L, 1), new WaterSensor("s1", 11L, 11), new WaterSensor("s2", 2L, 2), new WaterSensor("s3", 3L, 3) ); // 按照 id 分组 /** * TODO keyby: 按照id分组 * 要点: * 1、返回的是 一个 KeyedStream,键控流 * 2、keyby不是 转换算子, 只是对数据进行重分区, 不能设置并行度 * 3、分组 与 分区 的关系: * 1) keyby是对数据分组,保证 相同key的数据 在同一个分区(子任务) * 2) 分区: 一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key) */ KeyedStream sensorKS = sensorDS .keyBy(new KeySelector () { @Override public String getKey(WaterSensor value) throws Exception { return value.getId(); } }); sensorKS.print(); env.execute(); } } 需要注意的是,keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为 KeyedStream。KeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照 key 的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 的类型。
KeyedStream 也继承自 DataStream,所以基于它的操作也都归属于 DataStream API。但它跟之前的转换操作得到的 SingleOutputStreamOperator 不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce)。
要点:
1、返回的是一个 KeyedStream,键控流,而不是 SingleOutputStreamOperator。
2、keyby 不是转换算子,只是对数据进行重分区,不能设置并行度。
3、分组与分区的关系:
(1)keyby 是对数据分组,保证相同 key 的数据在同一个分区(子任务)。
(2)分区: 一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组。
2.2.简单聚合(sum / min / max / minBy / maxBy)
有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们内置实现了一些最基本、最简单的聚合 API,主要有以下几种:
-
sum():在输入流上,对指定的字段做叠加求和的操作。
-
min():在输入流上,对指定的字段求最小值。
-
max():在输入流上,对指定的字段求最大值。
-
minBy():与 min() 类似,在输入流上针对指定字段求最小值。不同的是,min() 只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy() 则会返回包含字段最小值的整条数据。
-
maxBy():与 max() 类似,在输入流上针对指定字段求最大值。 两者区别与 min() / minBy() 完全一致。
简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以 f0、f1、f2、…来命名的。
如果数据流的类型是 POJO 类,那么就只能通过字段名称来指定,不能通过位置来指定
了。
public class SimpleAggregateDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource
sensorDS = env.fromElements( new WaterSensor("s1", 1L, 1), new WaterSensor("s1", 11L, 11), new WaterSensor("s2", 2L, 2), new WaterSensor("s3", 3L, 3) ); KeyedStream sensorKS = sensorDS .keyBy(new KeySelector () { @Override public String getKey(WaterSensor value) throws Exception { return value.getId(); } }); /** * TODO 简单聚合算子 * 1、 keyby之后才能调用 * 2、 分组内的聚合:对同一个key的数据进行聚合 */ // 传位置索引的,适用于 Tuple类型,POJO不行 // SingleOutputStreamOperator result = sensorKS.sum(2); // SingleOutputStreamOperator result = sensorKS.sum("vc"); /** * max\maxby的区别: 同min * max:只会取比较字段的最大值,非比较字段保留第一次的值 * maxby:取比较字段的最大值,同时非比较字段 取 最大值这条数据的值 */ // SingleOutputStreamOperator result = sensorKS.max("vc"); // SingleOutputStreamOperator result = sensorKS.min("vc"); SingleOutputStreamOperator result = sensorKS.maxBy("vc"); // SingleOutputStreamOperator result = sensorKS.minby("vc"); result.print(); env.execute(); } } 简单聚合算子返回的,同样是一个 SingleOutputStreamOperator,也就是从 KeyedStream 又转换成了常规的 DataStream。所以可以这样理解:keyBy 和聚合是成对出现的,先分区、后聚合,得到的依然是一个 DataStream。而且经过简单聚合之后的数据流,元素的数据类型
保持不变。
一个聚合算子,会为每一个 key 保存一个聚合的值,在 Flink 中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个 key 的数据流上。
要点:
1、keyby之后才能调用。
2、分组内的聚合:对同一个 key 的数据进行聚合。
3、传位置索引的,适用于 Tuple 类型,POJO 不行。
4、max / maxby的区别:同 min 一致。
(1)max:只会取比较字段的最大值,非比较字段保留第一次的值。
(2)maxby:取比较字段的最大值,同时非比较字段取最大值这条数据的值。
2.3.归约聚合(reduce)
reduce 可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
调用 KeyedStream 的 reduce 方法时,需要传入一个参数,实现 ReduceFunction 接口。接口在源码中的定义如下:
public interface ReduceFunction
extends Function, Serializable { T reduce(T value1, T value2) throws Exception; } ReduceFunction 接口里需要实现 **reduce()**方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
我们可以单独定义一个函数类实现 ReduceFunction 接口,也可以直接传入一个匿名类。当然,同样也可以通过传入 Lambda 表达式实现类似的功能。
为了方便后续使用,定义一个 WaterSensorMapFunction:
public class WaterSensorMapFunction implements MapFunction
{ @Override public WaterSensor map(String value) throws Exception { String[] datas = value.split(","); return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2])); } } reduce 示例:
public class ReduceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource
sensorDS = env.fromElements( new WaterSensor("s1", 1L, 1), new WaterSensor("s1", 11L, 11), new WaterSensor("s1", 21L, 21), new WaterSensor("s2", 2L, 2), new WaterSensor("s3", 3L, 3) ); KeyedStream sensorKS = sensorDS .keyBy(new KeySelector () { @Override public String getKey(WaterSensor value) throws Exception { return value.getId(); } }); /** * TODO reduce: * 1、keyby之后调用 * 2、输入类型 = 输出类型,类型不能变 * 3、每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出 * 4、reduce方法中的两个参数 * value1: 之前的计算结果,存状态 * value2: 现在来的数据 */ SingleOutputStreamOperator reduce = sensorKS.reduce(new ReduceFunction () { @Override public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception { System.out.println("value1=" + value1); System.out.println("value2=" + value2); return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc); } }); reduce.print(); env.execute(); } } 案例:使用 reduce 实现 max 和 maxBy 的功能。
public class ReduceDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .socketTextStream("hadoop102", 7777) .map(new WaterSensorMapFunction()) .keyBy(WaterSensor::getId) .reduce(new ReduceFunction
() { @Override public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception { System.out.println("Demo7_Reduce.reduce"); int maxVc = Math.max(value1.getVc(), value2.getVc()); //实现 max(vc)的效果 取最大值,其他字段以当前组的第一个为主 //value1.setVc(maxVc); //实现 maxBy(vc)的效果 取当前最大值的所有字段 if (value1.getVc() > value2.getVc()) { value1.setVc(maxVc); return value1; } else { value2.setVc(maxVc); return value2; } } }) .print(); env.execute(); } } reduce 同简单聚合算子一样,也要针对每一个 key 保存状态。因为状态不会清空,所以我们需要将 reduce 算子作用在一个有限 key 的流上。
要点:
1、keyby 之后调用。
2、输入类型 = 输出类型,类型不能变。
3、每个 key 的第一条数据来的时候,不会执行 reduce 方法,存起来,直接输出。
4、reduce 方法中的两个参数:
- value1:每组第一条进来的数据,或者,之前的计算结果,存状态。
- value2:除了第一条以外,每条新进来的数据。
-
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章