一. windowAll 和window介绍
datastream 流中没有使用keyby需要使用windowAll函数,使用了keyby的需要使用window函数
Keyed Windows stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag" Non-Keyed Windows stream .windowAll(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag"
windowAll的函数: 并行度只能是1,性能不高
window的函数:并行度可以任意,性能高
二. 原flink服务存在问题
2.1 flink作业介绍
线上的flink作业的架构如下图所示:
1.先从rocketmq读取数据,通过windowAll类型的窗口进行10s的数据攒批;
2.攒批的数据经过聚合函数处理,该函数没有做任何操作,只是把数据传到下游,因为使用了windowAll所以此聚合函数并行度只能是1,考虑到该函数没有任何操作只是传递数据,所以不存在性能瓶颈,但是事实不是这样,后面会详细说明;
3.处理函数会调用远程的服务,如果符合要求的数据,就传到sink;
4.sink把数据存到doris数据库。
除了聚合函数外,所有函数都支持设置任意并行度
2.2 出现性能瓶颈
经过监控发现高峰期出现rocketmq消息积压
并且通过flinkui 发现处理函数存在被压,应该是远程服务慢导致,所以想通过增加并行度来提高处理速度
调整并行度:
除了聚合函数外,其他函数并行度都设置为2,通过savepoint恢复任务
不幸运的是报错了:
方法位于:org.apache.flink.runtime.checkpoint.StateAssignmentOperation#checkParallelismPreconditions(org.apache.flink.runtime.checkpoint.OperatorState,org.apache.flink.runtime.executiongraph.ExecutionJobVertex)
探究原因 看报错信息得知,通过savepoint恢复数据时,原算子的最大并行度是1,但是设置了2个并行度,所以恢复失败,聚合算子增加并行度,其他算子为什么最大并行度会是1呢?
跟flink算子链合并有关:
算子链合并介绍:没有keyby操作和并行度相同,算子就会合并在一起,目的是减少数据的网络传输和序列化
如上图所示:聚合函数,处理函数,sink由于并行度相同,所以算子合并了,savepoint时,会取三个函数中最大并行度中最小的一个作为恢复时的最大的并行度;所以最大并行度是1,导致设置了并行度为2就恢复不了了。
解决办法
1.不通过savepoint恢复:
启动时就把除了聚合函数外,其他算子并行度都设置为2,由于并行度不同,所以聚合函数无法和其他算子进行算子合并,就不会发生上面问题。
但是此方法无法通过savepoint启动,解决办法是rocketmq的source可以设置offset或者timestap指定从那个位点开始消费,mysqlsource可以也通过设置binlog的offset指定从那个位点开始消费,达到从savepoint恢复的效果
2.先关闭算子链合并操作,在增加并发度:先通过savepoint恢复时把算子链合并关闭,由于关闭了算子链合并,所以算子没有合并再一起,再次进行savepoint时,除了聚合函数,其他算子最大并行度就是128,后面就可以增加并行度,然后通过savepoint来恢复作业了。那调整了并行度可以再次合并算子链吗?答案是不可以的,报错信息如下:
对应代码:org.apache.flink.runtime.checkpoint.Checkpoints#loadAndValidateCheckpoint
就是说savepoint的最大并行度是128,但是新的程序最大并行度为1,因为经过了算子链合并,所以会报这个错。
此方法就会导致没办法使用算子链合并的优化,会导致序列化和网络传输增加,影响性能
三. 转换后存在问题
由于上面2个方法都存在一些问题,还有没有别的方法解决?
想法:把windowAll 改成Window ,这样聚合函数就得摆脱并行度为1的限制了
改完之后的图如下
source算子先经过keyby,在进行window操作,聚合函数是继承了ProcessWindowFunction的,这样聚合函数就可以设置任意并行度了。
这样可以通过savepoint来恢复启动吗?
很遗憾,启动后还是报错了,报错如下:
方法代码: org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation#readMetaData
意思是说之前key的序列化和现在key的序列化不兼容导致,所以也没有办法通过savepoint方式恢复,只能启动的设置通过设置source的偏移量或者的从那个时间戳开始消费,这样来达到类似从savepoint的方式恢复启动。
四. 总结
windowall窗口使用注意事项:
1.当并行度设置为1时,会进行算子链合并,如果增加并行度,通过savepoint恢复启动时就会受到windowall的函数影响导致不允许增加并行度,启动失败;
2.需要设置并行度时,在算子链合并的情况下不能通过savepoint恢复启动,只能通过指定从哪个位点开始消费这样方式来启动;或者关闭算子链优化,但是会减低性能;
3.windowall转window时,也不能通过savepoint启动,也只能指定从哪个位点开始消费来启动,这种方式的好处在于每个算子都可以设置任意并行度。
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章