什么是Cep?
在流式数据中(事件流),筛选出符合条件的一系列动作(事件)【复杂事件处理】
什么是 Flink-Cep?
Flink Cep库Api 【实时操作】
官方文档
什么是Pattern?
Pattern就是Cep里的规则制定
Pattern分为个体模式,组合模式(模式序列)和模式组
模式组是将组合模式作为条件的个体模式
Cep开发流程
- DataStream 或 Keyedstream
- 定义规则(Pattern)
- 将规则应用于KeyedStream,生成PatternStream
- 将PatternStream,通过Select方法,将符合规则的数据输出
代码实战
依赖
org.apache.flink flink-cep_${scala.binary.version} ${flink.version}
Cep开发伪代码(个体模式和组合模式)
public class CepDemo { public static void main(String[] args) { // 创建流式计算上下文环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 生成DataStream DataStreamdataStream = null; // 生成KeyedStream (分组) KeyedStream keyedStream = dataStream.keyBy(""); // 生成模式(规则) ( Pattern 对象) /* ********************** * * 【个体模式】 * 1. 【单例】模式:只接收1个事件 * 2. 【循环】模式:能接收多个事件或1个事件, 单例模式 + 量词(times()) * *********************/ // 生成名叫 “login” 的单个Pattern Pattern pattern = Pattern. begin("login").where(new SimpleCondition () { @Override public boolean filter(String s) throws Exception { // Patter规则内容 return false; } }).times(3); /* ********************** * 【组合模式】 * * 组合方式: * 1. next: 严格紧邻 (连续) * 2. fallowedBy: 宽松近邻 (非连续) * 3. fallowedByAny: 非严格匹配,比 fallowedBy 更宽松 * * *********************/ // 生成了两个Patten所组成的Pattern序列,分别名叫 "login", "sale" Pattern patterns = Pattern. begin("login")//.where() .followedBy("sale");//.where(); // 将 Pattern 应用于 KeyedStream, 生成 PatternStream 对象 PatternStream patternStream = CEP.pattern(keyedStream, patterns); // 通过PatternStream 对象的 select() 方法, 将符合规则的数据提取输出 DataStream
【生成模式】
基于【个体模式】检测最近1分钟内登录失败超过3次的用户
CEP模式:允许这3次登录失败事件之间出现其他行为事件(不连续)【宽松近邻】
public class LoginFailBySingleton { public static void main(String[] args) { // Kafka数据源 DataStreameventStream = KafkaUtil.read(args); // 生成KeyedStream 用户id分组 KeyedStream keyedStream = eventStream.keyBy((KeySelector ) EventPO::getUser_id_int); // 生成模式 (规则/Pattern) Pattern. begin("login_fail_first") // Pattern名称 /* 1. IterativeCondition 抽象类 表示通用的匹配规则 需要实现 filter(), 需要传入2个参数 2.SimpleCondition 是 IterativeCondition 的子类,表示简单的匹配规则 需要实现 filter(), 需要传入1个参数 */ .where(new SimpleCondition () { @Override public boolean filter(EventPO eventPO) { // 登录失败事件 return EventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name()); } }) .times(3) // 3次,宽松近邻 .within(Time.seconds(60)); // 最近一分钟(时间) } }
检测最近1分钟内【连续】登录失败超过3次的用户
基于【个体模式】
CEP模式:3次登录失败事件必须是连续的【严格紧邻】
添加该方法即可.consecutive()
public class LoginFailByConsecutive { public static void main(String[] args) { // Kafka DataStreameventStream = KafkaUtil.read(args); // 生成KeyedStream KeyedStream keyedStream = eventStream.keyBy(new KeySelector () { @Override public Integer getKey(EventPO eventPO) throws Exception { return eventPO.getUser_id_int(); } }); Pattern. begin("login_fail_first") .where(new SimpleCondition () { @Override public boolean filter(EventPO eventPO) throws Exception { return EventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name()); } }) /* ********************** * 1. 个体模式的循环模式 匹配的是 宽松近邻 (能够允许插入其他事件) * 2. consecutive() 就指定匹配模式是 严格紧邻(连续) * *********************/ .times(3) .consecutive() // 连续 .within(Time.seconds(60)); } }
宽松近邻与严格紧邻
宽松近邻:不连续事件
严格紧邻:连续事件
例子:
事件流1(连续登录失败的事件流):event_A(login_fail),event_B(login_fail),event_C(login_fail)【严格紧邻】
事件流2(不连续登录失败的事件流):event_A(login_fail),event_D(login_success),event_B(login_fail),event_C(login_fail)【宽松近邻】
基于【组合模式】
单体模式、组合模式通用
组合模式.next(...)严格紧邻(连续事件)
public class LoginFailByComposite { public static void main(String[] args) { DataStreameventStream = KafkaUtil.read(args); KeyedStream keyedStream = eventStream.keyBy(new KeySelector () { @Override public Integer getKey(EventPO eventPO) throws Exception { return eventPO.getUser_id_int(); } }); // 三个连续登录失败事件【组合模式】 Pattern. begin("login_fail_first") .where(new SimpleCondition () { @Override public boolean filter(EventPO eventPO) throws Exception { return EventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name()); } }) .next("login_fail_second") // 严格紧邻 .where(new SimpleCondition () { @Override public boolean filter(EventPO eventPO) throws Exception { return EventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name()); } }) .next("login_fail_third") .where(new SimpleCondition () { @Override public boolean filter(EventPO eventPO) throws Exception { return EventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name()); } }) .within(Time.seconds(60)); } }
基于【迭代条件】检测最近15分钟内IP更换次数超过3次的用户
注意:
-
对于每个模式 (规则/Pattern)
可以设置条件,判定到达的行为事件,是否能够进入到这个模式
如:设置条件为只有登录成功这个行为事件,才能够进入到这个模式
-
条件的设置方法是:where()
where() 的参数是 IterativeCondition对象
-
IterativeCondition 称为迭代条件
能够设置较复杂的条件,尤其和循环模式相结合
public class IpChangeByIterative { public static void main(String[] args) { DataStreameventStream = KafkaUtil.read(args); KeyedStream keyedStream = eventStream.keyBy((KeySelector ) EventPO::getUser_id_int); Pattern pattern = Pattern. // 组合模式以begin开头, // 不设置条件,所有行为事件都可以进入到这个模式 begin("ip") // 判断用户行为事件在15分钟内IP是否发生变化(更换IP之间可以有其他事件) .followedBy("next").where(new IpChangeCondition()) // 15分钟内IP发生变化次数超过3次 .timesOrMore(3) // 满足条件的行为事件必须在最近15分钟内 .within(Time.seconds(900)); // 将模式应用到事件流 /* ********************** * * CEP.pattern(), * 还可以有第3个参数, * 第3个参数是比较器 EventComparator 对象, * 可以对于同时进入模式的行为事件,进行更精确的排序 * * *********************/ PatternStream patternStream = CEP.pattern(keyedStream, pattern); // 提取数据... } }
判断条件
继承IterativeCondition类
Context 是上下文对象,getEventsForPattern(...)根据传入的模式名获取对应模式中已匹配的所有行为事件
public class IpChangeCondition extends IterativeCondition{ @Override public boolean filter(EventPO eventPO, Context context) throws Exception { boolean change = false; // 当前模式名称是"ip", 获取当前模式之前已经匹配的事件 for (EventPO preEvent : context.getEventsForPattern("ip")) { // 前一个行为事件的IP String preIP = preEvent.getEvent_context().getDevice().getIp(); // 当前行为事件的IP String IP = eventPO.getEvent_context().getDevice().getIp(); // 判断前后行为事件的IP是否发生变化 if (!Objects.equals(preIP, IP)) { change = true; break; } } return change; } }
用户在15分钟内的行为路径是"登录-领券-下单"(明显薅羊毛行为特征)
组合模式
public class ClipCouponsRoute { public static void main(String[] args) { DataStreameventStream = KafkaUtil.read(args); KeyedStream keyedStream = eventStream.keyBy((KeySelector ) EventPO::getUser_id_int); // 生成模式 (规则/Pattern)【组合模式】 Pattern pattern = Pattern // 过滤登录行为事件 . begin("login").where(new SimpleCondition () { @Override public boolean filter(EventPO eventPO) throws Exception { return EventConstant.LOGIN_SUCCESS.equals(eventPO.getEvent_type()); } }) // 宽松近邻:过滤领取优惠券行为事件 .followedBy("receive").where(new SimpleCondition () { @Override public boolean filter(EventPO eventPO) throws Exception { return EventConstant.COUPON_RECEIVE.equals(eventPO.getEvent_type()); } }) // 宽松近邻:过滤使用优惠券行为事件 .followedBy("use").where(new SimpleCondition () { @Override public boolean filter(EventPO eventPO) throws Exception { return EventConstant.COUPON_USE.equals(eventPO.getEvent_type()); } }) // 模式有效时间:15分钟内 .within(Time.minutes(15)); } }
【提取、输出事件流】
将模式应用到事件流生成PatternStream
生成Pattern之后,就要提取输出事件流
// ... // 生成模式 (规则/Pattern) Patternpattern = Pattern. begin("ip") .followedBy("next").where(new IpChangeCondition()) .timesOrMore(3) .within(Time.seconds(900)); // 将模式应用到事件流 /* ********************** * CEP.pattern(), * 还可以有第3个参数, * 第3个参数是比较器 EventComparator 对象, * 可以对于同时进入模式的行为事件,进行更精确的排序 * *********************/ PatternStream patternStream = CEP.pattern(keyedStream, pattern);
PatternStream三个提取匹配事件方法
- select(): 参数是 PatternSelectFunction 对象,有返回值
- flatselect():参数是 PatternFlatSelectFunction 对象,无返回值,可以通过 Collector.collect() 以事件流输出
- process(): 参数是 PatternProcessFunction 对象,无返回值,可以通过 Collector.collect() 以事件流输出也可以通过 Context对象获取上下文信息
建议使用 flatSelect(), 可以更加灵活;官方建议使用 process()
以15分钟IP变化为例,完整代码:
public class IpChangeByIterative { public static void main(String[] args) { DataStreameventStream = KafkaUtil.read(args); KeyedStream keyedStream = eventStream.keyBy((KeySelector ) EventPO::getUser_id_int); // 生成模式 (规则/Pattern) Pattern pattern = Pattern. begin("ip") .followedBy("next").where(new IpChangeCondition()) .timesOrMore(3) .within(Time.seconds(900)); // 将模式应用到事件流 PatternStream patternStream = CEP.pattern(keyedStream, pattern); // 提取匹配事件 DataStream result = patternStream.process(new IpChangeProcessFunction()); // 执行规则命中的策略动作 } }
public class IpChangeProcessFunction extends PatternProcessFunction{ /** * @param map Map<模式名, 模式名对应匹配事件列表> * @param context 上下文对象 * @param collector 输出事件流 */ @Override public void processMatch(Map > map, Context context, Collector collector) throws Exception { } }
提取输出事件流,下游算子处理
Flink-Cep基石 NFA状态转移流程
薅羊毛用户是有着明显目的的
正常用户行为事件流:
来回比较不同商品价格,最终决定购买哪件商品。
薅羊毛用户行为事件流:
带有很强的目的性,“登录-领券-下单”事件流一气呵成。
Cep底层原理:
- CEP模式匹配:每个模式包含多个状态
- CEP模式匹配:状态转换的过程(NFA)
以羊毛党购买商品为例,状态变化流程:
匹配上事件,设置状态,三个状态都不一样,符合条件的事件,放到结果集中,与预先设置条件数量一致。
状态最后转换为最终状态,后续传递给下游算子计算。
CEP工作流程:
- 定义一个一个的Pattern,如有多个Pattern,将Pattern串联起来构成模式匹配的逻辑表达
- 将模式匹配分拆,创建NFA对象
- NFA对象包含了这个模式匹配的状态和状态转换表达式
- 状态变化、处理
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章