上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

guduadmin241月前

【Flink-1.17-教程】-【四】Flink DataStream API(3)转换算子(Transformation)【用户自定义函数(UDF)】

  • 1)函数类(Function Classes)
  • 2)富函数类(Rich Function Classes)

    用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

    用户自定义函数分为:函数类、匿名函数、富函数类。

    1)函数类(Function Classes)

    Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction、FilterFunction、ReduceFunction 等。所以用户可以自定义一个函数类,实现对应的接口。

    需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:

    方式一:实现 FilterFunction 接口

    public class TransFunctionUDF {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource stream = env.fromElements(
                    new WaterSensor("sensor_1", 1, 1),
                    new WaterSensor("sensor_1", 2, 2),
                    new WaterSensor("sensor_2", 2, 2),
                    new WaterSensor("sensor_3", 3, 3)
            );
            DataStream filter = stream.filter(new UserFilter());
            filter.print();
            env.execute();
        }
        public static class UserFilter implements
                FilterFunction {
            @Override
            public boolean filter(WaterSensor e) throws Exception {
                return e.id.equals("sensor_1");
            }
        }
    }
    

    方式二:通过匿名类来实现 FilterFunction 接口

    DataStream stream = stream.filter(new FilterFunction<
                WaterSensor>() {
            @Override
            public boolean filter(WaterSensor e) throws Exception {
                return e.id.equals("sensor_1");
            }
        });
    

    方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource stream = env.fromElements(
                    new WaterSensor("sensor_1", 1, 1),
                    new WaterSensor("sensor_1", 2, 2),
                    new WaterSensor("sensor_2", 2, 2),
                    new WaterSensor("sensor_3", 3, 3)
            );
            DataStream stream = stream.filter(new
                    FilterFunctionImpl("sensor_1"));
            public static class FilterFunctionImpl implements
                    FilterFunction {
                private String id;
                FilterFunctionImpl(String id) {
                    this.id = id;
                }
                @Override
                public boolean filter(WaterSensor value) throws Exception {
                    return thid.id.equals(value.id);
                }
            }
        }
    

    方式三:采用匿名函数(Lambda)

    public class TransFunctionUDF {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource stream = env.fromElements(
                    new WaterSensor("sensor_1", 1, 1),
                    new WaterSensor("sensor_1", 2, 2),
                    new WaterSensor("sensor_2", 2, 2),
                    new WaterSensor("sensor_3", 3, 3)
            );
    //map 函数使用 Lambda 表达式,不需要进行类型声明
            SingleOutputStreamOperator filter =
                    stream.filter(sensor -> "sensor_1".equals(sensor.id));
            filter.print();
            env.execute();
        }
    }
    

    2)富函数类(Rich Function Classes)

    “富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版 本 。 富函数类一般是以抽象类的形式出现的。例如:RichMapFunction 、RichFilterFunction、RichReduceFunction 等。

    与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

    Rich Function 有生命周期的概念。典型的生命周期方法有:

    • open() 方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map() 或者 filter() 方法被调用之前,open() 会首先被调用。

    • close() 方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

      需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。

      来看一个例子说明:

      public class RichFunctionExample {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(2);
              env
                      .fromElements(1, 2, 3, 4)
                      .map(new RichMapFunction() {
                          @Override
                          public void open(Configuration parameters) throws Exception {
                              super.open(parameters);
                              System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");
                          }
                          @Override
                          public Integer map(Integer integer) throws
                                  Exception {
                              return integer + 1;
                          }
                          @Override
                          public void close() throws Exception {
                              super.close();
                              System.out.println(" 索 引 是 : " + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");
                          }
                      })
                      .print();
              env.execute();
          }
      }
      

网友评论

搜索
最新文章
热门文章
热门标签
 
 40几岁的女人梦见自己怀孕了  梦见猫缠着自己甩不掉  梦见前男友回来找我复合