上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

docker 搭建 flink 并上传任务

guduadmin41天前

文章目录

        • 一、docker 搭建 flink
          • 1、选择合适的 flink 版本
          • 2、重新创建 JobManager、TaskManager 容器并挂载配置文件
          • 二、flink 简单示例
            • 1、创建项目架构
            • 2、批处理简单示例
            • 3、流处理简单示例
            • 4、上传 flink 集群
              • ①、UI 界面提交任务
              • ②、命令提交任务
              • 5、web-ui 提交查看撤销任务
              • 三、待解决
                一、docker 搭建 flink
                1、选择合适的 flink 版本

                docker 安装就不介绍了,去 dockerHub 搜索 flink 镜像,选择合适的版本安装 https://hub.docker.com/_/flink/tags

                使用 docker 命令 docker pull flink: 1.16.0-scala_2.12-java8拉去镜像

                docker 搭建 flink 并上传任务,在这里插入图片描述,第1张

                1.16.0-scala_2.12-java8 镜像版本说明,flink 1.16.0,flink 内置 scala 版本 2.12,Java 版本 8

                建议先简单启动 flink 容器 JobManager、TaskManager 两个容器将配置文件复制出来方便挂载

                # 创建 docker 网络,方便 JobManager 和 TaskManager 内部访问
                 docker network create flink-network
                # 创建 JobManager 
                 docker run \
                  -itd \
                  --name=jobmanager \
                  --publish 8081:8081 \
                  --network flink-network \
                  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
                  flink:1.16.0-scala_2.12-java8 jobmanager 
                  
                # 创建 TaskManager 
                 docker run \
                  -itd \
                  --name=taskmanager \
                  --network flink-network \
                  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
                  flink:1.16.0-scala_2.12-java8 taskmanager 
                

                启动成功

                docker 搭建 flink 并上传任务,在这里插入图片描述,第2张

                访问 8081 端口如下

                docker 搭建 flink 并上传任务,在这里插入图片描述,第3张

                copy 配置文件

                # jobmanager 容器
                 docker cp jobmanager:/opt/flink/conf ./JobManager/
                # taskmanager 容器
                docker cp taskmanager:/opt/flink/conf ./TaskManager/
                
                2、重新创建 JobManager、TaskManager 容器并挂载配置文件

                修改 JobManager/conf/flink-conf.yaml web 端口号为 18081

                docker 搭建 flink 并上传任务,在这里插入图片描述,第4张

                修改 TaskManager/conf/flink-conf.yaml 容器任务槽为 5

                docker 搭建 flink 并上传任务,在这里插入图片描述,第5张

                启动容器挂载配置文件

                # 启动 jobmanager   
                docker run -itd  -v /root/docker/flink/JobManager/conf/:/opt/flink/conf/ --name=jobmanager --publish 18081:18081 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" --network flink-network flink:1.16.0-scala_2.12-java8 jobmanager
                # 启动 taskmanager   
                docker run -itd  -v /root/docker/flink/TaskManager/conf/:/opt/flink/conf/ --name=taskmanager --network flink-network --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"  flink:1.16.0-scala_2.12-java8 taskmanager
                

                参数解释

                • FLINK_PROPERTIES=“jobmanager.rpc.address: jobmanager” rpc 地址,必须设置,否则 jobmanager 和 taskmanager 的 rpc 地址都是随机生成,会连接不上,当然你也可以在直接修改配置文件 flink-conf.yaml

                  如下两个容器启动成功,可以看到 web 端口为 18081,taskmanager 启动一个,包含 5 个任务槽

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第6张

                  二、flink 简单示例

                  官网参考地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/configuration/overview/#getting-started

                  1、创建项目架构

                  使用 maven 命令指定原型 Flink Maven Archetype 快速创建一个包含了必要依赖的 Flink 程序骨架,自定义项目 groupId、artifactId、package 等信息

                  mvn archetype:generate ^
                    -DarchetypeGroupId=org.apache.flink ^
                    -DarchetypeArtifactId=flink-quickstart-java ^
                    -DarchetypeVersion=1.16.0	^
                    -DgroupId=com.ye ^
                    -DartifactId=flink-study ^
                    -Dversion=0.1 ^
                    -Dpackage=com.ye ^
                    -DinteractiveMode=false
                  

                  下载成功打开项目目录

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第7张

                  如下:注意运行需要设置启动参数,否则启动会找不到类,因为 pom.xml 文件 flink 相关包都添加了 provided 表示只用于生产环境,另一种方法就是将 provided 修改为runtime

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第8张

                  流处理和批处理在 flink 低版本(貌似1.12)需要区分,目前都使用流处理写法

                  2、批处理简单示例

                  下面代码用来统计单词出现的的次数

                  public class DataBatchJob {
                      /* 下面示例统计单词出现的次数 */
                      public static void main(String[] args) throws Exception {
                          // 获取 flink 环境
                          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                          // 添加数据源
                          DataStreamSource streamSource = env.fromElements("hello world", "hello flink", "flink", "hello", "world");
                          // 对传入的流数据分组
                          SingleOutputStreamOperator> streamOperator = streamSource.flatMap(new FlatMapFunction>() {
                              // value 传入的数据,out
                              // Tuple2 二元组
                              // out 传出的值
                              @Override
                              public void flatMap(String value, Collector> out) throws Exception {
                                  String[] split = value.split(" ");
                                  for (String s : split) {
                                      out.collect(Tuple2.of(s, 1));
                                  }
                              }
                          });
                          // 按二元组的第 0 个位置分组
                          KeyedStream, Tuple> keyBy = streamOperator.keyBy(0);
                          // 按二元组的第 1 个位置求和
                          SingleOutputStreamOperator> sum = keyBy.sum(1);
                          sum.print();
                          env.execute("统计单词出现的次数");
                      }
                  }
                  

                  执行结果如下

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第9张

                  上传 flink 集群

                  3、流处理简单示例

                  下面示例通过 socket 文本源,对输入的大于 500 和小于 500 的分别求和

                  public class DataStreamJob {
                      private static final Logger logger = LoggerFactory.getLogger(DataStreamJob.class);
                      /* 下面示例对大于 500 和小于 500 的分别求和 */
                      public static void main(String[] args) throws Exception {
                          
                          // 获取 flink 环境
                          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                          // 添加 socket 文本流数据源
                          //DataStreamSource streamSource = env.fromElements("200", "100", "6000", "500", "2000", "300", "1500", "900");
                          DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 7777);
                          // 对大于 500 和小于 500 进行分组
                          KeyedStream stringKeyedStream = streamSource.keyBy(new KeySelector() {
                              @Override
                              public String getKey(String s) throws Exception {
                                  int i = Integer.parseInt(s);
                                  return i > 500 ? "ge" : "lt";
                              }
                          });
                          // 开 10 秒滚动窗口,每 10 秒为一批数据 【00:00:00 ~ 00:00:10)、【00:00:10 ~ 00:00:20)左闭右开区间
                          WindowedStream windowedStream = stringKeyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
                          
                          // 窗口处理函数,泛型 String, Integer, String, TimeWindow 依次对应 输入类型、输出类型、 KEY类型(即keyBy 返回的类型), 窗口
                          SingleOutputStreamOperator outputStreamOperator = windowedStream.process(new ProcessWindowFunction() {
                              /*
                              * key: 分组的 key
                              * context: 上下文信息
                              * elements: 传过来的一批数据
                              * out: 数据输出
                              * */
                              @Override
                              public void process(String key, ProcessWindowFunction.Context context, Iterable elements, Collector out) throws Exception {
                                  System.out.println(key);
                                  AtomicInteger sum = new AtomicInteger();
                                  elements.forEach(item -> sum.addAndGet(Integer.parseInt(item)));
                                  out.collect(sum.get());
                              }
                          });
                          // 输出
                          outputStreamOperator.print();
                          env.execute("分组求和");
                      }
                  }
                  

                  在 window 或 Linux 开启 Socket 文本流测试

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第10张

                  4、上传 flink 集群

                  打包项目:可以在 pom.xml 修改启动类,也可以在命令启动或者 ui 界面上传设置启动类参数

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第11张

                  ①、UI 界面提交任务

                  使用 ui 界面上传 jar 到 flink 集群,点击 submit 运行

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第12张

                  ②、命令提交任务
                  # 如果集群( 即JobManager) 在当前服务器可以使用如下命令
                  	$ bin/flink run -Dexecution.runtime-mode=BATCH 
                  # 如果集群( 即JobManager) 不在当前服务器,在 TaskManager 服务器提交作业可以使用如下命令
                  	# -m 指定 JobManager 服务器地址
                  	# -c 指定作业入口程序
                  	# -p 指定并行度
                  	$ bin/flink run -m 192.168.1.1:8081 -c com.ye.StreamWordCount -p 2 
                  # 撤销任务	
                  	$ bin/flink cancle 
                  
                  5、web-ui 提交查看撤销任务

                  批处理运行完成docker 搭建 flink 并上传任务,在这里插入图片描述,第13张

                  流处理正在运行

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第14张

                  三、待解决

                  使用 docker 启动的 flink 集群发现 UI 界面的 stdout 没有 print 输出

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第15张

网友评论

搜索
最新文章
热门文章
热门标签