Before starting, deploy a private Docker registry; see the companion tutorial on building a Docker image registry.
Note: on every node, `daemon.json` must be configured with `"insecure-registries": ["http://<host-IP>:8080"]`, and the Docker daemon must be restarted afterwards.
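For reference, a minimal sketch of that configuration, assuming the registry address used for image pushes later in this tutorial (`192.168.20.62:2333`); adjust the host and port to your own registry, and merge with any existing `daemon.json` settings rather than overwriting them:

```bash
# On every Kubernetes node (sketch; merge with any existing daemon.json
# options instead of clobbering the file). Plain host:port works here;
# some setups include the http:// prefix as in the note above.
cat <<'EOF' > /etc/docker/daemon.json
{
  "insecure-registries": ["192.168.20.62:2333"]
}
EOF

# Restart the Docker daemon so the setting takes effect.
systemctl restart docker
```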
1. Session Mode
In Session mode, a shared Flink cluster (one JobManager plus several TaskManagers) is started on Kubernetes, and multiple Flink jobs are then submitted to that shared cluster. The cluster keeps running until a user stops it manually. This mode suits scenarios where many jobs start and stop frequently and high cluster resource utilization matters.
A Flink Session cluster deployment on Kubernetes consists of at least three components:
- a Deployment running the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
1.1 Native Kubernetes Mode
Flink's native Kubernetes mode integrates Apache Flink seamlessly with Kubernetes, so Flink jobs and applications run directly on the Kubernetes cluster. Its main advantage is that Flink can use Kubernetes' resource orchestration and management capabilities, which simplifies deploying and operating Flink clusters.
In native Kubernetes mode, the Flink client talks directly to the Kubernetes API server: commands such as `kubernetes-session.sh` or `flink run-application` create the cluster, and the JobManager then allocates TaskManager pods on demand. Every Flink component is managed as a Kubernetes resource (Pods, Services, and so on).
1.1.1 Build the Image (Dockerfile)
1. Create the Dockerfile:

```dockerfile
FROM flink:1.16.2
# Set the timezone to Asia/Shanghai (the base image defaults to UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Use ENV rather than RUN export so the locale persists in the final image
ENV LANG zh_CN.UTF-8
```

2. Build the image (note the trailing `.` for the build context):

```bash
docker build -t 192.168.20.62:2333/bigdata/flink-session:1.16.2 .
```

3. Push the image:

```bash
docker push 192.168.20.62:2333/bigdata/flink-session:1.16.2
```
1.1.2 Create the Namespace and ServiceAccount
```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant the service account the edit role
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
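Optionally, a quick sanity check that the account and binding are in place before launching the cluster:

```bash
# Verify the namespace, service account, and role binding exist.
kubectl get ns flink
kubectl get serviceaccount flink-service-account -n flink
kubectl get clusterrolebinding flink-role-binding-flink
```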
1.1.3 Create the Flink Cluster
```bash
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-session:1.16.2 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.rest-service.exposed.type=NodePort
```
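Because the REST endpoint is exposed as a NodePort, the web UI address can be looked up from the Service Flink creates; in native mode the Service is named after the cluster id with a `-rest` suffix:

```bash
# Look up the NodePort assigned to the session cluster's REST service.
kubectl get svc my-first-flink-cluster-rest -n flink
# The web UI is then reachable at http://<any-node-ip>:<nodePort>.
```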
1.1.4 Submit a Job
```bash
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.taskmanager.cpu=2 \
    -Dexternal-resource.limits.kubernetes.cpu=4000m \
    -Dexternal-resource.limits.kubernetes.memory=10Gi \
    -Dexternal-resource.requests.kubernetes.cpu=2000m \
    -Dexternal-resource.requests.kubernetes.memory=8Gi \
    ./examples/streaming/TopSpeedWindowing.jar
```

Note that all `-D` options must come before the jar path (anything after the jar is passed to the program as arguments), the duplicated `kubernetes.taskmanager.cpu` setting has been removed, and `kubernetes.taskmanager.cpu` takes a plain number of cores (e.g. `2`) rather than a Kubernetes CPU quantity.
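Jobs on the session cluster can then be managed with the regular Flink CLI against the same cluster id; for example (`<jobId>` is a placeholder for the id printed at submission time):

```bash
# List the jobs running on the session cluster.
./bin/flink list --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink

# Cancel a job by its id.
./bin/flink cancel --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    <jobId>
```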
1.1.5 Delete the Flink Cluster
```bash
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
```
1.2 Standalone Mode
Standalone mode here means running a self-contained Flink cluster on top of Kubernetes without Kubernetes-native integration. You set up the Flink cluster (JobManager and TaskManagers) manually through plain Kubernetes manifests rather than through an operator or Kubernetes-native resource scheduling. In other words, the Flink components run inside Kubernetes Pods, but their lifecycle is not managed by Flink's Kubernetes support; it works the same way as a traditional Flink deployment in any other environment.
1.2.1 Create the docker-entrypoint.sh Script
```bash
#!/usr/bin/env bash
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
```
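Note that `prepare_configuration` appends the contents of the `FLINK_PROPERTIES` environment variable verbatim to `flink-conf.yaml`, so configuration can be overridden per container without rebuilding the image. A minimal hypothetical smoke test of that hook (the property values are illustrative only):

```bash
# Hypothetical local test: override two settings via FLINK_PROPERTIES.
# The entrypoint appends these lines to flink-conf.yaml before starting.
docker run --rm \
    -e FLINK_PROPERTIES=$'taskmanager.numberOfTaskSlots: 4\nparallelism.default: 4' \
    192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2 jobmanager
```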
1.2.2 Write the Dockerfile
```dockerfile
FROM centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the timezone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache
ADD jdk-8u231-linux-x64.tar.gz /opt/apache/
ADD flink-1.16.2-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.16.2
ENV JAVA_HOME /opt/apache/jdk1.8.0_231
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Expose ports
EXPOSE 6123 8081

# The entrypoint script is executed when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```
1.2.3 Build the Image
```bash
docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2 . --no-cache

# Push the image
docker push 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
```
1.2.4 Create the Namespace and ServiceAccount
```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant the service account the edit role
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
1.2.5 Write the YAML Manifests
1.2.5.1 flink-configuration-configmap.yaml
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```
1.2.5.2 jobmanager-service.yaml
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```
1.2.5.3 jobmanager-rest-service.yaml
Exposes the JobManager's REST port as a NodePort on the Kubernetes nodes.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
```
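With the fixed nodePort of 30081, the JobManager's REST API is reachable from outside the cluster once the deployments below are running; a quick smoke test (node address as used elsewhere in this tutorial):

```bash
# Query the cluster overview via Flink's monitoring REST API.
curl http://192.168.20.62:30081/overview
```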
1.2.5.4 taskmanager-query-state-service.yaml
Exposes the TaskManager's queryable-state port as a NodePort on the Kubernetes nodes.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
```
1.2.5.5 jobmanager-session-deployment-non-ha.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf/
        securityContext:
          runAsUser: 9999  # refers to the admin user (uid 9999) created in the Dockerfile, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
1.2.5.6 taskmanager-session-deployment.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf/
        securityContext:
          runAsUser: 9999  # refers to the admin user (uid 9999) created in the Dockerfile, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
1.2.6 Create the Flink Cluster
```bash
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
```
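Then wait for the pods to come up; with the manifests above you should see one jobmanager pod and two taskmanager pods:

```bash
# Watch the pods until they are Running and Ready.
kubectl get pods -n flink -w
# The web UI is then reachable at http://<any-node-ip>:30081.
```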
1.2.7 Submit a Job
```bash
./bin/flink run -m 192.168.20.62:30081 ./examples/streaming/TopSpeedWindowing.jar
```
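The same REST endpoint also serves the rest of the CLI, so jobs can be listed and cancelled through it (`<jobId>` is a placeholder for the id printed at submission time):

```bash
# List running jobs via the NodePort REST endpoint.
./bin/flink list -m 192.168.20.62:30081

# Cancel a job by its id.
./bin/flink cancel -m 192.168.20.62:30081 <jobId>
```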
1.2.8 Delete the Cluster
```bash
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
```
2. Application Mode (Recommended)
A basic Flink Application cluster deployment on Kubernetes consists of three components:
- an Application that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
2.1 Native Kubernetes Mode (Commonly Used)
2.1.1 Build the Image (Dockerfile)
```dockerfile
FROM flink:1.16.2
# Set the timezone to Asia/Shanghai (the base image defaults to UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Use ENV rather than RUN export so the locale persists in the final image
ENV LANG zh_CN.UTF-8
# Bake the application jar into the image
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./flink-1.16.2/examples/streaming/TopSpeedWindowing.jar /opt/flink/usrlib/
```

Build and push the image:

```bash
docker build -t 192.168.20.62:2333/bigdata/flink-application:1.16.2 . --no-cache
docker push 192.168.20.62:2333/bigdata/flink-application:1.16.2
```
2.1.2 Create the Namespace and ServiceAccount
```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant the service account the edit role
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
2.1.3 Create the Flink Cluster and Submit the Job
```bash
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-application:1.16.2 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
```
`local://` is the only scheme supported in application mode. Here "local" refers to the filesystem inside the pod/container image, not the host machine.
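As in session mode, the REST service is created by Flink itself and named after the cluster id, which is the easiest way to find the web UI:

```bash
# Watch the jobmanager and taskmanager pods start.
kubectl get pods -n flink
# Look up the NodePort of the auto-created REST service (<cluster-id>-rest).
kubectl get svc my-first-application-cluster-rest -n flink
```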
2.1.4 Delete the Flink Cluster
```bash
kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
```
2.2 Standalone Mode
Before proceeding, set up an NFS share; the manifests below assume the shared directory is /mnt/bigdata/flink/usrlib (see the companion tutorial on NFS shared storage).
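A minimal sketch of that setup (`<nfs-server-ip>` is a placeholder and the export options are illustrative):

```bash
# On the NFS server: export the shared directory.
echo "/mnt/bigdata/flink/usrlib *(rw,sync,no_root_squash)" >> /etc/exports
exportfs -ra

# On every Kubernetes node: mount the share at the same path, because the
# manifests below reference it through a hostPath volume.
mkdir -p /mnt/bigdata/flink/usrlib
mount -t nfs <nfs-server-ip>:/mnt/bigdata/flink/usrlib /mnt/bigdata/flink/usrlib
```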
2.2.1 Create the Startup Script docker-entrypoint.sh
The script is identical to the docker-entrypoint.sh created in section 1.2.1; reuse it as-is.
2.2.2 Write the Dockerfile
```dockerfile
FROM centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the timezone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache
ADD jdk-8u231-linux-x64.tar.gz /opt/apache/
ADD flink-1.16.2-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.16.2
ENV JAVA_HOME /opt/apache/jdk1.8.0_231
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars (mounted from the NFS share at runtime)
RUN mkdir $FLINK_HOME/usrlib/

COPY docker-entrypoint.sh /opt/apache/

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache
RUN chmod +x /opt/apache/docker-entrypoint.sh

# Working directory
WORKDIR $FLINK_HOME

# Expose ports
EXPOSE 6123 8081

# The entrypoint script is executed when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```

Build and push the image:

```bash
docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2 . --no-cache
docker push 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
```
2.2.3 Create the Namespace and ServiceAccount
```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant the service account the edit role
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
2.2.4 flink-configuration-configmap.yaml
Identical to the flink-configuration-configmap.yaml from section 1.2.5.1; reuse it as-is.
2.2.5 jobmanager-service.yaml
Identical to the jobmanager-service.yaml from section 1.2.5.2; reuse it as-is.
2.2.6 jobmanager-rest-service.yaml
Exposes the JobManager's REST port as a NodePort on the Kubernetes nodes.
Identical to the jobmanager-rest-service.yaml from section 1.2.5.3; reuse it as-is.
2.2.7 taskmanager-query-state-service.yaml
Exposes the TaskManager's queryable-state port as a NodePort on the Kubernetes nodes.
Identical to the taskmanager-query-state-service.yaml from section 1.2.5.4; reuse it as-is.
2.2.8 jobmanager-application-non-ha.yaml
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
        args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.16.2/usrlib
        securityContext:
          runAsUser: 9999  # refers to the admin user (uid 9999) created in the Dockerfile, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/bigdata/flink/usrlib
```
2.2.9 taskmanager-job-deployment.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.16.2/usrlib
        securityContext:
          runAsUser: 9999  # refers to the admin user (uid 9999) created in the Dockerfile, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/bigdata/flink/usrlib
```
2.2.10 Create the Flink Cluster and Submit the Job
```bash
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
```
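The JobManager here runs as a Kubernetes Job executing the standalone-job entrypoint, so its logs show whether the application class was found and started:

```bash
# Check that the jobmanager Job and the taskmanager pods are up.
kubectl get pods -n flink
# Follow the JobManager logs to confirm the job started.
kubectl logs -f job/flink-jobmanager -n flink
```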
2.2.11 Delete the Flink Cluster
```bash
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
```