案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048
概述
用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类。
函数类(Function Classes)
Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的订单数据中筛选订单金额大于50的内容:
方式一:通过匿名类来实现FilterFunction接口:
import com.zxl.bean.Orders; import com.zxl.datas.OrdersData; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class DemoTest { public static void main(String[] args) throws Exception { //创建Flink流处理执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1 environment.setParallelism(1); //调用Flink自定义Source // TODO: 2024/1/6 订单数据 DataStreamSourceordersDataStreamSource = environment.addSource(new OrdersData()); // TODO: 2024/1/7 实现自定义接口FilterFunction DataStream streamOperator = ordersDataStreamSource.filter(new FilterFunction () { @Override public boolean filter(Orders orders) throws Exception { //过滤金额大于10000元的订单 if (orders.getOrder_amount() > 50) { return true; } else { return false; } } }); streamOperator.print(); environment.execute(); } }
方式二: 实现FilterFunction接口
import com.zxl.bean.Orders; import org.apache.flink.api.common.functions.FilterFunction; public class OrderFilter implements FilterFunction{ @Override public boolean filter(Orders orders) throws Exception { //过滤金额大于10000元的订单 if (orders.getOrder_amount() > 50) { return true; } else { return false; } } }
import com.zxl.Functions.OrderFilter; import com.zxl.bean.Orders; import com.zxl.datas.OrdersData; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class DemoTest { public static void main(String[] args) throws Exception { //创建Flink流处理执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1 environment.setParallelism(1); //调用Flink自定义Source // TODO: 2024/1/6 订单数据 DataStreamSourceordersDataStreamSource = environment.addSource(new OrdersData()); // TODO: 2024/1/7 返回类型记得修改为 DataStream DataStream operator = ordersDataStreamSource.filter(new OrderFilter()); operator.print(); environment.execute(); } }
方式三:采用匿名函数(Lambda)
//创建Flink流处理执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1 environment.setParallelism(1); //调用Flink自定义Source // TODO: 2024/1/6 订单数据 DataStreamSourceordersDataStreamSource = environment.addSource(new OrdersData()); // TODO: 2024/1/7 函数使用Lambda表达式,不需要进行类型声明 DataStream streamOperator = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() > 50); streamOperator.print(); environment.execute();
富函数类(Rich Function Classes)
“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。
import com.zxl.bean.Orders; import com.zxl.datas.OrdersData; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class DemoTest { public static void main(String[] args) throws Exception { //创建Flink流处理执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1 environment.setParallelism(1); //调用Flink自定义Source // TODO: 2024/1/6 订单数据 DataStreamSourceordersDataStreamSource = environment.addSource(new OrdersData()); ordersDataStreamSource.print(); // TODO: 2024/1/7 接口类型第一个是传入类型,第二个是输出类型 DataStream operator = ordersDataStreamSource.map(new RichMapFunction () { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始"); } @Override public String map(Orders orders) throws Exception { return orders.getOrder_date().toString()+"字符串"; } @Override public void close() throws Exception { super.close(); System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束"); } }); operator.print(); environment.execute(); } }
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章