
Several ways to deploy Flink on k8s


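Before you start, deploy a private Docker registry; see the tutorial "Setting up a Docker image registry".

Note: on every node, daemon.json must contain "insecure-registries": ["http://<host IP>:8080"], and Docker must be restarted afterwards.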
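As a rough sketch of the note above, the following script renders the daemon.json content for one registry. The default address 192.168.20.62:2333 is an assumption taken from the image tags used later in this article, and the entry is written in host:port form; adjust both to match your own registry.

```shell
#!/usr/bin/env bash
# Sketch: render the Docker daemon configuration for an insecure (HTTP) registry.
# REGISTRY is an assumption: replace it with your own registry host and port.
REGISTRY="${REGISTRY:-192.168.20.62:2333}"

render_daemon_json() {
  cat <<EOF
{
  "insecure-registries": ["$1"]
}
EOF
}

# Print the content only. On a real node, merge it into /etc/docker/daemon.json
# (keeping any existing keys) and then run: sudo systemctl restart docker
render_daemon_json "$REGISTRY"
```

The script only prints the JSON, so it is safe to run anywhere; writing the file and restarting Docker are left as manual steps on each node.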

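1. Session mode

In session mode, a shared Flink cluster (one JobManager and multiple TaskManagers) is started on Kubernetes, and multiple Flink jobs are then submitted to it. The cluster keeps running until the user stops it manually. This mode suits workloads where many jobs are started and stopped frequently and high cluster resource utilization matters.

A Flink session cluster on Kubernetes consists of at least three components:

  • A Deployment that runs the JobManager

  • A Deployment for a pool of TaskManagers

  • A Service that exposes the JobManager's REST and UI ports

    1.1 Native Kubernetes mode

    Flink's Native Kubernetes mode integrates Apache Flink directly with Kubernetes so that Flink jobs and applications run on the cluster. Its main advantage is that Flink can rely on Kubernetes for resource orchestration and management, which simplifies deploying and managing Flink clusters.

    In Native Kubernetes mode, the Flink client (for example bin/kubernetes-session.sh or bin/flink run-application) talks to the Kubernetes API directly to create the cluster, and Flink itself requests and releases TaskManager Pods as needed. Every Flink component is managed as a Kubernetes resource (Pods, Services, and so on), so the result can be inspected and deleted with kubectl.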

    1.1.1 Build the image (Dockerfile)

    1. Create the Dockerfile
    FROM flink:1.16.2
    RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
    ENV LANG=zh_CN.UTF-8
    2. Build the image
    docker build -t 192.168.20.62:2333/bigdata/flink-session:1.16.2 .
    3. Push the image
    docker push 192.168.20.62:2333/bigdata/flink-session:1.16.2

    1.1.2 Create the namespace and service account

    # Create the namespace
    kubectl create ns flink
    # Create the service account
    kubectl create serviceaccount flink-service-account -n flink
    # Grant permissions to the service account
    kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
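To confirm that the binding took effect, the permissions of the service account can be checked with kubectl auth can-i. The sketch below is only an illustration: the list of resources is an assumption about what Flink needs, and the run helper is a hypothetical wrapper that prints the commands unless DRY_RUN=0 is set.

```shell
#!/usr/bin/env bash
NAMESPACE="flink"
SERVICE_ACCOUNT="flink-service-account"

# Kubernetes identifies a service account as system:serviceaccount:<namespace>:<name>.
sa_subject() {
  printf 'system:serviceaccount:%s:%s' "$1" "$2"
}

# Hypothetical helper: print the command by default, execute it when DRY_RUN=0.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then echo "$*"; else "$@"; fi
}

# Ask the API server whether the service account may create each resource type.
for resource in pods services configmaps deployments; do
  run kubectl auth can-i create "$resource" \
    --as="$(sa_subject "$NAMESPACE" "$SERVICE_ACCOUNT")" -n "$NAMESPACE"
done
```

When executed against a cluster with DRY_RUN=0, each check answers yes or no.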

    1.1.3 Create the Flink cluster

    ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster  \
    -Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-session:1.16.2 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.rest-service.exposed.type=NodePort
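Because the REST service is exposed as a NodePort, the port assigned by Kubernetes has to be looked up before the web UI can be opened. A minimal sketch, assuming the native integration names the service <cluster-id>-rest; the run helper is hypothetical and prints the command unless DRY_RUN=0 is set.

```shell
#!/usr/bin/env bash
CLUSTER_ID="my-first-flink-cluster"
NAMESPACE="flink"

# Assumption: the externally exposed REST service is called <cluster-id>-rest.
rest_service_name() {
  printf '%s-rest' "$1"
}

# Hypothetical helper: print the command by default, execute it when DRY_RUN=0.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then echo "$*"; else "$@"; fi
}

# Read the node port that Kubernetes assigned to the REST/UI port.
run kubectl get svc "$(rest_service_name "$CLUSTER_ID")" -n "$NAMESPACE" \
  -o jsonpath='{.spec.ports[0].nodePort}'
```

The web UI is then reachable at http://<any node IP>:<node port>.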

    1.1.4 Submit a job

    # Flink options (-D...) must come before the JAR path; anything after the JAR is passed to the job as program arguments.
    # kubernetes.taskmanager.cpu takes a number of cores, so 2.0 is used instead of 2000m.
    ./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.taskmanager.cpu=2.0 \
    -Dexternal-resource.limits.kubernetes.cpu=4000m \
    -Dexternal-resource.limits.kubernetes.memory=10Gi \
    -Dexternal-resource.requests.kubernetes.cpu=2000m \
    -Dexternal-resource.requests.kubernetes.memory=8Gi \
    ./examples/streaming/TopSpeedWindowing.jar

    1.1.5 Delete the Flink cluster

    kubectl delete deployment/my-first-flink-cluster -n flink
    kubectl delete ns flink --force

    1.2 Standalone mode

    Standalone mode means running a self-contained Flink cluster on Kubernetes without any Kubernetes-specific integration in Flink. You set up the cluster (JobManager and TaskManagers) by hand instead of relying on a Kubernetes Operator or on Flink's native resource scheduling. In other words, the JobManager and TaskManagers run inside Kubernetes Pods, but Flink does not create or release those Pods itself; the cluster is deployed the same traditional way as in any other environment, using manifests that you write and apply.

    1.2.1 Create the docker-entrypoint.sh script

    #!/usr/bin/env bash
    ###############################################################################
    #  Licensed to the Apache Software Foundation (ASF) under one
    #  or more contributor license agreements.  See the NOTICE file
    #  distributed with this work for additional information
    #  regarding copyright ownership.  The ASF licenses this file
    #  to you under the Apache License, Version 2.0 (the
    #  "License"); you may not use this file except in compliance
    #  with the License.  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    # limitations under the License.
    ###############################################################################
    COMMAND_STANDALONE="standalone-job"
    COMMAND_HISTORY_SERVER="history-server"
    # If unspecified, the hostname of the container is taken as the JobManager address
    JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
    CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
    drop_privs_cmd() {
        if [ $(id -u) != 0 ]; then
            # Don't need to drop privs if EUID != 0
            return
        elif [ -x /sbin/su-exec ]; then
            # Alpine
            echo su-exec admin
        else
            # Others
            echo gosu admin
        fi
    }
    copy_plugins_if_required() {
      if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
      fi
      echo "Enabling required built-in plugins"
      for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}
        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
          echo "Plugin ${target_plugin} does not exist. Exiting."
          exit 1
        else
          ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
          echo "Successfully enabled ${target_plugin}"
        fi
      done
    }
    set_config_option() {
      local option=$1
      local value=$2
      # escape periods for usage in regular expressions
      local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
      # either override an existing entry, or append a new one
      if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
            sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
      else
            echo "${option}: ${value}" >> "${CONF_FILE}"
      fi
    }
    prepare_configuration() {
        set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
        set_config_option blob.server.port 6124
        set_config_option query.server.port 6125
        if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
            set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
        fi
        if [ -n "${FLINK_PROPERTIES}" ]; then
            echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
        fi
        envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
    }
    maybe_enable_jemalloc() {
        if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
            JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
            JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
            if [ -f "$JEMALLOC_PATH" ]; then
                export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
            elif [ -f "$JEMALLOC_FALLBACK" ]; then
                export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
            else
                if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                    MSG_PATH=$JEMALLOC_PATH
                else
                    MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
                fi
                echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
            fi
        fi
    }
    maybe_enable_jemalloc
    copy_plugins_if_required
    prepare_configuration
    args=("$@")
    if [ "$1" = "help" ]; then
        printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
        printf "    Or $(basename "$0") help\n\n"
        printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
        exit 0
    elif [ "$1" = "jobmanager" ]; then
        args=("${args[@]:1}")
        echo "Starting Job Manager"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
    elif [ "$1" = ${COMMAND_STANDALONE} ]; then
        args=("${args[@]:1}")
        echo "Starting Job Manager"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
    elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
        args=("${args[@]:1}")
        echo "Starting History Server"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
    elif [ "$1" = "taskmanager" ]; then
        args=("${args[@]:1}")
        echo "Starting Task Manager"
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
    fi
    args=("${args[@]}")
    # Running command in pass-through mode
    exec $(drop_privs_cmd) "${args[@]}"
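The set_config_option function is the part of the script that rewrites flink-conf.yaml: it overrides a key that already exists and appends one that does not. The sketch below reproduces that logic against a throwaway file so the behaviour can be observed without a Flink installation. The temporary file and its two sample entries are assumptions made for the demonstration, and sed -i is used in its GNU form, as in the script.

```shell
#!/usr/bin/env bash
# Work on a throwaway file so no real Flink configuration is touched.
CONF_FILE="$(mktemp)"
printf 'jobmanager.rpc.address: localhost\ntaskmanager.numberOfTaskSlots: 1\n' > "$CONF_FILE"

# Same logic as set_config_option in docker-entrypoint.sh.
set_config_option() {
  local option=$1
  local value=$2
  local escaped_option
  # Escape periods so the key is matched literally in the regular expression.
  escaped_option=$(echo "${option}" | sed -e 's/\./\\./g')
  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
    sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
  else
    echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

set_config_option jobmanager.rpc.address flink-jobmanager  # key exists: overridden
set_config_option blob.server.port 6124                    # key missing: appended
cat "${CONF_FILE}"
```

After the two calls the file holds three entries: the overridden address, the untouched slot count, and the appended blob server port.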
    

    1.2.2 Write the Dockerfile

    FROM centos:7.9.2009
    USER root
    # Install common tools
    RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof
    # Set the time zone (the default is UTC)
    RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
    RUN mkdir -p /opt/apache
    ADD jdk-8u231-linux-x64.tar.gz /opt/apache/
    ADD flink-1.16.2-bin-scala_2.12.tgz  /opt/apache/
    ENV FLINK_HOME /opt/apache/flink-1.16.2
    ENV JAVA_HOME /opt/apache/jdk1.8.0_231
    ENV PATH $JAVA_HOME/bin:$PATH
    # Create the directory for user application JARs
    RUN mkdir $FLINK_HOME/usrlib/
    #RUN mkdir home
    COPY docker-entrypoint.sh /opt/apache/
    RUN chmod +x /opt/apache/docker-entrypoint.sh
    RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
    RUN chown -R admin:admin /opt/apache
    # Set the working directory
    WORKDIR $FLINK_HOME
    # Expose ports
    EXPOSE 6123 8081
    # Entrypoint script: not executed at build time, only when a container starts
    ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
    CMD ["help"]

    1.2.3 Build the image

    docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2 . --no-cache
    # Push the image
    docker push 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2

    1.2.4 Create the namespace and service account

    # Create the namespace
    kubectl create ns flink
    # Create the service account
    kubectl create serviceaccount flink-service-account -n flink
    # Grant permissions to the service account
    kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

    1.2.5 Write the YAML manifests

    1.2.5.1  flink-configuration-configmap.yaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        queryable-state.proxy.ports: 6125
        jobmanager.memory.process.size: 3200m
        taskmanager.memory.process.size: 2728m
        taskmanager.memory.flink.size: 2280m
        parallelism.default: 2
      log4j-console.properties: |+
        # This affects logging for both user code and Flink
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        rootLogger.appenderRef.rolling.ref = RollingFileAppender
        # Uncomment this if you want to _only_ change Flink's logging
        #logger.flink.name = org.apache.flink
        #logger.flink.level = INFO
        # The following lines keep the log level of common libraries/connectors on
        # log level INFO. The root logger does not override this. You have to manually
        # change the log levels here.
        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name= org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO
        # Log all infos to the console
        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        # Log all infos in the given rolling file
        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size=100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10
        # Suppress the irrelevant (wrong) warnings from the Netty channel handler
        logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF
    1.2.5.2 jobmanager-service.yaml
    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob-server
        port: 6124
      - name: webui
        port: 8081
      selector:
        app: flink
        component: jobmanager
    1.2.5.3 jobmanager-rest-service.yaml

    Expose the JobManager REST port as a port on the Kubernetes nodes (NodePort).

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-rest
    spec:
      type: NodePort
      ports:
      - name: rest
        port: 8081
        targetPort: 8081
        nodePort: 30081
      selector:
        app: flink
        component: jobmanager
    1.2.5.4 taskmanager-query-state-service.yaml

    Expose the TaskManager queryable-state port as a port on the Kubernetes nodes (NodePort).

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-taskmanager-query-state
    spec:
      type: NodePort
      ports:
      - name: query-state
        port: 6125
        targetPort: 6125
        nodePort: 30025
      selector:
        app: flink
        component: taskmanager
    1.2.5.5  jobmanager-session-deployment-non-ha.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
            args: ["jobmanager"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/apache/flink-1.16.2/conf/
            securityContext:
              runAsUser: 9999  # UID of the admin user created in the Dockerfile above, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
    1.2.5.6  taskmanager-session-deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/apache/flink-1.16.2/conf/
            securityContext:
              runAsUser: 9999  # UID of the admin user created in the Dockerfile above, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
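The ConfigMap and the two Deployments together determine how much work the session cluster can accept. A small worked calculation, using only the values from the manifests above; the jobs-at-default-parallelism figure assumes Flink's default slot sharing, where a job needs as many slots as its highest operator parallelism.

```shell
#!/usr/bin/env bash
TASKMANAGER_REPLICAS=2     # replicas in taskmanager-session-deployment.yaml
SLOTS_PER_TASKMANAGER=2    # taskmanager.numberOfTaskSlots
DEFAULT_PARALLELISM=2      # parallelism.default
JM_PROCESS_MB=3200         # jobmanager.memory.process.size
TM_PROCESS_MB=2728         # taskmanager.memory.process.size

# Slots available across all TaskManager Pods.
total_slots=$((TASKMANAGER_REPLICAS * SLOTS_PER_TASKMANAGER))
# Jobs that fit at the default parallelism (assumes default slot sharing).
jobs_at_default=$((total_slots / DEFAULT_PARALLELISM))
# Memory that the Flink processes will try to use in total.
total_memory_mb=$((JM_PROCESS_MB + TASKMANAGER_REPLICAS * TM_PROCESS_MB))

echo "total task slots: ${total_slots}"                  # 4
echo "jobs at default parallelism: ${jobs_at_default}"   # 2
echo "Flink process memory: ${total_memory_mb}m"         # 8656m
```

Since the manifests set no resource requests or limits, the nodes need at least this much free memory for the Pods to run reliably.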

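    1.2.6 Create the Flink cluster

    # The flink namespace already exists if you followed 1.2.4; uncomment the next line only if it does not
    # kubectl create ns flink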
    # Configuration and service definition
    kubectl create -f flink-configuration-configmap.yaml -n flink
    # service
    kubectl create -f jobmanager-service.yaml -n flink
    kubectl create -f jobmanager-rest-service.yaml -n flink
    kubectl create -f taskmanager-query-state-service.yaml -n flink
    # Create the deployments for the cluster
    kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
    kubectl create -f taskmanager-session-deployment.yaml -n flink
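The same sequence can be scripted so that the manifests are always applied in order and the script waits until both Deployments are ready. This is a sketch rather than the procedure used above: it swaps kubectl create for kubectl apply so that re-running it does not fail on existing resources, the 120 second timeout is an arbitrary choice, and the run helper is hypothetical (it prints the commands unless DRY_RUN=0 is set).

```shell
#!/usr/bin/env bash
NAMESPACE="flink"
# ConfigMap and Services first, then the Deployments that depend on them.
MANIFESTS=(
  flink-configuration-configmap.yaml
  jobmanager-service.yaml
  jobmanager-rest-service.yaml
  taskmanager-query-state-service.yaml
  jobmanager-session-deployment-non-ha.yaml
  taskmanager-session-deployment.yaml
)

# Hypothetical helper: print the command by default, execute it when DRY_RUN=0.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then echo "$*"; else "$@"; fi
}

create_cluster() {
  local manifest
  for manifest in "${MANIFESTS[@]}"; do
    run kubectl apply -f "$manifest" -n "$NAMESPACE"
  done
  # Block until the Pods of both Deployments report ready.
  run kubectl rollout status deployment/flink-jobmanager -n "$NAMESPACE" --timeout=120s
  run kubectl rollout status deployment/flink-taskmanager -n "$NAMESPACE" --timeout=120s
}

create_cluster
```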

    1.2.7 Submit a job

    ./bin/flink run -m 192.168.20.62:30081 ./examples/streaming/TopSpeedWindowing.jar

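    1.2.8 Delete the cluster

    kubectl delete -f jobmanager-service.yaml -n flink
    kubectl delete -f jobmanager-rest-service.yaml -n flink
    kubectl delete -f taskmanager-query-state-service.yaml -n flink
    kubectl delete -f flink-configuration-configmap.yaml -n flink
    kubectl delete -f taskmanager-session-deployment.yaml -n flink
    kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
    kubectl delete ns flink --force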

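    2. Application mode (recommended)

    A basic Flink application cluster on Kubernetes consists of three components:

    • An application that runs the JobManager

    • A Deployment for a pool of TaskManagers

    • A Service that exposes the JobManager's REST and UI ports

      2.1 Native Kubernetes mode (commonly used)

      2.1.1 Build the image (Dockerfile)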

      FROM flink:1.16.2
      RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
      ENV LANG=zh_CN.UTF-8
      RUN mkdir -p $FLINK_HOME/usrlib
      COPY   ./flink-1.16.2/examples/streaming/TopSpeedWindowing.jar /opt/flink/usrlib/
      Build and push the image
      docker build -t 192.168.20.62:2333/bigdata/flink-application:1.16.2 . --no-cache
      docker push  192.168.20.62:2333/bigdata/flink-application:1.16.2

      2.1.2 Create the namespace and service account

      # Create the namespace
      kubectl create ns flink
      # Create the service account
      kubectl create serviceaccount flink-service-account -n flink
      # Grant permissions to the service account
      kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

      2.1.3 Create the Flink cluster and submit the job

      ./bin/flink run-application \
       --target kubernetes-application \
       -Dkubernetes.cluster-id=my-first-application-cluster  \
       -Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-application:1.16.2 \
       -Dkubernetes.jobmanager.replicas=1 \
       -Dkubernetes.namespace=flink \
       -Dkubernetes.jobmanager.service-account=flink-service-account \
       -Dexternal-resource.limits.kubernetes.cpu=2000m \
       -Dexternal-resource.limits.kubernetes.memory=2Gi \
       -Dexternal-resource.requests.kubernetes.cpu=1000m \
       -Dexternal-resource.requests.kubernetes.memory=1Gi \
       -Dkubernetes.rest-service.exposed.type=NodePort \
       local:///opt/flink/usrlib/TopSpeedWindowing.jar

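      local is the only scheme supported in application mode. It refers to the local environment of the Pod (the container), not the host machine, so the JAR must already be present inside the image at that path.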
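      A submission script can guard against passing a host path or another scheme by mistake. The helpers below are hypothetical and only illustrate the rule stated above: accept a JAR URI only if it starts with local://, and derive the path that must exist inside the image.

```shell
#!/usr/bin/env bash
# Returns 0 when the JAR URI uses the local:// scheme, 1 otherwise.
is_local_uri() {
  case "$1" in
    local://*) return 0 ;;
    *) return 1 ;;
  esac
}

# Strip the scheme to get the path as seen inside the container.
container_path() {
  printf '%s' "${1#local://}"
}

JAR_URI="local:///opt/flink/usrlib/TopSpeedWindowing.jar"
if is_local_uri "$JAR_URI"; then
  echo "path inside the image: $(container_path "$JAR_URI")"
else
  echo "unsupported scheme: $JAR_URI" >&2
fi
```

      The printed path, /opt/flink/usrlib/TopSpeedWindowing.jar, matches the destination of the COPY instruction in the Dockerfile from 2.1.1.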

      2.1.4 Delete the Flink cluster

      kubectl delete deployment/my-first-application-cluster -n flink
      kubectl delete ns flink --force

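      2.2 Standalone mode

      Before this, set up a shared directory over NFS (see the tutorial "NFS shared storage service"); the manifests below use /mnt/bigdata/flink/usrlib as the shared directory.

      2.2.1 Create the startup script docker-entrypoint.sh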

      #!/usr/bin/env bash
      ###############################################################################
      #  Licensed to the Apache Software Foundation (ASF) under one
      #  or more contributor license agreements.  See the NOTICE file
      #  distributed with this work for additional information
      #  regarding copyright ownership.  The ASF licenses this file
      #  to you under the Apache License, Version 2.0 (the
      #  "License"); you may not use this file except in compliance
      #  with the License.  You may obtain a copy of the License at
      #
      #      http://www.apache.org/licenses/LICENSE-2.0
      #
      #  Unless required by applicable law or agreed to in writing, software
      #  distributed under the License is distributed on an "AS IS" BASIS,
      #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      #  See the License for the specific language governing permissions and
      # limitations under the License.
      ###############################################################################
      COMMAND_STANDALONE="standalone-job"
      COMMAND_HISTORY_SERVER="history-server"
      # If unspecified, the hostname of the container is taken as the JobManager address
      JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
      CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
      drop_privs_cmd() {
          if [ $(id -u) != 0 ]; then
              # Don't need to drop privs if EUID != 0
              return
          elif [ -x /sbin/su-exec ]; then
              # Alpine
              echo su-exec admin
          else
              # Others
              echo gosu admin
          fi
      }
      copy_plugins_if_required() {
        if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
          return 0
        fi
        echo "Enabling required built-in plugins"
        for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
          echo "Linking ${target_plugin} to plugin directory"
          plugin_name=${target_plugin%.jar}
          mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
          if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
          else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
          fi
        done
      }
      set_config_option() {
        local option=$1
        local value=$2
        # escape periods for usage in regular expressions
        local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
        # either override an existing entry, or append a new one
        if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
              sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
        else
              echo "${option}: ${value}" >> "${CONF_FILE}"
        fi
      }
      prepare_configuration() {
          set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
          set_config_option blob.server.port 6124
          set_config_option query.server.port 6125
          if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
              set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
          fi
          if [ -n "${FLINK_PROPERTIES}" ]; then
              echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
          fi
          envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
      }
      maybe_enable_jemalloc() {
          if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
              JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
              JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
              if [ -f "$JEMALLOC_PATH" ]; then
                  export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
              elif [ -f "$JEMALLOC_FALLBACK" ]; then
                  export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
              else
                  if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                      MSG_PATH=$JEMALLOC_PATH
                  else
                      MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
                  fi
                  echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
              fi
          fi
      }
      maybe_enable_jemalloc
      copy_plugins_if_required
      prepare_configuration
      args=("$@")
      if [ "$1" = "help" ]; then
          printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
          printf "    Or $(basename "$0") help\n\n"
          printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
          exit 0
      elif [ "$1" = "jobmanager" ]; then
          args=("${args[@]:1}")
          echo "Starting Job Manager"
          exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
      elif [ "$1" = ${COMMAND_STANDALONE} ]; then
          args=("${args[@]:1}")
          echo "Starting Job Manager"
          exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
      elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
          args=("${args[@]:1}")
          echo "Starting History Server"
          exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
      elif [ "$1" = "taskmanager" ]; then
          args=("${args[@]:1}")
          echo "Starting Task Manager"
          exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
      fi
      args=("${args[@]}")
      # Running command in pass-through mode
      exec $(drop_privs_cmd) "${args[@]}"

      2.2.2 Write the Dockerfile

      FROM centos:7.9.2009
      USER root
      # Install common tools
      RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof
      # Set the time zone (the default is UTC)
      RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
      RUN mkdir -p /opt/apache
      ADD jdk-8u231-linux-x64.tar.gz /opt/apache/
      ADD flink-1.16.2-bin-scala_2.12.tgz  /opt/apache/
      ENV FLINK_HOME /opt/apache/flink-1.16.2
      ENV JAVA_HOME /opt/apache/jdk1.8.0_231
      ENV PATH $JAVA_HOME/bin:$PATH
      # Create the directory for user application JARs
      RUN mkdir $FLINK_HOME/usrlib/
      #RUN mkdir home
      COPY docker-entrypoint.sh /opt/apache/
      # COPY /   (incomplete in the original: source and destination are missing, so the instruction is commented out)
      RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
      RUN chown -R admin:admin /opt/apache
      RUN chmod +x /opt/apache/docker-entrypoint.sh
      # Set the working directory
      WORKDIR $FLINK_HOME
      # Expose ports
      EXPOSE 6123 8081
      # Entrypoint script: not executed at build time, only when a container starts
      ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
      CMD ["help"]

      Build and push the image:
      docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2 . --no-cache
      docker push 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
      

      2.2.3 Create the namespace and service account

      # Create the namespace
      kubectl create ns flink
      # Create the service account
      kubectl create serviceaccount flink-service-account -n flink
      # Grant permissions to the service account
      kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

      2.2.4 flink-configuration-configmap.yaml

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: flink-config
        labels:
          app: flink
      data:
        flink-conf.yaml: |+
          jobmanager.rpc.address: flink-jobmanager
          taskmanager.numberOfTaskSlots: 2
          blob.server.port: 6124
          jobmanager.rpc.port: 6123
          taskmanager.rpc.port: 6122
          queryable-state.proxy.ports: 6125
          jobmanager.memory.process.size: 3200m
          taskmanager.memory.process.size: 2728m
          taskmanager.memory.flink.size: 2280m
          parallelism.default: 2
        log4j-console.properties: |+
          # This affects logging for both user code and Flink
          rootLogger.level = INFO
          rootLogger.appenderRef.console.ref = ConsoleAppender
          rootLogger.appenderRef.rolling.ref = RollingFileAppender
          # Uncomment this if you want to _only_ change Flink's logging
          #logger.flink.name = org.apache.flink
          #logger.flink.level = INFO
          # The following lines keep the log level of common libraries/connectors on
          # log level INFO. The root logger does not override this. You have to manually
          # change the log levels here.
          logger.akka.name = akka
          logger.akka.level = INFO
          logger.kafka.name= org.apache.kafka
          logger.kafka.level = INFO
          logger.hadoop.name = org.apache.hadoop
          logger.hadoop.level = INFO
          logger.zookeeper.name = org.apache.zookeeper
          logger.zookeeper.level = INFO
          # Log all infos to the console
          appender.console.name = ConsoleAppender
          appender.console.type = CONSOLE
          appender.console.layout.type = PatternLayout
          appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
          # Log all infos in the given rolling file
          appender.rolling.name = RollingFileAppender
          appender.rolling.type = RollingFile
          appender.rolling.append = false
          appender.rolling.fileName = ${sys:log.file}
          appender.rolling.filePattern = ${sys:log.file}.%i
          appender.rolling.layout.type = PatternLayout
          appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
          appender.rolling.policies.type = Policies
          appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
          appender.rolling.policies.size.size=100MB
          appender.rolling.strategy.type = DefaultRolloverStrategy
          appender.rolling.strategy.max = 10
          # Suppress the irrelevant (wrong) warnings from the Netty channel handler
          logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
          logger.netty.level = OFF

      2.2.5 jobmanager-service.yaml

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-jobmanager
      spec:
        type: ClusterIP
        ports:
        - name: rpc
          port: 6123
        - name: blob-server
          port: 6124
        - name: webui
          port: 8081
        selector:
          app: flink
          component: jobmanager

      2.2.6  jobmanager-rest-service.yaml

      Expose the JobManager REST port as a port on the Kubernetes nodes (NodePort).

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-jobmanager-rest
      spec:
        type: NodePort
        ports:
        - name: rest
          port: 8081
          targetPort: 8081
          nodePort: 30081
        selector:
          app: flink
          component: jobmanager

      2.2.7 taskmanager-query-state-service.yaml

      Expose the TaskManager queryable-state port as a port on the Kubernetes nodes (NodePort).

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-taskmanager-query-state
      spec:
        type: NodePort
        ports:
        - name: query-state
          port: 6125
          targetPort: 6125
          nodePort: 30025
        selector:
          app: flink
          component: taskmanager

      2.2.8 jobmanager-application-non-ha.yaml

      apiVersion: batch/v1
      kind: Job
      metadata:
        name: flink-jobmanager
      spec:
        template:
          metadata:
            labels:
              app: flink
              component: jobmanager
          spec:
            restartPolicy: OnFailure
            containers:
              - name: jobmanager
                image: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
                env:
                args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing","--output","/tmp/result"]
                ports:
                  - containerPort: 6123
                    name: rpc
                  - containerPort: 6124
                    name: blob-server
                  - containerPort: 8081
                    name: webui
                livenessProbe:
                  tcpSocket:
                    port: 6123
                  initialDelaySeconds: 30
                  periodSeconds: 60
                volumeMounts:
                  - name: flink-config-volume
                    mountPath: /opt/apache/flink-1.16.2/conf
                  - name: job-artifacts-volume
                    mountPath: /opt/apache/flink-1.16.2/usrlib
                securityContext:
                  runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
            volumes:
              - name: flink-config-volume
                configMap:
                  name: flink-config
                  items:
                    - key: flink-conf.yaml
                      path: flink-conf.yaml
                    - key: log4j-console.properties
                      path: log4j-console.properties
              - name: job-artifacts-volume
                hostPath:
                  path: /mnt/bigdata/flink/usrlib
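Because `job-artifacts-volume` is a `hostPath` volume, the job JAR has to exist under `/mnt/bigdata/flink/usrlib` on every node that may run the JobManager or a TaskManager. A minimal sketch of staging it, to be run on each node; the helper name `stage_job_jar` is hypothetical, and the file must remain readable by the user the container runs as (9999 in the manifest above).

```shell
#!/usr/bin/env bash
# Copy a job JAR into the hostPath directory that is mounted as usrlib.
# $1 = path to the JAR, $2 = target directory (defaults to the hostPath above).
stage_job_jar() {
  local jar="$1"
  local dest="${2:-/mnt/bigdata/flink/usrlib}"
  # Refuse to continue if the source JAR does not exist.
  [ -f "$jar" ] || { echo "JAR not found: $jar" >&2; return 1; }
  mkdir -p "$dest" && cp "$jar" "$dest/"
}
```

For the example class used here, something like `stage_job_jar "$FLINK_HOME/examples/streaming/TopSpeedWindowing.jar"` would do, assuming `FLINK_HOME` points at an unpacked Flink distribution.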

      2.2.9 taskmanager-job-deployment.yaml

      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: flink-taskmanager
      spec:
        replicas: 2
        selector:
          matchLabels:
            app: flink
            component: taskmanager
        template:
          metadata:
            labels:
              app: flink
              component: taskmanager
          spec:
            containers:
            - name: taskmanager
              image: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
              env:
              args: ["taskmanager"]
              ports:
              - containerPort: 6122
                name: rpc
              - containerPort: 6125
                name: query-state
              livenessProbe:
                tcpSocket:
                  port: 6122
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:
              - name: flink-config-volume
                mountPath: /opt/apache/flink-1.16.2/conf
              - name: job-artifacts-volume
                mountPath: /opt/apache/flink-1.16.2/usrlib
              securityContext:
                runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
            volumes:
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                - key: flink-conf.yaml
                  path: flink-conf.yaml
                - key: log4j-console.properties
                  path: log4j-console.properties
            - name: job-artifacts-volume
              hostPath:
                path: /mnt/bigdata/flink/usrlib
      

      2.2.10 Create the Flink cluster and submit the job

      kubectl create ns flink
      # Configuration and service definition
      kubectl create -f flink-configuration-configmap.yaml -n flink
      # service
      kubectl create -f jobmanager-service.yaml -n flink
      kubectl create -f jobmanager-rest-service.yaml -n flink
      kubectl create -f taskmanager-query-state-service.yaml -n flink
      # Create the deployments for the cluster
      kubectl create -f jobmanager-application-non-ha.yaml -n flink
      kubectl create -f taskmanager-job-deployment.yaml -n flink
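Once the resources are created, it helps to confirm that the pods actually came up and that the job started. The sketch below is one way to do that, under stated assumptions: the function name `flink_check` and the `KUBECTL` and `NAMESPACE` variables are hypothetical conveniences (setting `KUBECTL=echo` prints the commands instead of running them), while the resource names come from the manifests above.

```shell
#!/usr/bin/env bash
# Both variables are hypothetical knobs; the defaults match this article.
NAMESPACE="${NAMESPACE:-flink}"
KUBECTL="${KUBECTL:-kubectl}"

flink_check() {
  # Wait until the TaskManager deployment has rolled out (or time out).
  "$KUBECTL" rollout status deployment/flink-taskmanager -n "$NAMESPACE" --timeout=120s
  # List the JobManager and TaskManager pods via their shared label.
  "$KUBECTL" get pods -n "$NAMESPACE" -l app=flink -o wide
  # Show the tail of the JobManager log to see whether the job is running.
  "$KUBECTL" logs job/flink-jobmanager -n "$NAMESPACE" --tail=20
}
```

Calling `flink_check` after the `kubectl create` commands should show one JobManager pod and two TaskManager pods once everything is healthy.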

      2.2.11 Delete the Flink cluster

      kubectl delete -f flink-configuration-configmap.yaml -n flink
      kubectl delete -f jobmanager-service.yaml -n flink
      kubectl delete -f jobmanager-rest-service.yaml -n flink
      kubectl delete -f taskmanager-query-state-service.yaml -n flink
      kubectl delete -f jobmanager-application-non-ha.yaml -n flink
      kubectl delete -f taskmanager-job-deployment.yaml -n flink
      kubectl delete ns flink --force
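Namespace deletion is asynchronous, so the namespace can linger in a terminating state for a while after the command returns. A minimal sketch of polling until it is gone; the helper name `wait_ns_gone` and the `KUBECTL` override are hypothetical.

```shell
#!/usr/bin/env bash
# KUBECTL is a hypothetical override, useful for dry runs.
KUBECTL="${KUBECTL:-kubectl}"

# Poll until the namespace no longer exists.
# $1 = namespace, $2 = max attempts, $3 = seconds between attempts.
wait_ns_gone() {
  local ns="${1:-flink}"
  local tries="${2:-30}"
  local delay="${3:-2}"
  local i=0
  while [ "$i" -lt "$tries" ]; do
    # `kubectl get ns NAME` exits non-zero once the namespace is gone.
    if ! "$KUBECTL" get ns "$ns" >/dev/null 2>&1; then
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  # Still present after all attempts.
  return 1
}
```

`wait_ns_gone flink` returns 0 as soon as the namespace has disappeared and 1 if it is still present after all attempts, which makes it easy to chain before re-creating the cluster.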
