上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

51、Flink的管理执行(执行配置、程序打包和并行执行)的介绍及示例

guduadmin43小时前

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列

    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列

    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列

    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列

    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列

    本部分和实际的运维、监控工作相关。

    二、Flink 示例专栏

    Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

    两专栏的所有文章入口点击:Flink 系列文章汇总索引


    文章目录

    • Flink 系列文章
    • 一、执行配置
    • 二、程序打包和分布式运行
      • 1、打包程序
      • 2、总结
      • 三、并行执行
        • 1、设置并行度
          • 1)、算子层次
          • 2)、执行环境层次
          • 3)、客户端层次
          • 4)、系统层次
          • 2、设置最大并行度

            本文介绍了Flink的管理执行的三个内容,即执行配置、打包和分布式运行以及并行执行(设置并行度的几种方式)。

            如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

            本文除了maven依赖外,没有其他依赖。

            一、执行配置

            StreamExecutionEnvironment 包含了 ExecutionConfig,它允许在运行时设置作业特定的配置值。要更改影响所有作业的默认值,请参阅配置。

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            ExecutionConfig executionConfig = env.getConfig();
            

            以下是可用的配置选项(下面粗体部分为系统默认值):

            • setClosureCleanerLevel()

              closure cleaner 的级别默认设置为 ClosureCleanerLevel.RECURSIVE。closure cleaner 删除 Flink 程序中对匿名 function 的调用类的不必要引用。禁用 closure cleaner 后,用户的匿名 function 可能正引用一些不可序列化的调用类。这将导致序列化器出现异常。

              可设置的值是:

              NONE:完全禁用 closure cleaner ,

              TOP_LEVEL:只清理顶级类而不递归到字段中,

              RECURSIVE:递归清理所有字段。

            • getParallelism() / setParallelism(int parallelism)

              为作业设置默认的并行度。

            • getMaxParallelism() / setMaxParallelism(int parallelism)

              为作业设置默认的最大并行度。此设置决定最大并行度并指定动态缩放的上限。

            • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries)

              设置失败任务重新执行的次数。

              值为零会有效地禁用容错。

              -1 表示使用系统默认值(在配置中定义)。

              该配置已弃用,请改用重启策略 。

            • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay)

              设置系统在作业失败后重新执行之前等待的延迟(以毫秒为单位)。

              在 TaskManagers 上成功停止所有任务后,开始计算延迟,一旦延迟过去,任务会被重新启动。

              此参数对于延迟重新执行的场景很有用,当尝试重新执行作业时,由于相同的问题,作业会立刻再次失败,该参数便于作业再次失败之前让某些超时相关的故障完全浮出水面(例如尚未完全超时的断开连接)。

              此参数仅在执行重试次数为一次或多次时有效。

              该配置已被弃用,请改用重启策略 。

            • getExecutionMode() / setExecutionMode()。默认的执行模式是 PIPELINED

              设置执行模式以执行程序。

              执行模式定义了数据交换是以批处理方式还是以流方式执行。

            • enableForceKryo() / disableForceKryo()

              默认情况下不强制使用 Kryo。

              强制 GenericTypeInformation 对 POJO 使用 Kryo 序列化器,即使我们可以将它们作为 POJO 进行分析。

              在某些情况下,应该优先启用该配置。

              例如,当 Flink 的内部序列化器无法正确处理 POJO 时。

            • enableForceAvro() / disableForceAvro()

              默认情况下不强制使用 Avro。

              强制 Flink AvroTypeInfo 使用 Avro 序列化器而不是 Kryo 来序列化 Avro 的 POJO。

            • enableObjectReuse() / disableObjectReuse()

              默认情况下,Flink 中不重用对象。

              启用对象重用模式会指示运行时重用用户对象以获得更好的性能。

              当一个算子的用户代码 function 没有意识到这种行为时可能会导致bug。

            • getGlobalJobParameters() / setGlobalJobParameters()

              此方法允许用户将自定义对象设置为作业的全局配置。

              由于 ExecutionConfig 可在所有用户定义的 function 中访问,因此这是一种使配置在作业中全局可用的简单方法。

            • addDefaultKryoSerializer(Class type, Serializer serializer)

              为指定的类型注册 Kryo 序列化器实例。

            • addDefaultKryoSerializer(Class type, Class> serializerClass)

              为指定的类型注册 Kryo 序列化器的类。

            • registerTypeWithKryoSerializer(Class type, Serializer serializer)

              使用 Kryo 注册指定类型并为其指定序列化器。

              通过使用 Kryo 注册类型,该类型的序列化将更加高效。

            • registerKryoType(Class type)

              如果类型最终被 Kryo 序列化,那么它将在 Kryo 中注册,以确保只有标记(整数 ID)被写入。

              如果一个类型没有在 Kryo 注册,它的全限定类名将在每个实例中被序列化,从而导致更高的 I/O 成本。

            • registerPojoType(Class type)

              将指定的类型注册到序列化栈中。

              如果该类型最终被序列化为 POJO,那么该类型将注册到 POJO 序列化器中。

              如果该类型最终被 Kryo 序列化,那么它将在 Kryo 中注册,以确保只有标记被写入。

              如果一个类型没有在 Kryo 注册,它的全限定类名将在每个实例中被序列化,从而导致更高的I/O成本。

              用 registerKryoType() 注册的类型对 Flink 的 Kryo 序列化器实例来说是不可用的。

              • disableAutoTypeRegistration()

                自动类型注册在默认情况下是启用的。

                自动类型注册是将用户代码使用的所有类型(包括子类型)注册到 Kryo 和 POJO 序列化器。

              • setTaskCancellationInterval(long interval)

                设置尝试连续取消正在运行任务的等待时间间隔(以毫秒为单位)。

                当一个任务被取消时,会创建一个新的线程,如果任务线程在一定时间内没有终止,新线程就会定期调用任务线程上的 interrupt() 方法。

                这个参数是指连续调用 interrupt() 的时间间隔,默认设置为 30000 毫秒,或 30秒 。

                通过 getRuntimeContext() 方法在 Rich* function 中访问到的 RuntimeContext 也允许在所有用户定义的 function 中访问 ExecutionConfig。

                二、程序打包和分布式运行

                Flink 程序可以使用 remote environment 在集群上执行。或者,程序可以被打包成 JAR 文件(Java Archives)执行。如果使用命令行的方式执行程序,将程序打包是必需的。

                1、打包程序

                为了能够通过命令行或 web 界面执行打包的 JAR 文件,程序必须使用通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取的 environment。

                当 JAR 被提交到命令行或 web 界面后,该 environment 会扮演集群环境的角色。如果调用 Flink 程序的方式与上述接口不同,该 environment 会扮演本地环境的角色。

                打包程序只要简单地将所有相关的类导出为 JAR 文件,JAR 文件的 manifest 必须指向包含程序入口点(拥有公共 main 方法)的类。

                实现的最简单方法是将 main-class 写入 manifest 中(比如 main-class: org.apache.flinkexample.MyProgram)。

                main-class 属性与 Java 虚拟机通过指令 java -jar pathToTheJarFile 执行 JAR 文件时寻找 main 方法的类是相同的。

                大多数 IDE 提供了在导出 JAR 文件时自动包含该属性的功能。

                2、总结

                调用打包后程序的完整流程包括两步:

                • 搜索 JAR 文件 manifest 中的 main-class 或 program-class 属性。如果两个属性同时存在,program-class 属性会优先于 main-class 属性。对于 JAR manifest 中两个属性都不存在的情况,命令行和 web 界面支持手动传入入口点类名参数。

                • 系统接着调用该类的 main 方法。

                  三、并行执行

                  一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。

                  使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,你可以改变特定算子或着整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于在 Flink 内部将状态划分为了 key-groups,且性能所限不能无限制地增加 key-groups,因此设定最大并行度是有必要的。

                  1、设置并行度

                  一个 task 的并行度可以从多个层次指定:

                  1)、算子层次

                  单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()方法来指定。如下所示:

                   // 设置 算子 并行度
                   static void test1() throws Exception {
                       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                       DataStream source = env.socketTextStream("192.168.10.42", 8888)
                               .map(o -> {
                                   String[] lines = o.split(",");
                                   return "name:" + lines[0] + " age: " + lines[1];
                               }).setParallelism(8);// 设置map的并行度
                       source.print();
                       env.execute();
                   }
                  

                  2)、执行环境层次

                  Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。

                  可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:

                  // 设置 执行环境层次 并行度
                  static void test2() throws Exception {
                      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                      env.setParallelism(8);
                      DataStream source = env.socketTextStream("192.168.10.42", 8888)
                              .map(o -> {
                                  String[] lines = o.split(",");
                                  return "name:" + lines[0] + " age: " + lines[1];
                              });
                      source.print();
                      env.execute();
                  }
                  

                  3)、客户端层次

                  将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。

                  在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:

                  ./bin/flink run -p 10 ../examples/*WordCount-java*.jar
                  

                  或者在 Java 程序中,可以通过如下方式指定并行度:

                  说明:

                  1、该种方法比较复杂,是不是相当于把Flink自身的客户端实现重新实现了一遍呢?大致逻辑如下,代码示例

                  2、具体实现可以参考其客户端的实现以及测试用例中的实现。

                  3、客户端的入口类为org.apache.flink.client.cli.CliFrontend;其测试用例类为org.apache.flink.client.program.ClientTest

                  import static org.apache.flink.util.Preconditions.checkNotNull;
                  import java.io.File;
                  import java.net.URL;
                  import java.util.List;
                  import java.util.concurrent.CompletableFuture;
                  import java.util.stream.Stream;
                  import javax.annotation.Nonnull;
                  import org.apache.flink.api.common.JobID;
                  import org.apache.flink.api.common.Plan;
                  import org.apache.flink.api.java.ExecutionEnvironment;
                  import org.apache.flink.api.java.io.DiscardingOutputFormat;
                  import org.apache.flink.client.ClientUtils;
                  import org.apache.flink.client.FlinkPipelineTranslationUtil;
                  import org.apache.flink.client.cli.ExecutionConfigAccessor;
                  import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
                  import org.apache.flink.client.program.ClusterClient;
                  import org.apache.flink.client.program.MiniClusterClient;
                  import org.apache.flink.client.program.PackagedProgram;
                  import org.apache.flink.configuration.AkkaOptions;
                  import org.apache.flink.configuration.Configuration;
                  import org.apache.flink.configuration.CoreOptions;
                  import org.apache.flink.configuration.DeploymentOptions;
                  import org.apache.flink.configuration.JobManagerOptions;
                  import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
                  import org.apache.flink.core.execution.JobClient;
                  import org.apache.flink.core.execution.PipelineExecutor;
                  import org.apache.flink.core.execution.PipelineExecutorFactory;
                  import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
                  import org.apache.flink.runtime.jobgraph.JobGraph;
                  import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
                  import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
                  import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
                  /*
                   * @Author: alanchan
                   * @LastEditors: alanchan
                   * @Description: 
                   */
                  public class TestParallelismByClientDemo {
                      private static final String TEST_EXECUTOR_NAME = "test_executor";
                      private static Plan plan;
                  	private static Configuration config;
                      private static final InternalMiniClusterExtension MINI_CLUSTER_RESOURCE = new InternalMiniClusterExtension(new MiniClusterResourceConfiguration.Builder().build());
                      public static void main(String[] args) throws Exception {
                  		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
                  		env.generateSequence(1, 1000).output(new DiscardingOutputFormat<>());
                  		plan = env.createProgramPlan();
                  		config = new Configuration();
                  		config.setString(JobManagerOptions.ADDRESS, "localhost");
                  		config.set(AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
                          // 1、构造PackagedProgram
                          Configuration configuration = new Configuration();
                  		configuration.setString(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME);
                  		configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2);
                  		// 
                  		// org.apache.flink
                  		// flink-runtime
                  		// ${flink.version}
                  		// 
                          String entryPointClass = TestExecute.class.getName();
                          String jarFilePath = "../examples/flinktest.jar";//打包jar文件的路径
                          File jarFile = new File(jarFilePath);
                          List classpaths = PackagedProgram.getJobJarAndDependencies(jarFile,entryPointClass);
                      //    Creates an instance that wraps the plan defined in the jar file using the given arguments
                      //    For generating the plan the class defined in the className parameter is used.
                      //    private PackagedProgram(
                      //            @Nullable File jarFile, //jarFile The jar file which contains the plan.
                      //            List classpaths, //classpaths Additional classpath URLs needed by the Program.
                      //            @Nullable String entryPointClassName, //entryPointClassName Name of the class which generates the plan. Overrides the class defined in the jar file manifest.
                      //            Configuration configuration, //configuration Flink configuration which affects the classloading policy of the Program execution.
                      //            SavepointRestoreSettings savepointRestoreSettings,
                      //            String... args) //args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See getDescription().
                          PackagedProgram program = PackagedProgram.newBuilder()
                                      .setJarFile(jarFile)
                                      .setUserClassPaths(classpaths)
                                      .setEntryPointClassName(entryPointClass)
                                      .setConfiguration(configuration)
                                      .setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(configuration))
                                      .setArguments(args)
                                      .build();
                          // 2、构造客户端执行环境
                          // public static void executeProgram(
                          //     PipelineExecutorServiceLoader executorServiceLoader,
                          //     Configuration configuration,
                          //     PackagedProgram program,
                          //     boolean enforceSingleJobExecution,
                          //     boolean suppressSysout)
                          // ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
                          ClusterClient clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
                          ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, program, false, false);
                      }
                      public static final class TestExecute {
                  		public static void main(String[] args) throws Exception {
                  			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                  			for (int i = 0; i < 2; i++) {
                  				env.fromElements(1, 2).output(new DiscardingOutputFormat<>());
                  				JobClient jc = env.executeAsync();
                  				jc.getJobExecutionResult();
                  			}
                  		}
                  	}
                      private static final class TestExecutorServiceLoader implements PipelineExecutorServiceLoader {
                  		private final ClusterClient clusterClient;
                  		private final Plan plan;
                  		TestExecutorServiceLoader(final ClusterClient clusterClient, final Plan plan) {
                  			this.clusterClient = checkNotNull(clusterClient);
                  			this.plan = checkNotNull(plan);
                  		}
                  		@Override
                  		public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
                  			return new PipelineExecutorFactory() {
                  				@Override
                  				public String getName() {
                  					return "my-name";
                  				}
                  				@Override
                  				public boolean isCompatibleWith(@Nonnull Configuration configuration) {
                  					return TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
                  				}
                  				@Override
                  				public PipelineExecutor getExecutor(@Nonnull Configuration configuration) {
                  					return (pipeline, config, classLoader) -> {
                  						final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
                  						final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(classLoader, plan, config, parallelism);
                  						final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(config);
                  						jobGraph.addJars(accessor.getJars());
                  						jobGraph.setClasspaths(accessor.getClasspaths());
                  						final JobID jobID = clusterClient.submitJob(jobGraph).get();
                  						return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID, classLoader));
                  					};
                  				}
                  			};
                  		}
                  		@Override
                  		public Stream getExecutorNames() {
                  			throw new UnsupportedOperationException("not implemented");
                  		}
                  	}
                  }
                  

                  4)、系统层次

                  可以通过设置 ./conf/flink-conf.yaml 文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。

                  更多的信息参考下文链接:

                  11、Flink配置flink-conf.yaml详细说明(HA配置、checkpoint、web、安全、zookeeper、historyserver、workers、zoo.cfg)

                  2、设置最大并行度

                  最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,你可以通过调用 setMaxParallelism() 方法来设定最大并行度。

                  默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768。

                  为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。

                  从之前的作业恢复时,改变该作业的最大并发度将会导致状态不兼容。

                  以上,本文介绍了Flink的管理执行的三个内容,即执行配置、打包和分布式运行以及并行执行(设置并行度的几种方式)。

网友评论

搜索
最新文章
热门文章
热门标签