
Flink CEP (Part 3): Dynamic Pattern Updates (with Source Code)

guduadmin, 24 hours ago

Contents

1. Implementation analysis

2. Code implementation

3. Test and verification

4. Source code


        

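Rules in a CEP job running in production change often, and restarting and redeploying the job for every change is inelegant. This matters most in latency-sensitive scenarios such as marketing or risk control: if the rule window is long (one or two weeks) and the state is large, a restart takes longer, and abnormal behavior that should be caught during that time is not detected promptly.

1. Implementation analysis

  • External loading: a rule engine usually has a dedicated rule-management module where users create their own rules, so the Flink job has to load its rules from an external source.
  • Dynamic update: the job needs to check periodically whether the rules have changed.
  • Historical state cleanup: pattern matching is a sequence of NFAState transitions; when a rule changes, the state accumulated under the old rule has to be cleared.
  • API: the feature should be exposed through an easy-to-use API.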

2. Code implementation

First, implement a user-facing API.

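    package cep.functions;

    import java.io.Serializable;

    import org.apache.flink.api.common.functions.Function;

    import cep.pattern.Pattern;

    /**
     * @author StephenYou
     * Created on 2023-07-23
     * Description: dynamic Pattern interface (user-facing API); the same pattern applies to every key.
     *
     * @param <T> type of the input events
     */
    public interface DynamicPatternFunction<T> extends Function, Serializable {
        /**
         * Initialization, called once before the first pattern is injected.
         */
        public void init() throws Exception;

        /**
         * Injects a new pattern.
         *
         * @return the pattern to run from now on
         */
        public Pattern<T, T> inject() throws Exception;

        /**
         * Length of one scan period.
         *
         * @return the period in milliseconds
         */
        public long getPeriod() throws Exception;

        /**
         * Whether the rule has changed.
         *
         * @return true if a new pattern should be injected
         */
        public boolean isChanged() throws Exception;
    }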
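The interface leaves open how isChanged() detects a change. One possible approach, sketched here in plain Java without Flink, is to fingerprint the externally stored rule text and compare it with the fingerprint seen on the previous check. RuleChangeDetector, its Supplier-based rule source and the SHA-256 fingerprint are illustrative assumptions, not part of the article's code or of Flink.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.function.Supplier;

/** Hypothetical helper: reports a change only when the externally stored rule text differs. */
final class RuleChangeDetector {
    // Stands in for a database or configuration-service lookup (assumption).
    private final Supplier<String> ruleSource;
    private String lastFingerprint;

    RuleChangeDetector(Supplier<String> ruleSource) {
        this.ruleSource = ruleSource;
    }

    /** Plays the role of init(): remember the fingerprint of the rule that is loaded first. */
    void init() {
        lastFingerprint = fingerprint(ruleSource.get());
    }

    /** Plays the role of isChanged(): true only if the rule text differs from the last check. */
    boolean isChanged() {
        String current = fingerprint(ruleSource.get());
        boolean changed = !current.equals(lastFingerprint);
        lastFingerprint = current;
        return changed;
    }

    /** Hex-encoded SHA-256 digest of the rule text. */
    static String fingerprint(String ruleText) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(ruleText.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }
}
```

A DynamicPatternFunction implementation could delegate init() and isChanged() to such a helper and rebuild the Pattern in inject() only after a change has been reported.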
    

The API is meant to be called as follows.

    // Regular call
    CEP.pattern(dataStream, pattern);
    // Dynamic pattern
    CEP.injectionPattern(dataStream, new UserDynamicPatternFunction());

This means the CEP library source has to be modified.

Add an injectionPattern function.

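    public class CEP {
        /**
         * Creates a PatternStream whose pattern is supplied, and later replaced,
         * by a dynamic pattern function.
         *
         * @param input the DataStream of input events
         * @param dynamicPatternFunction supplies the pattern and reports rule changes
         * @param <T> type of the input events
         * @return the resulting pattern stream
         */
        public static <T> PatternStream<T> injectionPattern(
                DataStream<T> input,
                DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
            return new PatternStream<>(input, dynamicPatternFunction);
        }
    }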

Add a PatternStream constructor. Because the pattern has to be updated dynamically, the whole function is passed in rather than a single pattern.

    public class PatternStream<T> {
        PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {
            this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));
        }
    }

Modify PatternStreamBuilder.build so that the operator is constructed from the pattern function when one is present.

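            // Declared without an initializer so that each branch can assign it exactly once
            final CepOperator<IN, K, OUT> operator;
            if (patternFunction == null) {
                // Static pattern: the NFA factory and skip strategy are known up front
                operator = new CepOperator<>(
                        inputSerializer,
                        isProcessingTime,
                        nfaFactory,
                        comparator,
                        pattern.getAfterMatchSkipStrategy(),
                        processFunction,
                        lateDataOutputTag);
            } else {
                // Dynamic pattern: the skip strategy is taken from the injected pattern in open()
                operator = new CepOperator<>(
                        inputSerializer,
                        isProcessingTime,
                        patternFunction,
                        comparator,
                        null,
                        processFunction,
                        lateDataOutputTag);
            }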

Add the corresponding CepOperator constructor.

        public CepOperator(
                final TypeSerializer<IN> inputSerializer,
                final boolean isProcessingTime,
                final DynamicPatternFunction<IN> patternFunction,
                @Nullable final EventComparator<IN> comparator,
                @Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
                final PatternProcessFunction<IN, OUT> function,
                @Nullable final OutputTag<IN> lateDataOutputTag) {
            super(function);
            this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
            this.patternFunction = patternFunction;
            this.isProcessingTime = isProcessingTime;
            this.comparator = comparator;
            this.lateDataOutputTag = lateDataOutputTag;
            if (afterMatchSkipStrategy == null) {
                this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();
            } else {
                this.afterMatchSkipStrategy = afterMatchSkipStrategy;
            }
            // The NFA factory is compiled in open(), once the pattern has been injected
            this.nfaFactory = null;
        }

Load the pattern and build the NFA.

        @Override
        public void open() throws Exception {
            super.open();
            timerService =
                    getInternalTimerService(
                            "watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);
            // Initialization: load the pattern and compile the NFA factory from it
            if (patternFunction != null) {
                patternFunction.init();
                Pattern pattern = patternFunction.inject();
                afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();
                boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;
                nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
                long period = patternFunction.getPeriod();
                // Register a timer that checks whether the rule has changed
                if (period > 0) {
                    getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);
                }
            }
            nfa = nfaFactory.createNFA();
            nfa.open(cepRuntimeContext, new Configuration());
            context = new ContextFunctionImpl();
            collector = new TimestampedCollector<>(output);
            cepTimerService = new TimerServiceImpl();
            // metrics
            this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        }
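The open() method registers a single processing-time timer, but the callback it points to is not listed in this article. The following plain-Java model sketches what such a callback could do: ask whether the rule changed, reload it, bump a version counter and arm the next timer. RuleRefresher and all of its members are hypothetical stand-ins; a real operator would recompile the NFA factory and rely on Flink's timer service instead of the explicit advanceTo call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/** Plain-Java model of a periodic rule check; every name here is hypothetical. */
final class RuleRefresher<P> {
    private final BooleanSupplier isChanged; // stands in for patternFunction.isChanged()
    private final Supplier<P> inject;        // stands in for patternFunction.inject()
    private final long periodMs;             // stands in for patternFunction.getPeriod()
    private final AtomicInteger version = new AtomicInteger(0);
    private P activeRule;
    private long nextFireTime;

    RuleRefresher(BooleanSupplier isChanged, Supplier<P> inject, long periodMs, long now) {
        this.isChanged = isChanged;
        this.inject = inject;
        this.periodMs = periodMs;
        this.activeRule = inject.get();     // initial load, as in open()
        this.nextFireTime = now + periodMs; // the first one-shot timer
    }

    /** Advances the model clock and runs the timer callback for every period that has elapsed. */
    void advanceTo(long now) {
        while (periodMs > 0 && now >= nextFireTime) {
            if (isChanged.getAsBoolean()) {
                activeRule = inject.get();  // in the operator: recompile the NFA factory
                version.incrementAndGet();  // lets per-key state notice the change later
            }
            nextFireTime += periodMs;       // a one-shot timer has to be re-armed
        }
    }

    P activeRule() {
        return activeRule;
    }

    int version() {
        return version.get();
    }
}
```

The version counter is what connects the timer to the per-record path: the callback never touches keyed state itself, it only advances the number that each key compares against later.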

State cleanup has two parts: clearing the match state and clearing the timers.

The cleanup is performed as follows:

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            if (patternFunction != null) {
                // The rule version has moved past the version this key last saw
                if (needRefresh.value() < refreshVersion.get()) {
                    // Clear the match state
                    computationStates.clear();
                    elementQueueState.clear();
                    partialMatches.releaseCacheStatisticsTimer();
                    // Clear the timers
                    Iterable<Long> registerTime = registerTimeState.get();
                    if (registerTime != null) {
                        for (Long timestamp : registerTime) {
                            // Delete the timer in both time domains
                            timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
                            timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, timestamp);
                        }
                        // Drop the recorded timestamps through the state handle,
                        // so the removal is written back regardless of the state backend
                        registerTimeState.clear();
                    }
                    // Record the current version for this key
                    needRefresh.update(refreshVersion.get());
                }
            }
            // ... the regular CEP element processing continues here
        }
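The cleanup in processElement is lazy: nothing is cleared at the moment the rule changes. Instead, each key compares the version it last saw with the operator-wide version when its next record arrives. A minimal plain-Java model of that idea follows; the class name and the map-based "state" are assumptions standing in for Flink keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java model of version-based lazy cleanup; all names are hypothetical. */
final class LazyVersionedState {
    // Plays the role of refreshVersion: one counter for the whole operator.
    private final AtomicInteger ruleVersion = new AtomicInteger(0);
    // Plays the role of needRefresh: the version each key has last seen.
    private final Map<String, Integer> seenVersion = new HashMap<>();
    // Plays the role of the keyed match state (partial matches, queued elements).
    private final Map<String, List<String>> partialMatches = new HashMap<>();

    /** Called when a new rule has been injected. */
    void ruleChanged() {
        ruleVersion.incrementAndGet();
    }

    /** Processes one event and returns a copy of the events buffered for its key. */
    List<String> process(String key, String event) {
        int current = ruleVersion.get();
        if (seenVersion.getOrDefault(key, 0) < current) {
            // State built under the old rule is discarded only now, for this key
            partialMatches.remove(key);
            seenVersion.put(key, current);
        }
        List<String> buffer = partialMatches.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(event);
        return new ArrayList<>(buffer);
    }
}
```

One consequence of this design is that a key which receives no further records keeps its old state until it expires or is cleared by other means.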

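The code above clears the state and updates the version as each record is processed. Next, the state and the version have to be initialized.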

        @Override
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            // Initialize the version-tracking state
            if (patternFunction != null) {
                // Two flag states: the operator-level rule version and the version each key last saw
                refreshFlagState = context.getOperatorStateStore()
                        .getUnionListState(new ListStateDescriptor<>("refreshFlagState", Integer.class));
                // Start at version 0; this also covers a restore that finds no saved entry
                refreshVersion = new AtomicInteger(0);
                if (context.isRestored()) {
                    Iterator<Integer> restored = refreshFlagState.get().iterator();
                    if (restored.hasNext()) {
                        refreshVersion = new AtomicInteger(restored.next());
                    }
                }
                needRefresh = context.getKeyedStateStore()
                        .getState(new ValueStateDescriptor<>("needRefreshState", Integer.class, 0));
            }
        }
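With union list state, every parallel subtask receives the entries written by all subtasks on restore, so the restored list can hold several values. The listing takes the first one. A defensive variant, sketched here as an assumption rather than as the article's behavior, takes the largest entry and falls back to 0 when the list is empty. VersionRestore is a hypothetical helper that works on a plain Iterable.

```java
/** Hypothetical helper: picks the rule version to resume from after a restore. */
final class VersionRestore {
    private VersionRestore() {
    }

    /**
     * Returns the largest restored version, or 0 if nothing was restored.
     * Taking the maximum keeps the result independent of the order of the entries.
     */
    static int restoreVersion(Iterable<Integer> restoredEntries) {
        int version = 0;
        for (Integer entry : restoredEntries) {
            if (entry != null && entry > version) {
                version = entry;
            }
        }
        return version;
    }
}
```

In the operator this would replace the single `next()` call, for example `new AtomicInteger(VersionRestore.restoreVersion(refreshFlagState.get()))`.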

3. Test and verification

The pattern is set to change every 10 s.

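            PatternStream<Tuple3> patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());
            patternStream.select(new PatternSelectFunction<Tuple3, Map<String, Object>>() {
                @Override
                public Map<String, Object> select(Map<String, List<Tuple3>> match) throws Exception {
                    // Copy the match and tag it with the time at which it was emitted
                    Map<String, Object> result = new HashMap<>(match);
                    result.put("processingTime", System.currentTimeMillis());
                    return result;
                }
            }).print();
            env.execute("SyCep");
        }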
        // The Tuple3 field types are not shown in this listing, so the raw type is used;
        // only f2 is read, and it is compared against a status string.
        public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3> {
            public TestDynamicPatternFunction() {
                this.flag = true;
            }
            // Selects which of the two patterns inject() returns
            boolean flag;
            // Elapsed time in ms, used only for logging
            int time = 0;
            @Override
            public void init() throws Exception {
                flag = true;
            }
            @Override
            public Pattern<Tuple3, Tuple3> inject()
                                throws Exception {
                // Two alternating patterns
                if (flag) {
                    Pattern<Tuple3, Tuple3> pattern = Pattern
                            .<Tuple3>begin("start")
                            .where(new IterativeCondition<Tuple3>() {
                                @Override
                                public boolean filter(Tuple3 value,
                                        Context<Tuple3> ctx) throws Exception {
                                    return value.f2.equals("success");
                                }
                            })
                            .times(1)
                            .followedBy("middle")
                            .where(new IterativeCondition<Tuple3>() {
                                @Override
                                public boolean filter(Tuple3 value,
                                        Context<Tuple3> ctx) throws Exception {
                                    return value.f2.equals("fail");
                                }
                            })
                            .times(1)
                            .next("end");
                    return pattern;
                } else {
                    Pattern<Tuple3, Tuple3> pattern = Pattern
                            .<Tuple3>begin("start2")
                            .where(new IterativeCondition<Tuple3>() {
                                @Override
                                public boolean filter(Tuple3 value,
                                        Context<Tuple3> ctx) throws Exception {
                                    return value.f2.equals("success2");
                                }
                            })
                            .times(2)
                            .next("middle2")
                            .where(new IterativeCondition<Tuple3>() {
                                @Override
                                public boolean filter(Tuple3 value,
                                        Context<Tuple3> ctx) throws Exception {
                                    return value.f2.equals("fail2");
                                }
                            })
                            .times(2)
                            .next("end2");
                    return pattern;
                }
            }
            @Override
            public long getPeriod() throws Exception {
                // Check for a rule change every 10 s
                return 10000;
            }
            @Override
            public boolean isChanged() throws Exception {
                // Test stub: switch to the other pattern and report a change on every check
                flag = !flag;
                time += getPeriod();
                System.out.println("change pattern : " + time);
                return true;
            }
        }

Printed output: as expected.

[Image: screenshot of the printed output]

4. Source code

If you find this useful, please give the repository a star. ^_^

GitHub - StephenYou520/SyCep: CEP dynamic Pattern
