
[Flink Extras] 10. Unit testing stateful or timely UDFs and custom operators


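Flink article series

I. Flink column

The Flink column introduces one topic at a time in a systematic way and illustrates it with concrete examples.

  • 1. Flink deployment series

    Covers the basics of deploying and configuring Flink.

  • 2. Flink fundamentals series

    Covers Flink fundamentals such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, and the four cornerstones.

  • 3. Flink Table API and SQL fundamentals series

    Covers basic usage of the Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.

  • 4. Flink Table API and SQL advanced topics and applications series

    Covers applied Table API and SQL topics that are closer to real production use and somewhat harder to develop.

  • 5. Flink monitoring series

    Covers topics related to day-to-day operations and monitoring.

II. Flink examples column

The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-use examples. This column is not divided into sections; the link titles indicate what each article covers.

Entry point for all articles in both columns: Flink article series index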


Contents

• Flink article series
• I. Introduction to unit testing stateful operators
  • 1. DataStream API test dependencies
  • 2. Table API test dependencies
  • 3. Maven dependencies
• II. Unit testing a flatMap function
  • 1. OneInputStreamOperatorTestHarness example
  • 2. KeyedOneInputStreamOperatorTestHarness example
• III. Unit testing a ProcessFunction
  • 1. OneInputStreamOperatorTestHarness example
  • 2. ProcessFunctionTestHarnesses example

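This article focuses on unit testing Flink's stateful operators. It walks through four examples that cover stateful unit tests for flatMap and process functions.

For more, see the author's Flink column, which covers these topics more systematically.

Apart from the Maven dependencies, the examples in this article need no other external components.

For more detail on unit testing, see: 50. Introduction to Flink unit testing, with examples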

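I. Introduction to unit testing stateful operators

Testing a user-defined function that uses managed state or timers is harder, because the test has to cover the interaction between user code and the Flink runtime. For this purpose Flink provides a set of so-called test harnesses that can be used to test such user-defined functions as well as custom operators:

• OneInputStreamOperatorTestHarness (for operators on a DataStream)
• KeyedOneInputStreamOperatorTestHarness (for operators on a KeyedStream)
• TwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two DataStreams)
• KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)

Using the test harnesses requires a few additional dependencies, such as the DataStream API and Table API test dependencies.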

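1. DataStream API test dependencies

To develop tests for a job built with the DataStream API, add the following dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils</artifactId>
        <version>1.17.2</version>
        <scope>test</scope>
    </dependency>

Among other test utilities, this module provides MiniCluster, a lightweight, configurable Flink cluster that runs inside a JUnit test and can execute jobs directly.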

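2. Table API test dependencies

To test Table API and SQL programs locally in your IDE, add the following dependency in addition to the flink-test-utils dependency mentioned above:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-test-utils</artifactId>
        <version>1.17.2</version>
        <scope>test</scope>
    </dependency>

This automatically pulls in the query planner and the runtime, which are used to plan and execute queries respectively.

The flink-table-test-utils module was introduced in Flink 1.15 and is still considered experimental as of Flink 1.17.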

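3. Maven dependencies

The Maven dependencies used by the examples in this article:

    <properties>
        <!-- Only flink.version is referenced below; the other property names are conventional and assumed. -->
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>4.0.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

The example code also imports Lombok (lombok.Data and related annotations) and Guava (com.google.common.collect.Lists). Neither is listed above, so both must be on the test classpath as well.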

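II. Unit testing a flatMap function

With a test harness you can push records and watermarks into a user-defined function or custom operator, control processing time, and finally assert on the operator's output (including side outputs).

1. OneInputStreamOperatorTestHarness example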

    /*
     * @Author: alanchan
     * @LastEditors: alanchan
     * @Description: Unit test for a flatMap that, for even inputs, emits the value and its square
     */
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.operators.StreamFlatMap;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.TestHarnessUtil;
    import org.apache.flink.util.Collector;
    import org.junit.Before;
    import org.junit.Test;

    public class TestStatefulFlatMapDemo3 {

        static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {
            @Override
            public void flatMap(Integer value, Collector<Integer> out) throws Exception {
                if (value % 2 == 0) {
                    out.collect(value);
                    out.collect(value * value);
                }
            }
        }

        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;

        @Before
        public void setupTestHarness() throws Exception {
            StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<>(new AlanFlatMapFunction());
            testHarness = new OneInputStreamOperatorTestHarness<>(operator);
            testHarness.open();
        }

        @Test
        public void testFlatMap2() throws Exception {
            long initialTime = 0L;
            // Expected output in emission order; records and watermarks share one queue
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

            testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
            testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
            testHarness.processWatermark(new Watermark(initialTime + 2));
            testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
            testHarness.processElement(new StreamRecord<>(4, initialTime + 4));
            testHarness.processElement(new StreamRecord<>(5, initialTime + 5));
            testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
            testHarness.processElement(new StreamRecord<>(7, initialTime + 7));
            testHarness.processElement(new StreamRecord<>(8, initialTime + 8));

            // Each even input yields the value and its square, both with the input's timestamp
            expectedOutput.add(new StreamRecord<>(2, initialTime + 2));
            expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
            expectedOutput.add(new Watermark(initialTime + 2));
            expectedOutput.add(new StreamRecord<>(4, initialTime + 4));
            expectedOutput.add(new StreamRecord<>(16, initialTime + 4));
            expectedOutput.add(new StreamRecord<>(6, initialTime + 6));
            expectedOutput.add(new StreamRecord<>(36, initialTime + 6));
            expectedOutput.add(new StreamRecord<>(8, initialTime + 8));
            expectedOutput.add(new StreamRecord<>(64, initialTime + 8));

            TestHarnessUtil.assertOutputEquals("Output", expectedOutput, testHarness.getOutput());
        }
    }
               
              

2. KeyedOneInputStreamOperatorTestHarness example

KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally passing a KeySelector together with the TypeInformation of the key class.

    /*
     * @Author: alanchan
     * @LastEditors: alanchan
     * @Description: Key users by city and convert the city abbreviation to upper case
     */
    import com.google.common.collect.Lists;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.operators.StreamFlatMap;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    import org.apache.flink.util.Collector;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    public class TestStatefulFlatMapDemo2 {

        @Data
        @NoArgsConstructor
        @AllArgsConstructor
        static class User {
            private int id;
            private String name;
            private int age;
            private String city;
        }

        static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {
            // The state is only accessible by functions applied on a {@code KeyedStream}
            ValueState<User> previousInput;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                previousInput = getRuntimeContext()
                        .getState(new ValueStateDescriptor<User>("previousInput", User.class));
            }

            @Override
            public void flatMap(User input, Collector<User> out) throws Exception {
                // The state keeps this same User instance, which is upper-cased right below
                previousInput.update(input);
                input.setCity(input.getCity().toUpperCase());
                out.collect(input);
            }
        }

        AlanFlatMapFunction alanFlatMapFunction;
        OneInputStreamOperatorTestHarness<User, User> testHarness;

        @Before
        public void setupTestHarness() throws Exception {
            alanFlatMapFunction = new AlanFlatMapFunction();
            // A keyed harness takes the operator, a KeySelector, and the key's TypeInformation
            testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(alanFlatMapFunction),
                    new KeySelector<User, String>() {
                        @Override
                        public String getKey(User value) throws Exception {
                            return value.getCity();
                        }
                    }, Types.STRING);

            testHarness.open();
        }

        @Test
        public void testFlatMap() throws Exception {
            testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);

            // Reads the state for the key of the element processed last
            ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext().getState(
                    new ValueStateDescriptor<>("previousInput", User.class));
            User stateValue = previousInput.value();

            Assert.assertEquals(
                    Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
                    testHarness.extractOutputStreamRecords());
            Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);

            testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);

            // The output accumulates, so both records are expected here
            Assert.assertEquals(
                    Lists.newArrayList(
                            new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
                            new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
                    testHarness.extractOutputStreamRecords());
            Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());
        }
    }
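
The test above reads previousInput again after the second processElement call and gets the second user back. That is because keyed state is scoped to the key of the element currently being processed. The plain-Java sketch below illustrates only that scoping idea. It is not Flink code, and ToyKeyedState with all of its methods is a hypothetical name invented for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model of key-scoped state. NOT Flink API: every name here is hypothetical. */
public class ToyKeyedState<IN, K, S> {
    private final Function<IN, K> keySelector; // plays the role of the KeySelector
    private final Map<K, S> statePerKey = new HashMap<>();
    private K currentKey;

    public ToyKeyedState(Function<IN, K> keySelector) {
        this.keySelector = keySelector;
    }

    /** Selects the state slot for the element that is about to be processed. */
    public void setCurrentKeyFrom(IN element) {
        currentKey = keySelector.apply(element);
    }

    /** Returns the value stored under the current key, or null if none was stored. */
    public S value() {
        return statePerKey.get(currentKey);
    }

    /** Stores a value under the current key only. */
    public void update(S newValue) {
        statePerKey.put(currentKey, newValue);
    }

    public static void main(String[] args) {
        // Elements are "city:name" strings, keyed by the city prefix.
        ToyKeyedState<String, String, String> state =
                new ToyKeyedState<>(element -> element.split(":")[0]);

        state.setCurrentKeyFrom("sh:alanchan");
        state.update("alanchan");

        state.setCurrentKeyFrom("bj:alan");
        System.out.println(state.value()); // null: key "bj" has no state yet
        state.update("alan");

        state.setCurrentKeyFrom("sh:someone");
        System.out.println(state.value()); // alanchan: still stored under key "sh"
    }
}
```

In the real harness the key selector passed to the constructor plays the same role: it determines which key's state the function sees for each element.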
              

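III. Unit testing a ProcessFunction

Besides the test harnesses above, which can be used directly to test a ProcessFunction, Flink also provides a test harness factory class named ProcessFunctionTestHarnesses that simplifies instantiating a harness.

1. OneInputStreamOperatorTestHarness example

This example wraps a KeyedProcessFunction in a KeyedProcessOperator and drives it through a KeyedOneInputStreamOperatorTestHarness, which is held in a OneInputStreamOperatorTestHarness reference.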

    import com.google.common.collect.Lists;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    import org.apache.flink.util.Collector;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;

    /*
     * @Author: alanchan
     * @LastEditors: alanchan
     * @Description: Unit test for a KeyedProcessFunction that registers a processing-time timer
     */
    public class TestProcessOperatorDemo1 {

        // public abstract class KeyedProcessFunction<K, I, O>
        static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                // Register a processing-time timer for absolute time 50
                ctx.timerService().registerProcessingTimeTimer(50);
                out.collect("vx->" + value);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                // Runs when the timer fires; timestamp is the time the timer was registered for
                out.collect(String.format("Timer fired at %d", timestamp));
            }
        }

        private OneInputStreamOperatorTestHarness<String, String> testHarness;
        private AlanProcessFunction processFunction;

        @Before
        public void setupTestHarness() throws Exception {
            processFunction = new AlanProcessFunction();
            testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                    new KeyedProcessOperator<>(processFunction),
                    x -> "1",
                    Types.STRING);
            // Function time is initialized to 0
            testHarness.open();
        }

        @Test
        public void testProcessElement() throws Exception {
            testHarness.processElement("alanchanchn", 10);

            Assert.assertEquals(
                    Lists.newArrayList(
                            new StreamRecord<>("vx->alanchanchn", 10)),
                    testHarness.extractOutputStreamRecords());
        }

        @Test
        public void testOnTimer() throws Exception {
            // test first record
            testHarness.processElement("alanchanchn", 10);
            Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

            // Advance function time to 100; the timer registered for 50 fires
            testHarness.setProcessingTime(100);

            // onTimer receives the registered timestamp (50), and the record it emits has no timestamp
            Assert.assertEquals(
                    Lists.newArrayList(
                            new StreamRecord<>("vx->alanchanchn", 10),
                            new StreamRecord<>("Timer fired at 50")),
                    testHarness.extractOutputStreamRecords());
        }
    }
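
The onTimer test above registers a timer for time 50 and then advances the clock to 100. A point that is easy to miss is that the callback receives the timestamp the timer was registered for, not the time the clock was advanced to. The plain-Java sketch below models just that rule for a single key. It is not Flink code, and ToyTimerClock with all of its methods is a hypothetical name invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of a processing-time timer queue. NOT Flink API: every name here is hypothetical. */
public class ToyTimerClock {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentTime = 0L; // like the harness, the clock starts at 0

    /** Registers a timer for an absolute time; registering the same time twice keeps one timer. */
    public void registerTimer(long timestamp) {
        if (!timers.contains(timestamp)) {
            timers.add(timestamp);
        }
    }

    /** Advances the clock and returns the registered timestamps of all timers that fired, in order. */
    public List<Long> advanceTo(long newTime) {
        currentTime = newTime;
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentTime) {
            // The callback would be handed this registered timestamp, not newTime
            fired.add(timers.poll());
        }
        return fired;
    }

    /** Number of timers that have been registered but have not fired yet. */
    public int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        ToyTimerClock clock = new ToyTimerClock();
        clock.registerTimer(50);
        System.out.println(clock.pendingTimers()); // 1
        System.out.println(clock.advanceTo(100));  // [50]
    }
}
```

Flink additionally scopes timers per key; this single-key sketch leaves that out on purpose.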
              

2. ProcessFunctionTestHarnesses example

This example uses ProcessFunctionTestHarnesses to test a ProcessFunction, a KeyedProcessFunction, a CoProcessFunction, a KeyedCoProcessFunction, and a BroadcastProcessFunction, which covers most of the process function variants.

              import java.util.Arrays;
              import java.util.Collections;
              import org.apache.flink.api.common.state.MapStateDescriptor;
              import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
              import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
              import org.apache.flink.api.common.typeinfo.TypeInformation;
              import org.apache.flink.api.java.functions.KeySelector;
              import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
              import org.apache.flink.streaming.api.functions.ProcessFunction;
              import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
              import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
              import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
              import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
              import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
              import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
              import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
              import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
              import org.apache.flink.util.Collector;
              import org.junit.Assert;
              import org.junit.Test;
              import lombok.AllArgsConstructor;
              import lombok.Data;
              import lombok.NoArgsConstructor;
    /*
     * @Author: alanchan
     * @LastEditors: alanchan
     * @Description: Unit tests for process functions built with ProcessFunctionTestHarnesses
     */
              public class TestProcessOperatorDemo3 {
                  @Data
                  @NoArgsConstructor
                  @AllArgsConstructor
                  static class User {
                      private int id;
                      private String name;
                      private int age;
                      private String city;
                  }
        // Test processElement of a ProcessFunction
        @Test
        public void testProcessFunction() throws Exception {
            // public abstract class ProcessFunction<I, O>
            ProcessFunction<String, String> function = new ProcessFunction<String, String>() {
                @Override
                public void processElement(
                        String value, Context ctx, Collector<String> out) throws Exception {
                    out.collect("vx->" + value);
                }
            };
            OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                    .forProcessFunction(function);
            harness.processElement("alanchanchn", 10);
            Assert.assertEquals(Collections.singletonList("vx->alanchanchn"), harness.extractOutputValues());
        }
        // Test processElement of a KeyedProcessFunction
        @Test
        public void testKeyedProcessFunction() throws Exception {
            // public abstract class KeyedProcessFunction<K, I, O>
            KeyedProcessFunction<String, String, String> function = new KeyedProcessFunction<String, String, String>() {
                @Override
                public void processElement(String value, Context ctx,
                        Collector<String> out) throws Exception {
                    out.collect("vx->" + value);
                }
            };
            OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                    .forKeyedProcessFunction(function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);
            harness.processElement("alanchan", 10);
            // The function emits "vx->" + value, so that string is the expected output
            Assert.assertEquals(Collections.singletonList("vx->alanchan"), harness.extractOutputValues());
        }
        // Test processElement1 and processElement2 of a CoProcessFunction
        @Test
        public void testCoProcessFunction() throws Exception {
            // public abstract class CoProcessFunction<IN1, IN2, OUT>
            CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {
                @Override
                public void processElement1(String value, Context ctx,
                        Collector<User> out) throws Exception {
                    // Parse "id,name,age,city" into a User
                    String[] userStr = value.split(",");
                    out.collect(
                            new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
                }
                @Override
                public void processElement2(User value, Context ctx,
                        Collector<User> out) throws Exception {
                    out.collect(value);
                }
            };
            TwoInputStreamOperatorTestHarness<String, User, User> harness = ProcessFunctionTestHarnesses
                    .forCoProcessFunction(function);
            harness.processElement2(new User(2, "alan", 19, "bj"), 100);
            harness.processElement1("1,alanchan,18,sh", 10);
            // Output is in emission order: the second input was processed first
            Assert.assertEquals(
                    Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
                    harness.extractOutputValues());
        }
        // Test processElement1 and processElement2 of a KeyedCoProcessFunction
        @Test
        public void testKeyedCoProcessFunction() throws Exception {
            // public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
            KeyedCoProcessFunction<String, String, User, User> function = new KeyedCoProcessFunction<String, String, User, User>() {
                @Override
                public void processElement1(String value, Context ctx,
                        Collector<User> out) throws Exception {
                    // Parse "id,name,age,city" into a User
                    String[] userStr = value.split(",");
                    out.collect(
                            new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
                }
                @Override
                public void processElement2(User value, Context ctx,
                        Collector<User> out) throws Exception {
                    out.collect(value);
                }
            };
            // public static <K, IN1, IN2, OUT>
            // KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
            // forKeyedCoProcessFunction(
            // KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
            // KeySelector<IN1, K> keySelector1,
            // KeySelector<IN2, K> keySelector2,
            // TypeInformation<K> keyType)
            KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness = ProcessFunctionTestHarnesses
                    .forKeyedCoProcessFunction(function, new KeySelector<String, String>() {
                        @Override
                        public String getKey(String value) throws Exception {
                            return value.split(",")[3];
                        }
                    }, new KeySelector<User, String>() {
                        @Override
                        public String getKey(User value) throws Exception {
                            return value.getCity();
                        }
                    }, TypeInformation.of(String.class));
            harness.processElement2(new User(2, "alan", 19, "bj"), 100);
            harness.processElement1("1,alanchan,18,sh", 10);
            // Output is in emission order: the second input was processed first
            Assert.assertEquals(
                    Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")),
                    harness.extractOutputValues());
        }
        // Test processElement and processBroadcastElement of a BroadcastProcessFunction
        @Test
        public void testBroadcastOperator() throws Exception {
            // Define the broadcast state
            // Data format:
            // sh,Shanghai
            // bj,Beijing
            // public class MapStateDescriptor<UK, UV>
            MapStateDescriptor<String, String> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState",
                    String.class,
                    String.class);
            // public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
            // * @param <IN1> The input type of the non-broadcast side.
            // * @param <IN2> The input type of the broadcast side.
            // * @param <OUT> The output type of the operator.
            BroadcastProcessFunction<User, String, User> function = new BroadcastProcessFunction<User, String, User>() {
                // Handles elements of the broadcast stream
                @Override
                public void processBroadcastElement(String value, Context ctx,
                        Collector<User> out) throws Exception {
                    System.out.println("Received broadcast data: " + value);
                    // Store the "code,name" pair in the broadcast state
                    ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
                }
                // Handles the non-broadcast stream and joins it with the dimension data
                @Override
                public void processElement(User value, ReadOnlyContext ctx,
                        Collector<User> out) throws Exception {
                    // Read the broadcast state
                    ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);
                    value.setCity(state.get(value.getCity()));
                    out.collect(value);
                }
            };
            BroadcastOperatorTestHarness<User, String, User> harness = ProcessFunctionTestHarnesses
                    .forBroadcastProcessFunction(function, broadcastDesc);
            harness.processBroadcastElement("sh,Shanghai", 10);
            harness.processBroadcastElement("bj,Beijing", 20);
            harness.processElement(new User(2, "alan", 19, "bj"), 10);
            harness.processElement(new User(1, "alanchan", 18, "sh"), 30);
            // Output is in emission order: user 2 was processed before user 1
            Assert.assertEquals(
                    Arrays.asList(new User(2, "alan", 19, "Beijing"), new User(1, "alanchan", 18, "Shanghai")),
                    harness.extractOutputValues());
        }
    }
              

In summary, this article focused on unit testing Flink's stateful operators, using four examples to cover stateful unit tests for flatMap and process functions.
