Contents
1. Implementation analysis
2. Code implementation
3. Testing and verification
4. Source code
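CEP jobs running in production frequently need rule changes, and restarting and redeploying the job for every change is inelegant. This matters most in scenarios with strict real-time requirements, such as marketing or risk control: if the rule window is long (one or two weeks) and the state is large, a restart takes longer, and abnormal behavior that should be handled during that time is not detected promptly.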
1. Implementation analysis
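- External loading: a rule engine usually has a dedicated rule-management module where users create their own rules, so the Flink job has to load its rules from an external source.
- Dynamic updates: the job needs to check periodically whether the rules have changed.
- Historical state cleanup: pattern matching is a sequence of NFAState transitions, so when a rule changes the historical state must be cleared.
- API: an easy-to-use API must be exposed to users.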
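The first two requirements (external loading and periodic change detection) can be sketched in plain Java without any Flink types. This is only an illustration under stated assumptions: `RuleWatcher`, `RuleSource` and `InMemorySource` are hypothetical names that do not exist in Flink or in the original project, and the rule is represented as a plain string rather than a compiled Pattern.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java stand-in for the rule lifecycle; all names here are hypothetical. */
class RuleWatcher {

    /** Hypothetical view of an external rule store. */
    interface RuleSource {
        String load();        // fetch the current rule text
        boolean isChanged();  // has the rule changed since the last load?
    }

    private final RuleSource source;
    private final AtomicInteger version = new AtomicInteger(0);
    private String currentRule;

    RuleWatcher(RuleSource source) {
        this.source = source;
        this.currentRule = source.load(); // initial load at startup
    }

    /** One scan cycle: reload and bump the version only when the rule changed. */
    boolean poll() {
        if (!source.isChanged()) {
            return false;
        }
        currentRule = source.load();
        version.incrementAndGet();
        return true;
    }

    String currentRule() { return currentRule; }

    int version() { return version.get(); }

    /** Small in-memory source used only for demonstration. */
    static final class InMemorySource implements RuleSource {
        private String rule;
        private boolean dirty;

        InMemorySource(String rule) { this.rule = rule; }

        void update(String newRule) { rule = newRule; dirty = true; }

        @Override public String load() { dirty = false; return rule; }

        @Override public boolean isChanged() { return dirty; }
    }
}
```

The version counter is what lets downstream code notice, after the fact, that the rule it last saw is stale; the cleanup logic later in this article relies on the same idea.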
2. Code implementation
First, implement a user-facing API.
    package cep.functions;

    import java.io.Serializable;

    import org.apache.flink.api.common.functions.Function;

    import cep.pattern.Pattern;

    /**
     * @author StephenYou
     * Created on 2023-07-23
     * Description: dynamic Pattern interface (user-facing API); it does not distinguish keys
     */
    public interface DynamicPatternFunction<T> extends Function, Serializable {

        /** Initialization. */
        void init() throws Exception;

        /** Injects the new pattern. */
        Pattern<T, ?> inject() throws Exception;

        /** Length of one scan period, in ms. */
        long getPeriod() throws Exception;

        /** Whether the rule has changed. */
        boolean isChanged() throws Exception;
    }

The API above is meant to be called as follows.
    // Normal call
    CEP.pattern(dataStream, pattern);

    // Dynamic pattern
    CEP.injectionPattern(dataStream, new UserDynamicPatternFunction());
So the CEP library source needs to be modified.
Add the injectionPattern function.
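    public class CEP {

        /**
         * Dynamic injection pattern function.
         *
         * @param input the input data stream
         * @param dynamicPatternFunction the function that supplies and refreshes the pattern
         * @param <T> type of the input events
         * @return the resulting pattern stream
         */
        public static <T> PatternStream<T> injectionPattern(
                DataStream<T> input,
                DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
            return new PatternStream<>(input, dynamicPatternFunction);
        }
    }

Add a PatternStream constructor. Because the pattern is updated dynamically, the whole function has to be passed in.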
    public class PatternStream<T> {

        PatternStream(
                final DataStream<T> inputStream,
                final DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
            this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));
        }
    }

Modify PatternStreamBuilder.build to add the branch that uses the function.
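    // Declared without an initializer so that the final variable can be assigned in either branch.
    final CepOperator<IN, K, OUT> operator;
    if (patternFunction == null) {
        // Static pattern: use the precompiled NFA factory.
        operator = new CepOperator<>(
                inputSerializer,
                isProcessingTime,
                nfaFactory,
                comparator,
                pattern.getAfterMatchSkipStrategy(),
                processFunction,
                lateDataOutputTag);
    } else {
        // Dynamic pattern: hand the function to the operator; the skip strategy is resolved in open().
        operator = new CepOperator<>(
                inputSerializer,
                isProcessingTime,
                patternFunction,
                comparator,
                null,
                processFunction,
                lateDataOutputTag);
    }

Add the corresponding CepOperator constructor.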
    public CepOperator(
            final TypeSerializer<IN> inputSerializer,
            final boolean isProcessingTime,
            final DynamicPatternFunction<IN> patternFunction,
            @Nullable final EventComparator<IN> comparator,
            @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
            final PatternProcessFunction<IN, OUT> function,
            @Nullable final OutputTag<IN> lateDataOutputTag) {
        super(function);

        this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
        this.patternFunction = patternFunction;
        this.isProcessingTime = isProcessingTime;
        this.comparator = comparator;
        this.lateDataOutputTag = lateDataOutputTag;

        if (afterMatchSkipStrategy == null) {
            this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
        } else {
            this.afterMatchSkipStrategy = afterMatchSkipStrategy;
        }

        // The NFA factory is compiled later, in open(), from the injected pattern.
        this.nfaFactory = null;
    }

Load the Pattern and build the NFA:
    @Override
    public void open() throws Exception {
        super.open();
        timerService = getInternalTimerService(
                "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);

        // Initialization
        if (patternFunction != null) {
            patternFunction.init();
            Pattern<IN, ?> pattern = patternFunction.inject();
            afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();
            boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
            nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
            long period = patternFunction.getPeriod();
            // Register a timer to check whether the rule has changed
            if (period > 0) {
                getProcessingTimeService().registerTimer(
                        timerService.currentProcessingTime() + period, this::onProcessingTime);
            }
        }

        nfa = nfaFactory.createNFA();
        nfa.open(cepRuntimeContext, new Configuration());

        context = new ContextFunctionImpl();
        collector = new TimestampedCollector<>(output);
        cepTimerService = new TimerServiceImpl();

        // metrics
        this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    }
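A timer registered through `registerTimer` fires once, so a periodic rule check has to re-register itself from inside the callback. The article does not show that callback, so the following is only a plain-Java sketch of the re-arming idea, assuming a hand-driven clock; `ManualTimers` and `PeriodicRuleCheck` are hypothetical names, not Flink classes.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical one-shot timer service driven by a manual clock. */
class ManualTimers {

    interface Callback {
        void onTime(long timestamp);
    }

    private static final class Timer {
        final long time;
        final Callback callback;

        Timer(long time, Callback callback) {
            this.time = time;
            this.callback = callback;
        }
    }

    private final PriorityQueue<Timer> queue =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.time));
    private long now = 0;

    long currentTime() { return now; }

    /** Registers a timer that fires exactly once. */
    void register(long time, Callback callback) {
        queue.add(new Timer(time, callback));
    }

    /** Advances the clock and fires every due timer in timestamp order. */
    void advanceTo(long newTime) {
        while (!queue.isEmpty() && queue.peek().time <= newTime) {
            Timer timer = queue.poll();
            now = timer.time;
            timer.callback.onTime(timer.time);
        }
        now = newTime;
    }
}

/** Periodic check that re-arms itself after every firing. */
class PeriodicRuleCheck implements ManualTimers.Callback {
    private final ManualTimers timers;
    private final long period;
    int checks = 0;

    PeriodicRuleCheck(ManualTimers timers, long period) {
        this.timers = timers;
        this.period = period;
        timers.register(timers.currentTime() + period, this);
    }

    @Override
    public void onTime(long timestamp) {
        checks++; // the rule-change check and NFA rebuild would happen here
        timers.register(timestamp + period, this); // re-arm, since timers are one-shot
    }
}
```

With a period of 10000 ms, advancing the clock to 35000 fires the check at 10000, 20000 and 30000 and leaves the next timer pending at 40000.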
State cleanup has two parts: clearing the match state data and clearing the timers.
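Keyed state is normally reachable only for the key of the element currently being processed, so one way to clear it is lazily: keep a global rule version, remember per key which version that key last saw, and clear a key's state the next time one of its elements arrives. The sketch below shows that idea in plain Java with ordinary maps standing in for keyed state; `LazyKeyedCleanup` and its members are hypothetical names, not part of Flink or the original project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java illustration of version-based lazy cleanup; names are hypothetical. */
class LazyKeyedCleanup {
    // Global rule version, bumped whenever the rule changes.
    private final AtomicInteger ruleVersion = new AtomicInteger(0);
    // Per-key record of the rule version the key last saw.
    private final Map<String, Integer> seenVersion = new HashMap<>();
    // Per-key buffered events, standing in for partial-match state.
    private final Map<String, List<String>> buffered = new HashMap<>();

    void onRuleChanged() {
        ruleVersion.incrementAndGet();
    }

    void processElement(String key, String event) {
        int current = ruleVersion.get();
        if (seenVersion.getOrDefault(key, 0) < current) {
            // Stale state from the previous rule: drop it before processing.
            buffered.remove(key);
            seenVersion.put(key, current);
        }
        buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
    }

    List<String> bufferedFor(String key) {
        return buffered.getOrDefault(key, new ArrayList<>());
    }
}
```

A consequence of this design is that a key which receives no further elements keeps its stale state until something else (for example a state TTL) removes it.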
Perform the state cleanup:
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        if (patternFunction != null) {
            // The rule version has been updated since this key was last processed
            if (needRefresh.value() < refreshVersion.get()) {
                // Clear the match state
                computationStates.clear();
                elementQueueState.clear();
                partialMatches.releaseCacheStatisticsTimer();

                // Clear the timers
                Iterable<Long> registerTime = registerTimeState.get();
                if (registerTime != null) {
                    Iterator<Long> iterator = registerTime.iterator();
                    while (iterator.hasNext()) {
                        Long l = iterator.next();
                        // Delete the timer
                        timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);
                        timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);
                        // Remove it from state
                        iterator.remove();
                    }
                }
                // Record the current version for this key
                needRefresh.update(refreshVersion.get());
            }
        }
    }

The code above clears the state and updates the version while each element is processed. Next, the state and the version need to be initialized.
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        // Initialize the state
        if (patternFunction != null) {
            // The two flag states
            refreshFlagState = context.getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));
            if (context.isRestored()) {
                if (refreshFlagState.get().iterator().hasNext()) {
                    refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());
                }
            } else {
                refreshVersion = new AtomicInteger(0);
            }
            needRefresh = context.getKeyedStateStore()
                    .getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));
        }
    }

3. Testing and verification
Configure the Pattern to change every 10 s.
        // Tuple3 is used as a raw type below; substitute the concrete type arguments of your events.
        PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());
        patternStream.select(new PatternSelectFunction<Tuple3, Map>() {
            @Override
            public Map select(Map map) throws Exception {
                map.put("processingTime", System.currentTimeMillis());
                return map;
            }
        }).print();
        env.execute("SyCep");
    }

    public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3> {

        boolean flag;
        int time = 0;

        public TestDynamicPatternFunction() {
            this.flag = true;
        }

        @Override
        public void init() throws Exception {
            flag = true;
        }

        @Override
        public Pattern<Tuple3, Tuple3> inject() throws Exception {
            // Two patterns, selected by the flag
            if (flag) {
                Pattern pattern = Pattern.<Tuple3>begin("start")
                        .where(new IterativeCondition<Tuple3>() {
                            @Override
                            public boolean filter(Tuple3 value, Context<Tuple3> ctx) throws Exception {
                                return value.f2.equals("success");
                            }
                        })
                        .times(1)
                        .followedBy("middle")
                        .where(new IterativeCondition<Tuple3>() {
                            @Override
                            public boolean filter(Tuple3 value, Context<Tuple3> ctx) throws Exception {
                                return value.f2.equals("fail");
                            }
                        })
                        .times(1)
                        .next("end");
                return pattern;
            } else {
                Pattern pattern = Pattern.<Tuple3>begin("start2")
                        .where(new IterativeCondition<Tuple3>() {
                            @Override
                            public boolean filter(Tuple3 value, Context<Tuple3> ctx) throws Exception {
                                return value.f2.equals("success2");
                            }
                        })
                        .times(2)
                        .next("middle2")
                        .where(new IterativeCondition<Tuple3>() {
                            @Override
                            public boolean filter(Tuple3 value, Context<Tuple3> ctx) throws Exception {
                                return value.f2.equals("fail2");
                            }
                        })
                        .times(2)
                        .next("end2");
                return pattern;
            }
        }

        @Override
        public long getPeriod() throws Exception {
            return 10000;
        }

        @Override
        public boolean isChanged() throws Exception {
            // Toggle between the two patterns on every check
            flag = !flag;
            time += getPeriod();
            System.out.println("change pattern : " + time);
            return true;
        }
    }

Printed output: as expected.
4. Source code
If you find this useful, please give it a star. ^_^
GitHub - StephenYou520/SyCep: CEP dynamic Pattern