文章目录
- 第一章 DolphinScheduler介绍
- 1.1 关于DolphinScheduler
- 1.2 特性
- 1.3 名词解释
- 1.3.1 名词解释
- 1.3.2 模块介绍
- 第二章 DolphinScheduler系统架构
- 2.1 系统架构图
- 2.2 架构说明
- 该服务包含:
- 2.3 启动流程活动图
- 2.4 架构设计思想
- 2.4.1 去中心化vs中心化
- 2.4.1.1 中心化思想
- 2.4.1.2 去中心化
- 2.4.3 容错设计
- 2.4.3.1 宕机容错
- 2.4.3.2 任务失败重试
- 2.4.4 任务优先级设计
- 2.4.5 Logback和netty实现日志访问
- 第三章 DolphinScheduler安装部署
- 3.1 软硬件环境配置建议
- 3.1.1 Linux 操作系统版本要求
- 3.1.2 服务器建议配置
- 3.1.3 网络要求
- 3.1.4 客户端 Web 浏览器要求
- 3.2 安装部署介绍
- 3.3 单机部署
- 3.4 集群部署
- 3.4.1 集群部署规划
- 3.4.2 集群准备工作
- 3.4.3 初始化数据库
- 3.4.4 修改安装环境配置
- 3.4.5 安装DolphinScheduler
- 3.4.6 启停命令
- 3.4.7 登录 DolphinScheduler
- 3.5 DolphinScheduler Web UI介绍
- 第四章 DolphinScheduler功能应用
- 4.1 项目管理
- 4.1.1 项目创建
- 4.1.2 项目进入-修改与删除
- 4.1.3 项目首页
- 4.2 工作流定义
- 4.2.1 创建工作流定义
- 4.2.2 工作流定义操作功能
- 4.2.3 运行工作流
- 4.2.4 单独运行任务
- 4.2.5 工作流定时
- 4.2.6 导出工作流
- 4.2.7 倒入工作流
- 4.3 工作流实例
- 4.3.1 查看工作流实例
- 4.3.2 查看任务日志
- 4.3.3 查看任务历史记录
- 4.3.4 查看运行参数
- 4.3.5 工作流实例操作功能
- 4.4 任务定义
- 4.4.1 批量任务定义
- 4.4.2 创建任务
- 4.4.3 实时任务定义
- 4.5 任务实例
- 4.5.1 批量任务实例
- 4.5.2 实时任务实例
- 4.6 数据源中心
- 4.6.1 MySQL数据源
- 4.6.2 Hive数据源
- 4.6.3 Spark数据源
- 4.7 资源中心
- 4.7.1 资源中心简介
- 4.7.2 资源中心配置
- 4.7.3 对接分布式或远端存储
- 4.7.4 文件管理
- 4.8 任务类型
- 4.8.1 Shell
- 4.8.1.1 创建任务
- 4.8.1.2 任务参数
- 4.8.1.3 任务案例
- 4.8.2 SQL
- 4.8.2.1 创建任务
- 4.8.2.2 任务参数
- 4.8.2.3 任务样例
- 4.8.3 Hive Cli
- 4.8.3.1 Hive CLI任务 VS 连接Hive数据源的SQL任务
- 4.8.3.2 创建任务
- 4.8.3.3 任务参数
- 4.8.3.4 任务案例
- 4.8.4 Python
- 4.8.4.1 创建任务
- 4.8.4.2 任务参数
- 4.8.4.3 任务案例
- 4.8.5 Spark
- 4.8.5.1 创建任务
- 4.8.5.2 任务参数
- 4.5.3 任务案例
- 4.8.6 DataX
- 4.8.6.1 创建任务
- 4.8.6.2 任务参数
- 4.8.6.3 任务案例
- 4.8.7 SubProcess
- 4.8.7.1 创建任务
- 4.8.7.2 任务参数
- 4.8.7.3 任务案例
- 4.8.8 Dependent
- 4.8.8.1 创建任务
- 4.8.8.2 任务参数
- 4.8.8.3 任务样例
- 4.8.9 Conditions
- 4.8.9.1 创建任务
- 4.8.9.2 任务参数
- 4.8.9.3 任务案例
- 4.8.10 Switch
- 4.8.10.1 创建任务
- 4.8.10.2 任务参数
- 4.8.10.3 任务样例
- 4.9 参数
- 4.9.1 内置参数
- 4.9.2 全局参数
- 4.9.3 本地参数
- 4.9.4 参数的引用
- 4.9.5 参数优先级
- 4.9.5.1 优先级案例
- 4.10 告警
- 4.10.1 Email
- 4.10.1.1 获取邮箱授权码
- 4.10.1.2 创建email告警实例
- 4.10.2 feishu
- 4.10.2.1 构建飞书机器人
- 4.10.2.2 创建飞书告警实例
- 4.10.3 DingTalk
- 4.10.3.1 钉钉告警准备
- 4.3.10.2 钉钉告警实例
- 4.10.4 告警组管理
- 4.10.4.1 告警组创建
- 4.10.4.2 告警组应用
- 4.11 监控中心
- 4.11.1 服务管理
- 4.11.2 统计管理
- 4.12 安全中心
- 4.12.1 YARN队列管理
- 4.12.2 租户管理
- 4.12.3 用户管理
- 4.12.4 授予权限
- 4.12.5 令牌管理
- 4.12.6 Worker分组管理
- 4.12.7 环境管理
- 4.12.8 集群管理
- 4.12.9 K8s命名空间管理
- 4.13 数据质量
- 4.13.1 任务类型介绍
- 4.13.2 数据质量任务的执行逻辑
- 4.13.3 数据质量配置
- 4.13.4 检查逻辑详解
- 4.13.5 单表行数校验
- 4.13.6 单表空值校验
- 4.13.7 自定义SQL校验
第一章 DolphinScheduler介绍
1.1 关于DolphinScheduler
Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。
Apache DolphinScheduler 旨在解决复杂的大数据任务依赖关系,并为应用程序提供数据和各种 OPS 编排中的关系。 解决数据研发ETL依赖错综复杂,无法监控任务健康状态的问题。 DolphinScheduler 以 DAG(Directed Acyclic Graph,DAG)流式方式组装任务,可以及时监控任务的执行状态,支持重试、指定节点恢复失败、暂停、恢复、终止任务等操作。
1.2 特性
-
简单易用
- 可视化 DAG: 用户友好的,通过拖拽定义工作流的,运行时控制工具
- 模块化操作: 模块化有助于轻松定制和维护。
-
丰富的使用场景
- 支持多种任务类型: 支持Shell、MR、Spark、SQL等10余种任务类型,支持跨语言,易于扩展
- 丰富的工作流操作: 工作流程可以定时、暂停、恢复和停止,便于维护和控制全局和本地参数。
-
High Reliability
- 高可靠性: 去中心化设计,确保稳定性。 原生 HA 任务队列支持,提供过载容错能力。 DolphinScheduler 能提供高度稳健的环境。
-
High Scalability
- 高扩展性: 支持多租户和在线资源管理。支持每天10万个数据任务的稳定运行。
1.3 名词解释
在对 Apache DolphinScheduler 了解之前,我们先来认识一下调度系统常用的名词
1.3.1 名词解释
DAG: 全称 Directed Acyclic Graph,简称 DAG。工作流中的 Task 任务以有向无环图的形式组装起来,从入度为零的节点进行拓扑遍历,直到无后继节点为止。举例如下图:
流程定义:通过拖拽任务节点并建立任务节点的关联所形成的可视化DAG
流程实例:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成。每运行一次流程定义,产生一个流程实例。
任务实例:任务实例是流程定义中任务节点的实例化,标识着某个具体的任务。
任务类型:目前支持有 SHELL、SQL、SUB_PROCESS(子流程)、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT(依赖),同时计划支持动态插件扩展,注意:其中 SUB_PROCESS类型的任务需要关联另外一个流程定义,被关联的流程定义是可以单独启动执行的
调度方式:系统支持基于 cron 表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、定时、重跑、暂停、停止、恢复等待线程。 其中 恢复被容错的工作流 和 恢复等待线程 两种命令类型是由调度内部控制使用,外部无法调用
定时调度:系统采用 quartz 分布式调度器,并同时支持cron表达式可视化的生成
依赖:系统不单单支持 DAG 简单的前驱和后继节点之间的依赖,同时还提供任务依赖节点,支持流程间的自定义任务依赖
优先级 :支持流程实例和任务实例的优先级,如果流程实例和任务实例的优先级不设置,则默认是先进先出
邮件告警:支持 SQL任务 查询结果邮件发送,流程实例运行结果邮件告警及容错告警通知
失败策略:对于并行运行的任务,如果有任务失败,提供两种失败策略处理方式,继续是指不管并行运行任务的状态,直到流程失败结束。结束是指一旦发现失败任务,则同时Kill掉正在运行的并行任务,流程失败结束
补数:补历史数据,支持区间并行和串行两种补数方式,其日期选择方式包括日期范围和日期枚举两种
1.3.2 模块介绍
- dolphinscheduler-master master模块,提供工作流管理和编排服务。
- dolphinscheduler-worker worker模块,提供任务执行管理服务。
- dolphinscheduler-alert 告警模块,提供 AlertServer 服务。
- dolphinscheduler-api web应用模块,提供 ApiServer 服务。
- dolphinscheduler-common 通用的常量枚举、工具类、数据结构或者基类
- dolphinscheduler-dao 提供数据库访问等操作。
- dolphinscheduler-remote 基于 netty 的客户端、服务端
- dolphinscheduler-service service模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用
- dolphinscheduler-ui 前端模块
第二章 DolphinScheduler系统架构
2.1 系统架构图
2.2 架构说明
-
MasterServer
MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。 MasterServer基于netty提供监听服务。
该服务内主要包含:
-
DistributedQuartz分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作;
-
MasterSchedulerService是一个扫描线程,定时扫描数据库中的t_ds_command表,根据不同的命令类型进行不同的业务操作;
-
WorkflowExecuteRunnable主要是负责DAG任务切分、任务提交监控、各种不同事件类型的逻辑处理;
-
TaskExecuteRunnable主要负责任务的处理和持久化,并生成任务事件提交到工作流的事件队列;
-
EventExecuteService主要负责工作流实例的事件队列的轮询;
-
StateWheelExecuteThread主要负责工作流和任务超时、任务重试、任务依赖的轮询,并生成对应的工作流或任务事件提交到工作流的事件队列;
-
FailoverExecuteThread主要负责Master容错和Worker容错的相关逻辑;
-
WorkerServer
WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。 Server基于netty提供监听服务。
该服务包含:
- WorkerManagerThread主要负责任务队列的提交,不断从任务队列中领取任务,提交到线程池处理;
- TaskExecuteThread主要负责任务执行的流程,根据不同的任务类型进行任务的实际处理;
- RetryReportTaskStatusThread主要负责定时轮询向Master汇报任务的状态,直到Master回复状态的ack,避免任务状态丢失;
-
ZooKeeper
ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。另外系统还基于ZooKeeper进行事件监听和分布式锁。 我们也曾经基于Redis实现过队列,不过我们希望DolphinScheduler依赖到的组件尽量地少,所以最后还是去掉了Redis实现。
-
AlertServer
提供告警服务,通过告警插件的方式实现丰富的告警手段。
-
APIServer
API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。 接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。
-
UI
系统的前端页面,提供系统的各种可视化操作界面。
2.3 启动流程活动图
2.4 架构设计思想
2.4.1 去中心化vs中心化
2.4.1.1 中心化思想
中心化的设计理念比较简单,分布式集群中的节点按照角色分工,大体上分为两种角色:
- Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。
- Worker的角色主要负责任务的执行工作并维护和Master的心跳,以便Master可以分配任务给Slave。
中心化思想设计存在的问题:
- 一旦Master出现了问题,则群龙无首,整个集群就会崩溃。为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性。
- 另外一个问题是如果Scheduler在Master上,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。如果Scheduler在Slave上,则一个DAG中所有的任务都只能在某一台机器上进行作业提交,则并行任务比较多的时候,Slave的压力可能会比较大。
2.4.1.2 去中心化
- 在去中心化设计里,通常没有Master/Slave的概念,所有的角色都是一样的,地位是平等的,全球互联网就是一个典型的去中心化的分布式系统,联网的任意节点设备down机,都只会影响很小范围的功能。
- 去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的”管理者”,因此不存在单点故障问题。但由于不存在” 管理者”节点所以每个节点都需要跟其他节点通信才得到必须要的机器信息,而分布式系统通信的不可靠性,则大大增加了上述功能的实现难度。
- 实际上,真正去中心化的分布式系统并不多见。反而动态中心化分布式系统正在不断涌出。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,并且集群在发生故障的时候,集群的节点会自发的举行"会议"来选举新的"管理者"去主持工作。最典型的案例就是ZooKeeper及Go语言实现的Etcd。
- DolphinScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心,并使用Zookeeper分布式锁来选举其中的一台Master或Worker为“管理者”来执行任务。
2.4.3 容错设计
容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况
2.4.3.1 宕机容错
服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:
其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。
-
Master容错流程:
- 容错范围:从host的维度来看,Master的容错范围包括:自身host+注册中心上不存在的节点host,容错的整个过程会加锁;
- 容错内容:Master的容错内容包括:容错工作流实例和任务实例,在容错前会比较实例的开始时间和服务节点的启动时间,在服务启动时间之后的则跳过容错;
- 容错后处理:ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。
-
Worker容错流程:
- 容错范围:从工作流实例的维度看,每个Master只负责容错自己的工作流实例;只有在handleDeadServer时会加锁;
- 容错内容:当发送Worker节点的remove事件时,Master只容错任务实例,在容错前会比较实例的开始时间和服务节点的启动时间,在服务启动时间之后的则跳过容错;
- 容错后处理:Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。
注意:
由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。
2.4.3.2 任务失败重试
这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:
- 任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次
- 流程失败恢复是流程级别的,是手动进行的,恢复是从只能从失败的节点开始执行或从当前节点开始执行
- 流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行
接下来说正题,我们将工作流中的任务节点分了两种类型。
- 一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点,MR节点、Spark节点、依赖节点等。
- 还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。
每一个业务节点都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。逻辑节点不支持失败重试。但是逻辑节点里的任务支持重试。
如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作
2.4.4 任务优先级设计
在早期调度设计中,如果没有优先级设计,采用公平调度设计的话,会遇到先行提交的任务可能会和后继提交的任务同时完成的情况,而不能做到设置流程或者任务的优先级,因此我们对此进行了重新设计,目前我们设计如下:
- 按照不同流程实例优先级优先于同一个流程实例优先级优先于同一流程内任务优先级优先于同一流程内任务提交顺序依次从高到低进行任务处理。
- 具体实现是根据任务实例的json解析优先级,然后把流程实例优先级_流程实例id_任务优先级_任务id信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务
- 其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图
-
任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图
2.4.5 Logback和netty实现日志访问
- 由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。有两种方案:
- 将日志放到ES搜索引擎上
- 通过netty通信获取远程日志信息
- 介于考虑到尽可能的DolphinScheduler的轻量级性,所以选择了gRPC实现远程访问日志信息。
- 详情可参考Master和Worker的logback配置,如下示例:
taskAppId ${log.base} ${log.base}/${taskAppId}.log [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - %messsage%n UTF-8 true 第三章 DolphinScheduler安装部署
3.1 软硬件环境配置建议
DolphinScheduler 作为一款开源分布式工作流任务调度系统,可以很好地部署和运行在 Intel 架构服务器及主流虚拟化环境下,并支持主流的Linux操作系统环境
3.1.1 Linux 操作系统版本要求
操作系统 版本 Red Hat Enterprise Linux 7.0 及以上 CentOS 7.0 及以上 Oracle Enterprise Linux 7.0 及以上 Ubuntu LTS 16.04 及以上 注意: 以上 Linux 操作系统可运行在物理服务器以及 VMware、KVM、XEN 主流虚拟化环境上
3.1.2 服务器建议配置
DolphinScheduler 支持运行在 Intel x86-64 架构的 64 位通用硬件服务器平台。对生产环境的服务器硬件配置有以下建议:
生产环境
CPU 内存 硬盘类型 网络 实例数量 4核+ 8 GB+ SAS 千兆网卡 1+ 注意:
- 以上建议配置为部署 DolphinScheduler 的最低配置,生产环境强烈推荐使用更高的配置
- 硬盘大小配置建议 50GB+ ,系统盘和数据盘分开
3.1.3 网络要求
DolphinScheduler正常运行提供如下的网络端口配置:
组件 默认端口 说明 MasterServer 5678 非通信端口,只需本机端口不冲突即可 WorkerServer 1234 非通信端口,只需本机端口不冲突即可 ApiApplicationServer 12345 提供后端通信端口 注意:
- MasterServer 和 WorkerServer 不需要开启网络间通信,只需本机端口不冲突即可
- 管理员可根据实际环境中 DolphinScheduler 组件部署方案,在网络侧和主机侧开放相关端口
3.1.4 客户端 Web 浏览器要求
DolphinScheduler 推荐 Chrome 以及使用 Chromium 内核的较新版本浏览器访问前端可视化操作界面
3.2 安装部署介绍
DolphinScheduler提供了4种安装部署方式:
- 单机部署(Standalone):Standalone 仅适用于 DolphinScheduler 的快速体验。如果你是新手,想要体验 DolphinScheduler 的功能,推荐使用[Standalone]方式体检。
- 伪集群部署(Pseudo-Cluster):伪集群部署目的是在单台机器部署 DolphinScheduler 服务,该模式下master、worker、api server 都在同一台机器上。如果你想体验更完整的功能,或者更大的任务量,推荐使用伪集群部署。
- 集群部署(Cluster):集群部署目的是在多台机器部署 DolphinScheduler 服务,用于运行大量任务情况。如果你是在生产中使用,推荐使用集群部署或者kubernetes。
- Kubernetes 部署:Kubernetes部署目的是在Kubernetes集群中部署 DolphinScheduler 服务,能调度大量任务,可用于在生产中部署。
注意:
1、Standalone仅建议20个以下工作流使用,因为其采用内存式的H2 Database, Zookeeper Testing Server,任务过多可能导致不稳定,并且如果重启或者停止standalone-server会导致内存中数据库里的数据清空。 如果您要连接外部数据库,比如mysql或者postgresql。
2、Kubernetes部署先决条件:Helm3.1.0+ ;Kubernetes1.12+;PV 供应(需要基础设施支持)
3.3 单机部署
Standalone 仅适用于 DolphinScheduler 的快速体验.
-
二进制包下载
二进制包:在下载页面下载 DolphinScheduler 二进制包
-
前置准备工作
- JDK:下载JDK (1.8+),安装并配置 JAVA_HOME 环境变量,并将其下的 bin 目录追加到 PATH 环境变量中。如果你的环境中已存在,可以跳过这步。
-
解压并启动 DolphinScheduler
二进制压缩包中有 standalone 启动的脚本,解压后即可快速启动。切换到有sudo权限的用户,运行脚本
# 解压并运行 Standalone Server [root@qianfeng01 soft]# cd /opt/soft [root@qianfeng01 soft]# tar -zxvf apache-dolphinscheduler-3.1.4-bin.tar.gz [root@qianfeng01 soft]# cd ./apache-dolphinscheduler-3.1.4-bin [root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# ./bin/dolphinscheduler-daemon.sh start standalone-server #查询dolphinscheduler的单机服务 [root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# jps 18688 Jps 18665 StandaloneServer
-
登录 DolphinScheduler
浏览器访问地址 http://qianfeng01:12345/dolphinscheduler/ui 即可登录系统UI。默认的用户名和密码是 admin/dolphinscheduler123
登录成功如下图所示:
-
启停服务
脚本 ./bin/dolphinscheduler-daemon.sh 除了可以快捷启动 standalone 外,还能停止服务运行,全部命令如下
# 启动 Standalone Server 服务 [root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# ./bin/dolphinscheduler-daemon.sh start standalone-server # 停止 Standalone Server 服务 [root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# ./bin/dolphinscheduler-daemon.sh stop standalone-server
-
配置数据库
Standalone server 使用 H2 数据库作为其元数据存储数据,这是为了上手简单,用户在启动服务器之前不需要启动数据库。但是如果用户想将元数据库存储在 MySQL 或 PostgreSQL 等其他数据库中,他们必须更改一些配置。请参考 数据源配置 Standalone 切换元数据库 创建并初始化数据库。
单机版到此为止即可。
3.4 集群部署
3.4.1 集群部署规划
集群模式下,可配置多个Master及多个Worker。通常可配置2~3个Master,若干个Worker。由于集群资源有限,此处配置一个Master,三个Worker,集群规划如下。
主机名 ip 服务 备注 qianfeng01 192.168.10.101 master,worker qianfeng02 192.168.10.102 master,worker 该服务器也可以安装master qianfeng03 192.168.10.103 worker 3.4.2 集群准备工作
-
操作系统:linux centos 7.7
-
部署版本:apache-dolphinscheduler-3.1.4-bin
-
JDK:配置Java环境,将JAVA_HOME配置于PATH中,推荐版本使用jdk8+
-
数据库:本文使用的是MySQL 8.0.26版本,也可以使用5.7版本及以上,或者是使用PostgreSQL数据库(8.2.15+),两者任选其一即可,如 MySQL 则需要 JDBC Driver 8.0.16
╭liyadong at ~ ╰$ scp ~/Desktop/mysql-connector-java-8.0.16.jar qianfeng01:/opt/soft
-
注册中心:zookeeper(3.4.6+)
-
其他大数据相关组件:Hadoop 3.3.1 、Hive 3.1.2 、DataX 3.0、Spark 3.1.2等
-
创建部署用户,并为该用户配置免登录,以创建dolphinscheduler用户为例(准备执行DS安装程序的服务器上创建即可)
# 创建用户需使用root登录 useradd dolphinscheduler # 添加密码 echo "dolphinscheduler" | passwd --stdin dolphinscheduler # 配置sudo(系统管理命令)免密 sed -i '$adolphinscheduler ALL=(ALL) NOPASSWD: NOPASSWD: ALL' /etc/sudoers sed -i 's/Defaults requirett/#Defaults requirett/g' /etc/sudoers
注意:
- 因为任务执行服务是以 sudo -u {linux-user} 切换不同 linux 用户的方式来实现多租户运行作业,所以部署用户需要有 sudo 权限,而且是免密的。初学习者不理解的话,完全可以暂时忽略这一点
- 如果发现 /etc/sudoers 文件中有 “Defaults requirett” 这行,也请注释掉
-
配置机器SSH免密登陆
由于安装的时候需要向不同机器发送资源,所以要求各台机器间能实现SSH免密登陆。配置免密登陆的步骤如下
su dolphinscheduler ssh-copy-id qianfeng01 ssh-copy-id qianfeng02 ssh-copy-id qianfeng03
注意:
配置完成后,可以通过运行命令 ssh localhost 判断是否成功,如果不需要输入密码就能ssh登陆则证明成功
-
启动zookeeper集群(所有安装Zookeper的服务器均执行)
# 启动 zookeeper ./bin/zkServer.sh start
3.4.3 初始化数据库
DolphinScheduler 元数据存储在关系型数据库中,目前支持 PostgreSQL 和 MySQL。下面分别介绍如何使用 MySQL 初始化数据库。
-
创建数据库、用户和授权
-- 进入MySQL命令行 [root@qianfeng01 soft]# mysql -uroot -p123456 -- 创建dolphinscheduler的数据库用户和密码,并限定登陆范围 mysql> CREATE USER 'dolphinscheduler'@'%' IDENTIFIED BY 'QF-Dolphinscheduler123!'; -- 创建dolphinscheduler的元数据库,并指定编码 mysql> CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci; -- 为dolphinscheduler数据库授权 mysql> grant all privileges on dolphinscheduler.* to 'dolphinscheduler'@'%' ; -- 刷新权限 mysql> flush privileges;
-
解压安装包
上传DolphinScheduler安装包到qianfeng01节点的/opt/software目录,并解压安装包到该目录。
#上传安装包到服务器 $ scp ~/Desktop/apache-dolphinscheduler-3.1.4-bin.tar.gz qianfeng01:/opt/soft #解压 [root@qianfeng01 soft]# tar -zxvf ./apache-dolphinscheduler-3.1.4-bin.tar.gz # 修改目录权限,使得部署用户对二进制包解压后的 apache-dolphinscheduler-*-bin 目录有操作权限 [root@qianfeng01 soft]# chown -R dolphinscheduler:dolphinscheduler apache-dolphinscheduler-3.1.4-bin
-
添加MySQL驱动
需要手动下载 对应的mysql-connector-java驱动(8.0.16)并移动到 DolphinScheduler 的每个模块的 libs 目录下,其中包括 api-server/libs 和 alert-server/libs 和 master-server/libs 和 worker-server/libs和tools/libs
#复制mysql的驱动到对应libs目录中 [root@qianfeng01 soft]# cp ./mysql-connector-java-8.0.16.jar ./apache-dolphinscheduler-3.1.4-bin/alert-server/libs/ [root@qianfeng01 soft]# cp ./mysql-connector-java-8.0.16.jar ./apache-dolphinscheduler-3.1.4-bin/master-server/libs/ [root@qianfeng01 soft]# cp ./mysql-connector-java-8.0.16.jar ./apache-dolphinscheduler-3.1.4-bin/worker-server/libs/ [root@qianfeng01 soft]# cp ./mysql-connector-java-8.0.16.jar ./apache-dolphinscheduler-3.1.4-bin/api-server/libs/ [root@qianfeng01 soft]# cp ./mysql-connector-java-8.0.16.jar ./apache-dolphinscheduler-3.1.4-bin/tools/libs/
-
修改dolphinscheduler_env.sh配置文件
文件 ./bin/env/dolphinscheduler_env.sh 描述了下列配置:
- DolphinScheduler 的数据库配置,将username和password改成你在上一步中设置的用户名和密码
- 一些任务类型外部依赖路径或库文件,如 JAVA_HOME 和 SPARK_HOME都是在这里定义的
- 注册中心zookeeper
- 服务端相关配置,比如缓存,时区设置等
如果您不使用某些任务类型,您可以忽略任务外部依赖项,但您必须根据您的环境更改 JAVA_HOME、注册中心和数据库相关配置。
#修改内容如下 [root@qianfeng01 soft]# cd ./apache-dolphinscheduler-3.1.4-bin [root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# vim ./bin/env/dolphinscheduler_env.sh # JAVA_HOME, will use it to start DolphinScheduler server # 改为自己的JDK路径 export JAVA_HOME=${JAVA_HOME:-/usr/local/jdk1.8.0_321} # Database related configuration, set database type, username and password # MySQL数据库连接信息 export DATABASE=${DATABASE:-mysql} export SPRING_PROFILES_ACTIVE=${DATABASE} export SPRING_DATASOURCE_URL="jdbc:mysql://qianfeng01:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true" export SPRING_DATASOURCE_USERNAME=${SPRING_DATASOURCE_USERNAME:-"dolphinscheduler"} export SPRING_DATASOURCE_PASSWORD=${SPRING_DATASOURCE_PASSWORD:-"QF-Dolphinscheduler123!"} # DolphinScheduler server related configuration # 不用修改 export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none} export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC} export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10} # Registry center configuration, determines the type and link of the registry center # zookeeper集群信息 export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper} export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-qianfeng01:2181,qianfeng02:2181,qianfeng03:2181} # Tasks related configurations, need to change the configuration if you use the related tasks. # 对已有可以正常配置,没有的保持默认即可 export HADOOP_HOME=${HADOOP_HOME:-/usr/local/hadoop-3.3.1} export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/local/hadoop-3.3.1/etc/hadoop} export SPARK_HOME1=${SPARK_HOME1:-/usr/local/spark-3.1.2} export SPARK_HOME2=${SPARK_HOME2:-/usr/local/spark-3.1.2} export PYTHON_HOME=${PYTHON_HOME:-/usr/bin/python} export HIVE_HOME=${HIVE_HOME:-/usr/local/hive-3.1.2} export FLINK_HOME=${FLINK_HOME:-/usr/local/flink-1.14.3} export DATAX_HOME=${DATAX_HOME:-/usr/local/datax} export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel} export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun} export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
-
初始化元数据
# 切换到apache-dolphinscheduler-3.1.4-bin目录下,执行命令 [root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# sh ./tools/bin/upgrade-schema.sh
3.4.4 修改安装环境配置
完成基础环境和元数据库初始化的准备后,需要根据你的机器环境修改配置文件。配置文件可以在目录 bin/env 中找到,他们分别是 install_env.sh 和 dolphinscheduler_env.sh。
-
修改 install_env.sh 文件
文件 install_env.sh 描述了哪些机器将被安装 DolphinScheduler 以及每台机器对应安装哪些服务。您可以在路径 bin/env/install_env.sh 中找到此文件,可通过以下方式更改env变量,export
=,配置详情如下。 #修改配置如下 [root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# vim ./bin/env/install_env.sh # --------------------------------------------------------- # INSTALL MACHINE # --------------------------------------------------------- # Due to the master, worker, and API server being deployed on a single node, the IP of the server is the machine IP or localhost ips="qianfeng01,qianfeng02,qianfeng03" sshPort="22" # masters可以是一台,也可以是多台,根据自己机器配置决定 masters="qianfeng01,qianfeng02" workers="qianfeng01:default,qianfeng02:default,qianfeng03:default" alertServer="qianfeng03" apiServers="qianfeng01" # DolphinScheduler installation path, it will auto-create if not exists # 最终的安装路径 installPath=${installPath:-"/usr/local/dolphinscheduler-3.1.4"} # Deploy user, use the user you create in section **Configure machine SSH password-free login** deployUser="dolphinscheduler" # The root of zookeeper, for now DolphinScheduler default registry server is zookeeper. zkRoot=${zkRoot:-"/dolphinscheduler"}
3.4.5 安装DolphinScheduler
使用上面创建的部署用户运行以下命令完成部署,部署后的运行日志将存放在 安装目录下的logs 文件夹内
[root@qianfeng01 apache-dolphinscheduler-3.1.4-bin]# sh ./bin/install.sh
注意:
第一次部署的话,可能出现 5 次sh: bin/dolphinscheduler-daemon.sh: No such file or directory相关信息,为非重要信息直接忽略即可。
3.4.6 启停命令
第一次安装后会自动启动所有服务的,如有服务问题或者后续需要启停,命令如下。下面的操作脚本都在dolphinScheduler安装目录bin下。
# 一键停止集群所有服务 sh /usr/local/dolphinscheduler-3.1.4/bin/stop-all.sh # 一键开启集群所有服务 sh /usr/local/dolphinscheduler-3.1.4/bin/start-all.sh # 启停 Master sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop master-server sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start master-server # 启停 Worker sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start worker-server sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop worker-server # 启停 Api sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start api-server sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop api-server # 启停 Logger sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start logger-server sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop logger-server # 启停 Alert sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start alert-server sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop alert-server # 启停 Python Gateway sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh start python-gateway-server sh /usr/local/dolphinscheduler-3.1.4/bin/dolphinscheduler-daemon.sh stop python-gateway-server
注意:
-
每个服务在路径
/conf/dolphinscheduler_env.sh 中都有 dolphinscheduler_env.sh 文件,这是可以为微 服务需求提供便利。意味着您可以基于不同的环境变量来启动各个服务,只需要在对应服务中配置 /conf/dolphinscheduler_env.sh 然后通过 /bin/start.sh 命令启动即可。但是如果您使用命令 /bin/dolphinscheduler-daemon.sh start 启动服务器,它将会用文件 bin/env/dolphinscheduler_env.sh 覆盖 /conf/dolphinscheduler_env.sh 然后启动服务,目的是为了减少用户修改配置的成本. -
服务用途请具体参见《系统架构设计》小节。Python gateway service 默认与 api-server 一起启动,如果您不想启动 Python gateway service 请通过更改 api-server 配置文件 api-server/conf/application.yaml 中的 python-gateway.enabled : false 来禁用它。
3.4.7 登录 DolphinScheduler
浏览器访问地址 http://qianfeng01:12345/dolphinscheduler/ui 即可登录系统UI。默认的用户名和密码是 admin/dolphinscheduler123
登录如下图所示:
去监控中心查看Masters,如下图所示:
去监控中心查看workers,如下图所示:
到此为止,DolphinScheduler集群安装部署就完成了。
3.5 DolphinScheduler Web UI介绍
第四章 DolphinScheduler功能应用
4.1 项目管理
4.1.1 项目创建
点击顶部导航项目管理--->项目创建--->填写如下信息
4.1.2 项目进入-修改与删除
4.1.3 项目首页
在项目列表中—>点击项目名称链接—>进入项目首页—>如下图:
项目首页包含该项目的任务状态统计、流程状态统计、工作流定义统计。这几个指标的说明如下:
- 任务状态统计:在指定时间范围内,统计任务实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数。
- 流程状态统计:在指定时间范围内,统计工作流实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数。
- 工作流定义统计:统计用户创建的工作流定义及管理员授予该用户的工作流定义。
4.2 工作流定义
4.2.1 创建工作流定义
-
创建工作流执行需要的租户
租户就是操作系统的实际用户,ds需要租户来真正执行。
租户创建完成。
-
点击项目管理->工作流->工作流定义,进入工作流定义页面,点击创建工作流按钮,进入工作流DAG编辑页面,如下图所示:
-
工具栏中拖拽 到画板中,新增一个Shell任务,如下图所示:
-
添加 Shell 任务的参数设置:
- 填写节点名称,描述,脚本字段;
- 运行标志勾选正常,若勾选禁止执行,运行工作流不会执行该任务;
- 选择任务优先级:当 worker 线程数不足时,级别高的任务在执行队列中会优先执行,相同优先级的任务按照先进先出的顺序执行;
- 超时告警(非必选):勾选超时告警、超时失败,填写超时时长,当任务执行时间超过超时时长,会发送告警邮件并且任务超时失败;
- 资源(非必选):资源文件是资源中心->文件管理页面创建或上传的文件,如文件名为 test.sh,脚本中调用资源命令为 sh test.sh。注意调用需要使用资源的全路径;
- 自定义参数(非必填);
- 点击确定按钮,保存任务设置。
-
配置任务之间的依赖关系: 点击任务节点的右侧加号连接任务;如下图所示,任务 Node_B 和任务 Node_C 并行执行,当任务 Node_A 执行完,任务 Node_B、Node_C 会同时执行。
==注:==上图中Node_B和Node_C是根据Node_A创建方式创建即可,后续的不同类型的工作流的创建也不在一步一步创建。
-
保存工作流定义: 点击画布右上角的保存按钮,弹出基本信息弹框,如下图所示,输入工作流定义名称,工作流定义描述,设置全局变量(选填,参考全局参数),点击确定按钮,工作流定义创建成功。
其他类型任务,请参考 任务节点类型和参数设置。
- 执行策略
- 并行:如果对于同一个工作流定义,同时有多个工作流实例,则并行执行工作流实例。
- 串行等待:如果对于同一个工作流定义,同时有多个工作流实例,则并行执行工作流实例。
- 串行抛弃:如果对于同一个工作流定义,同时有多个工作流实例,则抛弃后生成的工作流实例并杀掉正在跑的实例。
- 串行优先:如果对于同一个工作流定义,同时有多个工作流实例,则按照优先级串行执行工作流实例。
-
实时任务的依赖关系: 若DAG中包含了实时任务的组件,则实时任务的关联关系显示为虚线,在执行工作流实例的时候会跳过实时任务的执行。
-
**删除任务的依赖关系:**进入DAG图中 ,选中连接线,点击右上角删除图标,删除任务间的依赖关系。
4.2.2 工作流定义操作功能
-
点击项目管理->工作流->工作流定义,进入工作流定义页面,如下图所示:
工作流定义列表的操作功能如下:
-
编辑: 只能编辑下线的工作流定义。工作流DAG编辑同创建工作流定义。
-
上线: 工作流状态为下线时,上线工作流,只有上线状态的工作流能运行,但不能编辑。
-
下线: 工作流状态为上线时,下线工作流,下线状态的工作流可以编辑,但不能运行。
-
运行: 只有上线的工作流能运行。运行操作步骤见运行工作流
-
定时: 只有上线的工作流能设置定时,系统自动定时调度工作流运行。创建定时后的状态为"下线",需在定时管理页面上线定时才生效。定时操作步骤见工作流定时
-
定时管理: 定时管理页面可编辑、上线/下线、删除定时。
-
删除: 删除工作流定义。在同一个项目中,只能删除自己创建的工作流定义,其他用户的工作流定义不能进行删除,如果需要删除请联系创建用户或者管理员。
-
下载: 下载工作流定义到本地。
-
树形图: 以树形结构展示任务节点的类型及任务状态,如下图所示:
4.2.3 运行工作流
-
点击项目管理->工作流->工作流定义,进入工作流定义页面,如下图所示,点击上线按钮,上线工作流。
-
点击运行按钮,弹出启动参数设置弹框,如下图所示,设置启动参数,点击弹框中的运行按钮,工作流开始运行,工作流实例页面生成一条工作流实例。
工作流运行参数说明:
-
失败策略:当某一个任务节点执行失败时,其他并行的任务节点需要执行的策略。继续表示:某一任务失败后,其他任务节点正常执行;结束表示:终止所有正在执行的任务,并终止整个流程。
-
通知策略:当流程结束,根据流程状态发送流程执行信息通知邮件,包含任何状态都不发,成功发,失败发,成功或失败都发。
-
流程优先级:流程运行的优先级,分五个等级:最高(HIGHEST),高(HIGH),中(MEDIUM),低(LOW),最低(LOWEST)。当 master 线程数不足时,级别高的流程在执行队列中会优先执行,相同优先级的流程按照先进先出的顺序执行。
-
Worker 分组:该流程只能在指定的 worker 机器组里执行。默认是 Default,可以在任一 worker 上执行。
-
通知组:选择通知策略||超时报警||发生容错时,会发送流程信息或邮件到通知组里的所有成员。
-
启动参数: 在启动新的流程实例时,设置或覆盖全局参数的值。
-
补数:指运行指定日期范围内的工作流定义,根据补数策略生成对应的工作流实例,补数策略包括串行补数、并行补数 2 种模式。
日期可以通过页面选择或者手动输入,日期范围是左关右关区间(startDate <= N <= endDate)
-
串行补数:指定时间范围内,从开始日期至结束日期依次执行补数,依次生成多条流程实例;点击运行工作流,选择串行补数模式:例如从12月 1号到12月20号依次执行,依次在流程实例页面生成两条流程实例。
调度日期手动输入,在弹出的输入框中输入如下即可:
2022-12-01 00:00:00,2022-12-20 00:00:00
重跑后结果:
-
并行度:是指在并行补数的模式下,最多并行执行的实例数。例如同时执行7月6号到7月10号的工作流定义,并行度为2,那么流程实例为:
补跑结果如下图:
-
依赖模式:是否触发下游依赖节点依赖到当前工作流的工作流实例的补数(要求当前补数的工作流实例的定时状态为已上线,只会触发下游直接依赖到当前工作流的补数)。
调度日期手动输入,在弹出的输入框中输入如下即可:
2022-12-01 00:00:00,2022-12-07 00:00:00
依赖模式补数结果如下:
-
补数与定时配置的关系:
-
未配置定时或已配置定时并定时状态下线:根据所选的时间范围结合定时默认配置(每天0点)进行补数,比如该工作流调度日期为7月7号到7月10号,流程实例为:
-
已配置定时并定时状态上线:根据所选的时间范围结合定时配置进行补数,比如该工作流调度日期为7月7号到7月10号,配置了定时(每日凌晨5点运行),流程实例为:
定时配置好后,选择定时管理:
将工作流的定时上线,如下图:
定时运行后结果如下:
-
-
空跑
空跑状态:
任务实例状态:
但是,任务并没有真正执行。
4.2.4 单独运行任务
-
右键选中任务,点击"启动"按钮(只有已上线的任务才能点击运行)
-
弹出启动参数设置弹框,参数说明同运行工作流
注意:
节点执行类型的选择。
4.2.5 工作流定时
-
创建定时:点击项目管理->工作流->工作流定义,进入工作流定义页面,上线工作流,点击定时按钮,弹出定时参数设置弹框,如下图所示:
-
选择起止时间。在起止时间范围内,定时运行工作流;不在起止时间范围内,不再产生定时工作流实例。
-
添加一个每天5时执行一次的定时,如下图所示:
-
失败策略、通知策略、流程优先级、Worker 分组、通知组、收件人、抄送人同工作流运行参数。
-
点击创建按钮,创建定时成功,此时定时状态为"下线",定时需上线才生效。
-
定时上线:点击定时管理按钮,进入定时管理页面,点击上线按钮,定时状态变为上线,如下图所示,工作流定时生效。
-
工作流定时上线状态
4.2.6 导出工作流
工作流定义列表,选中操作中的导出,如下图:
点击导出后,工作流文件将会被下载到电脑本地。
4.2.7 倒入工作流
点击项目管理->工作流->工作流定义,进入工作流定义页面,点击导入工作流按钮,导入本地工作流文件,工作流定义列表显示导入的工作流,状态为下线。
到此为止,工作流相关的操作就结束了。
工作流复制
复制完成的结果:
注意
复制不包含上下线的状态。
4.3 工作流实例
4.3.1 查看工作流实例
-
点击项目管理->工作流->工作流实例,进入工作流实例页面,如下图所示:
-
点击工作流名称链接,进入DAG查看页面,查看任务执行状态,如下图所示。
4.3.2 查看任务日志
-
进入工作流实例页面,点击工作流名称,进入DAG查看页面,双击任务节点,如下图所示:
-
点击"查看日志",弹出日志弹框就是任务日志信息,如下图所示。任务实例页面也可查看任务日志,参考任务查看日志。
4.3.3 查看任务历史记录
- 点击项目管理->工作流->工作流实例,进入工作流实例页面,点击工作流名称,进入工作流 DAG 页面;
- 双击任务节点,如下图所示,点击"查看历史",跳转到任务实例页面,并展示该工作流实例运行的任务实例列表
4.3.4 查看运行参数
-
点击项目管理->工作流->工作流实例,进入工作流实例页面,点击工作流名称,进入工作流 DAG 页面;
-
点击左上角查看变量,查看工作流实例的全局参数和局部参数;点击左上角启动参数,查看工作流实例的启动参数,如下图所示
4.3.5 工作流实例操作功能
点击项目管理->工作流->工作流实例,进入工作流实例页面,如下图所示:
-
编辑: 只能编辑成功/失败/停止 状态的流程。点击编辑按钮或工作流实例名称进入 DAG 编辑页面,编辑后点击保存按钮,弹出保存 DAG 弹框,如下图所示,修改流程定义信息,在弹框中勾选是否更新工作流定义,保存后则将实例修改的信息更新到工作流定义;若不勾选,则不更新工作流定义。
-
重跑: 重新执行已经终止的流程。
-
恢复失败: 针对失败的流程,可以执行恢复失败操作,从失败的节点开始执行。
-
停止: 对正在运行的流程进行停止操作,后台会先 kill worker 进程,再执行 kill -9 操作。
-
暂停: 对正在运行的流程进行暂停操作,系统状态变为等待执行,会等待正在执行的任务结束,暂停下一个要执行的任务。
-
恢复暂停: 对暂停的流程恢复,直接从暂停的节点开始运行。
-
删除: 删除工作流实例及工作流实例下的任务实例。
-
甘特图: Gantt 图纵轴是某个工作流实例下的任务实例的拓扑排序,横轴是任务实例的运行时间。
4.4 任务定义
4.4.1 批量任务定义
批量任务定义允许您在基于任务级别而不是在工作流中操作修改任务。再此之前,我们已经有了工作流级别的任务编辑器,你可以在工作流定义 单击特定的工作流,然后编辑任务的定义。当您想编辑特定的任务定义但不记得它属于哪个工作流时,这是令人沮丧的。所以我们决定在 任务 菜单下添加 任务定义 视图。
在该视图中:
- 您可以通过单击 操作 列中的相关按钮来进行编辑、移动、查看版本和删除任务。
- 您也可以通过任务列表右上角的通配符进行全部任务查询,当您只 记得任务名称但忘记它属于哪个工作流时是非常有用的。也支持通过任务名称结合使用 任务类型 或 工作流程名称 进行查询。
4.4.2 创建任务
项目管理->任务->任务定义->批量任务->创建任务,创建任务试图如下所示:
注意:
上线状态的工作流是不能定义,即你定义也不能保存。
4.4.3 实时任务定义
实时任务是在工作流定义中创建Flink_Stream类型的工作流定义,则会在实时任务列表中显示。
-
实时任务定义在工作流定义中创建,在任务定义页面可以进行修改和执行。
-
点击实时任务执行,检查执行参数后点击确认,即可提交实时任务。
4.5 任务实例
4.5.1 批量任务实例
-
点击项目管理->工作流->任务实例,进入任务实例页面,如下图所示,点击工作流实例名称,可跳转到工作流实例DAG图查看任务状态。
4.5.2 实时任务实例
-
切换到实时任务实例页面,如下图所示:
-
SavePoint:点击操作列中的SavePoint按钮,可以进行实时任务的SavePoint。
-
Stop:点击操作列中的Stop按钮,可以停止该实时任务。
4.6 数据源中心
- MySQL
- PostgreSQL
- HIVE
- Spark
- Amazon Athena
- ClickHouse
- 等等
4.6.1 MySQL数据源
点击顶部导航数据源中心--->创建数据源--->数据源选择MySQL--->填写如下信息--->测试连接--->成功--->确定
- 数据源:选择 MYSQL
- 数据源名称:输入数据源的名称
- 描述:输入数据源的描述
- IP 主机名:输入连接 MySQL 的 IP
- 端口:输入连接 MySQL 的端口
- 用户名:设置连接 MySQL 的用户名
- 密码:设置连接 MySQL 的密码
- 数据库名:输入连接 MySQL 的数据库名称
- Jdbc 连接参数:用于 MySQL 连接的参数设置,以 JSON 形式填写
注意:
数据源中心支持列出、查看、修改、删除等操作。
4.6.2 Hive数据源
点击顶部导航数据源中心--->创建数据源--->数据源选择HIVE/UMPALA--->写如下信息--->测试连接--->成功--->确定
- 数据源:选择 HIVE/IMPALA
- 数据源名称:输入数据源的名称
- 描述:输入数据源的描述
- IP 主机名:输入连接 HIVE 的 IP
- 端口:输入连接 HIVE 的端口
- 用户名:设置连接 HIVE 的用户名,默认是系统的登录用户的用户名
- 密码:设置连接 HIVE 的密码,与系统登录用户匹配的密码
- 数据库名:输入连接 HIVE 的数据库名称
- Jdbc 连接参数:用于 HIVE 连接的参数设置,以 JSON 形式填写
注意:如果您希望在同一个会话中执行多个 HIVE SQL,您可以修改配置文件 common.properties 中的配置,设置 support.hive.oneSession = true。 这对运行 HIVE SQL 前设置环境变量的场景会很有帮助。参数 support.hive.oneSession 默认值为 false,多条 SQL 将在不同的会话中运行。
Hiveserver2连接错误
通常是在使用hiveserver2或者dolphinscheduler连接hive(hiveserver2服务)的时候会报错如下:
2023-04-09 00:05:47,328 WARN [main] jdbc.HiveConnection (HiveConnection.java:
(237)) - Failed to connect to qianfeng01:10000 Error: Could not open client transport with JDBC Uri: jdbc:hive2://qianfeng01:10000: Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: root is not allowed to impersonate root (state=08S01,code=0) 解决办法:
- 配置hadoop安装目录下的etc/hadoop/core-site.xml文件(每台安装hadoop的服务器都需要配置),追加如下内容:
hadoop.proxyuser.root.hosts * hadoop.proxyuser.root.groups * 注意:这里登陆的是root用户。若登录的是hadoop用户,则配置文件中的root需要修改为hadoop
当上述文件配置不正确时会可能会引起beeline连接Hive时报如下异常
User: hadoop is not allowed to impersonate root), serverProtocolVersion:null) (state=08S01,code=0)
先部署安装使用的是hadoop用户,故而需要修改core-site.xml配置文件
- 将Hadoop停止,并重启集群
- 停止hive的metastore和hiveserver2服务,并重启
- 再次使用hive的beeline工具连接即可
4.6.3 Spark数据源
点击顶部导航数据源中心--->创建数据源--->数据源选择SPARK--->写如下信息--->测试连接--->成功--->确定
- 数据源:选择 Spark
- 数据源名称:输入数据源的名称
- 描述:输入数据源的描述
- IP/主机名:输入连接Spark的IP
- 端口:输入连接Spark的端口
- 用户名:设置连接Spark的用户名
- 密码:设置连接Spark的密码
- 数据库名:输入连接Spark的数据库名称
- Jdbc连接参数:用于Spark连接的参数设置,以JSON形式填写
注意:如果开启了kerberos,则需要填写 Principal
4.7 资源中心
4.7.1 资源中心简介
资源中心通常用于上传文件、UDF 函数和任务组管理。那资源上传到那儿呢?具体如下:
-
对于 Standalone 环境,可以选择本地文件目录作为上传文件夹(此操作不需要Hadoop部署)。
-
当然,你也可以 选择上传到 Hadoop 或者 S3 或者阿里云OSS 等。 在这种情况下,您需要有 Hadoop(2.6+)或等相关环境。
4.7.2 资源中心配置
-
如果您以 集群 模式或者 伪集群 模式部署DolphinScheduler,您需要对以下路径的文件进行配置:api-server/conf/common.properties 和 worker-server/conf/common.properties;
-
若您以 单机 模式部署DolphinScheduler,您只需要配置 standalone-server/conf/common.properties,具体配置如下:
- 将 resource.storage.upload.base.path 改为本地存储路径,请确保部署 DolphinScheduler 的用户拥有读写权限,例如:resource.storage.upload.base.path=/tmp/dolphinscheduler。当路径不存在时会自动创建文件夹
- 修改 resource.storage.type=HDFS 和 resource.hdfs.fs.defaultFS=file:///。
注意:如果您不想用默认值作为资源中心的基础路径,请修改resource.storage.upload.base.path的值。
4.7.3 对接分布式或远端存储
当需要使用资源中心进行相关文件的创建或者上传操作时,所有的文件和资源都会被存储在分布式文件系统HDFS或者远端的对象存储,如S3上。所以需要进行以下配置:
-
配置 common.properties 文件
vim api-server/conf/common.properties和worker-server/conf/common.properties文件均修改如下:
# user data local directory path, please make sure the directory exists and have read write permissions data.basedir.path=/opt/install/dolphinscheduler-3.1.4/data # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js # resource storage type: HDFS, S3, OSS, NONE resource.storage.type=HDFS # resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended resource.storage.upload.base.path=/dolphinscheduler # if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path resource.hdfs.root.user=hdfs # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir resource.hdfs.fs.defaultFS=hdfs://qianfeng01:9000 # resourcemanager port, the default value is 8088 if not specified resource.manager.httpaddress.port=8088 # if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty yarn.resourcemanager.ha.rm.ids= # if resourcemanager HA is enabled or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname yarn.application.status.address=http://qianfeng01:%s/ws/v1/cluster/apps/%s # job history status url when application number threshold is reached(default 10000, maybe it was set to 1000) yarn.job.history.status.address=http://qianfeng01:19888/ws/v1/history/mapreduce/jobs/%s
-
将修改的文件分发到其它服务器
#将./api-server/conf/common.properties分发到02和03服务器 [root@qianfeng01 dolphinscheduler-3.1.4]# scp ./api-server/conf/common.properties qianfeng02:/usr/local/dolphinscheduler-3.1.4/api-server/conf/ [root@qianfeng01 dolphinscheduler-3.1.4]# scp ./api-server/conf/common.properties qianfeng03:/usr/local/dolphinscheduler-3.1.4/api-server/conf/ #将./worker-server/conf/common.properties分发到02和03服务器 [root@qianfeng01 dolphinscheduler-3.1.4]# scp ./worker-server/conf/common.properties qianfeng02:/usr/local/dolphinscheduler-3.1.4/worker-server/conf/ [root@qianfeng01 dolphinscheduler-3.1.4]# scp ./worker-server/conf/common.properties qianfeng03:/usr/local/dolphinscheduler-3.1.4/worker-server/conf/
-
如Hadoop为HA,则需将Hadoop的core-site.xml和hdfs-site.xml拷贝到worker-server/conf 以及 api-server/conf中
#本地拷贝到api-server/conf/ [root@bj-zjk-001 dolphinscheduler-3.1.4]# cp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml /opt/install/dolphinscheduler-3.1.4/api-server/conf/ #远程分发到02和03服务器的api-server/conf/ [root@bj-zjk-001 dolphinscheduler-3.1.4]# scp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml bj-zjk-002:/opt/install/dolphinscheduler-3.1.4/api-server/conf/ [root@bj-zjk-001 dolphinscheduler-3.1.4]# scp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml bj-zjk-003:/opt/install/dolphinscheduler-3.1.4/api-server/conf/ #拷贝到worker-server/conf/,一定要有该步骤,否则,将资源中心的资源应用到工作流就会报空指针错误 [root@bj-zjk-001 dolphinscheduler-3.1.4]# cp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml /opt/install/dolphinscheduler-3.1.4/worker-server/conf/ #远程分发到02和03服务器的worker-server/conf/ [root@bj-zjk-001 dolphinscheduler-3.1.4]# scp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml bj-zjk-002:/opt/install/dolphinscheduler-3.1.4/worker-server/conf/ [root@bj-zjk-001 dolphinscheduler-3.1.4]# scp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml bj-zjk-003:/opt/install/dolphinscheduler-3.1.4/worker-server/conf/
-
创建对应目录
#在HDFS上创建目录 [root@bj-zjk-001 dolphinscheduler-3.1.4]# hdfs dfs -mkdir /dolphinscheduler #本地每一台服务器创建目录 [root@bj-zjk-001 ~]# mkdir /opt/install/dolphinscheduler-3.1.4/data [root@bj-zjk-002 ~]# mkdir /opt/install/dolphinscheduler-3.1.4/data [root@bj-zjk-003 ~]# mkdir /opt/install/dolphinscheduler-3.1.4/data
-
重新启动DolphinScheduler服务
#停止ds服务 [root@bj-zjk-001 dolphinscheduler-3.1.4]# sh /opt/install/dolphinscheduler-3.1.4/bin/stop-all.sh #启动ds服务 [root@bj-zjk-001 dolphinscheduler-3.1.4]# sh /opt/install/dolphinscheduler-3.1.4/bin/start-all.sh
注意:
- 如果只配置了 api-server/conf/common.properties 的文件,则只是开启了资源上传的操作,并不能满足正常使用。如果想要在工作流中执行相关文件则需要额外配置 worker-server/conf/common.properties。
- 如果用到资源上传的功能,那么安装部署中,部署用户需要有这部分的操作权限。
- 如果 Hadoop 集群的 NameNode 配置了 HA 的话,需要开启 HDFS 类型的资源上传,同时需要将 Hadoop 集群下的 core-site.xml 和 hdfs-site.xml 复制到 worker-server/conf 以及 api-server/conf,非 NameNode HA 跳过次步骤。
4.7.4 文件管理
当在调度过程中需要使用到第三方的 jar 或者用户需要自定义脚本的情况,可以通过在该页面完成相关操作。可创建的文件类型包括:txt/log/sh/conf/py/java 等。并且可以对文件进行编辑、重命名、下载和删除等操作。
注意:
-
当您以admin身份等入并操作文件时,需要先给admin设置租户
-
基础操作
-
创建文件夹
-
创建文件
文件格式支持以下几种类型:txt、log、sh、conf、cfg、py、java、sql、xml、hql、properties。选择创建文件->如下图所示:
-
上传文件
注意:
- 上传、创建、重命名文件时,文件名和源文件名(上传时)均不能带有 . 以及 / 特殊符号。
- 上述案例中,关于删除、查看、重命名文件都不再掩饰。
-
任务案例
该案例主要通过上传一个简单的 shell 脚本,来演示如何在工作流定义中使用资源中心的文件。像 MR、Spark 等任务需要用到 jar 包或者py文件,也是同理。
在项目管理的工作流定义模块,创建一个新的工作流,使用 Shell 任务。
-
脚本:sh rs_test.sh
-
资源:选择 rs_test.sh
-
保存,然后上线,测试该工作流,测试结果如下:
注意:
- 脚本中选择资源文件时文件名称需要保持和所选择资源全路径一致: 例如:资源路径为/resource/hello.sh 则脚本中调用需要使用/resource/hello.sh全路径
4.8 任务类型
DolphinScheduler任务插件有一些公共参数,我们将这些公共参数列在文档中供您查阅。每种任务都有如下的所有或者部分默认参数:
任务参数 描述 任务名称 任务的名称,同一个工作流定义中的节点名称不能重复。 运行标志 标识这个节点是否需要调度执行,如果不需要执行,可以打开禁止执行开关。 描述 当前节点的功能描述。 任务优先级 worker线程数不足时,根据优先级从高到低依次执行任务,优先级一样时根据先到先得原则执行。 Worker分组 设置分组后,任务会被分配给worker组的机器机执行。若选择Default,则会随机选择一个worker执行。 任务组名称 任务资源组,未配置则不生效。 组内优先级 一个任务组内此任务的优先级。 环境名称 配置任务执行的环境。 失败重试次数 任务失败重新提交的次数,可以在下拉菜单中选择或者手动填充。 失败重试间隔 任务失败重新提交任务的时间间隔,可以在下拉菜单中选择或者手动填充。 CPU 配额 为执行的任务分配指定的CPU时间配额,单位为百分比,默认-1代表不限制,例如1个核心的CPU满载是100%,16个核心的是1600%。 task.resource.limit.state 最大内存 为执行的任务分配指定的内存大小,超过会触发OOM被Kill同时不会进行自动重试,单位MB,默认-1代表不限制。该功能由 task.resource.limit.state 控制。 超时告警 设置超时告警、超时失败。当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。该功能由 task.resource.limit.state 控制。 资源 任务执行时所需资源文件 前置任务 设置当前任务的前置(上游)任务。 延时执行时间 任务延迟执行的时间,以分为单位 4.8.1 Shell
Shell 任务类型,用于创建 Shell 类型的任务并执行一系列的 Shell 脚本。worker 执行该任务的时候,会生成一个临时 shell 脚本,并使用与租户同名的 linux 用户执行这个脚本。
4.8.1.1 创建任务
- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入 DAG 编辑页面。
- 工具栏中拖动 到画板中,即可完成创建。
4.8.1.2 任务参数
- 默认参数说明请参考上述默认参数表格。
- 除上述默认参数,此任务没有其他参数。
4.8.1.3 任务案例
-
简单打印一行文字
该样例模拟了常见的简单任务,这些任务只需要简单的一两行命令就能运行起来。我们以打印一行日志为例,该任务仅会在日志文件中打印一行 “This is a demo of shell task”
-
使用自定义参数
该样例模拟了自定义参数任务,为了更方便的复用已有的任务,或者面对动态的需求时,我们会使用变量保证脚本的复用性。本例中,我们先在自定义脚本 中定义了参数 “param_key”,并将他的值设置为 “param_val”。接着在"脚本"中声明了 echo 命令,将参数 “param_key” 打印了出来。当我们保存 并运行任务后,在日志中会看到将参数 “param_key” 对应的值 “param_val” 打印出来。
上线任务,运行后如下:
注意事项
Shell 任务类型通过解析任务日志是否包含 application_xxx_xxx 的内容来判断是否 Yarn 任务,如果是则会将相应的 application_id 的状态作为当前 Shell 节点的运行状态判断,此时如果操作停止工作流则会 Kill 相应的 application_id
如果 Shell 任务中需要使用到用户自定义的脚本,可通过资源中心来上传对应的文件然后在 Shell 任务中引用他们
4.8.2 SQL
SQL任务类型,用于连接数据库并执行相应SQL。
4.8.2.1 创建任务
- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。
- 工具栏中拖动 到画板中,选择需要连接的数据源,即可完成创建。
4.8.2.2 任务参数
-
数据源:选择对应的数据源(需要提前创建,请参考数据源中心)
-
SQL类型:支持查询和非查询两种。
-
查询:支持 DML select 类型的命令,是有结果集返回的,可以指定邮件通知为表格、附件或表格附件三种模板;
-
非查询:支持DDL全部命令 和DML update、delete、insert三种类型的命令;
-
分段执行符号:当提供在数据源不支持一次执行多段SQL语句时,拆分SQL语句的符号来进行多次调用数据源执行方法。
-
例子:
1.当数据源选择Hive数据源时,不需要填写此参数。因为Hive数据源本身支持一次执行多段SQL语句;
2.当数据源选择MySQL数据源时,并且要执行多段SQL语句时,需要填写此参数为分号 ;。因为MySQL数据源不支持一次执行多段SQL语句;
-
SQL参数:输入参数格式为key1=value1;key2=value2…
-
SQL语句:SQL语句
-
UDF函数:对于HIVE类型的数据源,可以引用资源中心中创建的UDF函数,其他类型的数据源暂不支持UDF函数。
-
自定义参数:SQL任务类型,而存储过程是自定义参数顺序,给方法设置值自定义参数类型和数据类型,同存储过程任务类型一样。区别在于SQL任务类型自定义参数会替换SQL语句中${变量}。
-
前置SQL:前置SQL在SQL语句之前执行。
-
后置SQL:后置SQL在SQL语句之后执行。
4.8.2.3 任务样例
-
MySQL的操作
MySQL的操作就完成了。
-
Hive表创建并写入数据案例
该样例向Hive中创建临时表tmp_user并写入一行数据。选择SQL类型为非查询,在创建临时表之前需要确保该表不存在,所以我们使用自定义参数,在每次运行时获取当天时间作为表名后缀,这样这个任务就可以每天运行。创建的表名格式为:tmp_user_{yyyyMMdd}。
上线运行测试,成功如下图所示。
注意:
-
biz_date是自定义参数名称;
-
${system.biz.curdate}是系统内置的当前日期;
-
${biz_date}是获取自定义参数biz_date的值。
-
注意SQL类型的选择,如果是INSERT等操作需要选择非查询类型。
-
为了兼容长会话情况,UDF函数的创建是通过CREATE OR REPLACE语句
查询Hive中结果,登录集群使用hive命令或使用beeline、JDBC、Hue等方式连接apache hive进行查询,查询SQL为select * from tmp_hello_world_{yyyyMMdd},请将{yyyyMMdd}替换为运行当天的日期,查询结果如下图:
同理,去操作MySQL的SQL语句即可。
4.8.3 Hive Cli
使用Hive Cli任务插件创建Hive Cli类型的任务执行SQL脚本语句或者SQL任务文件。 执行任务的worker会通过hive -e命令执行hive SQL脚本语句或者通过hive -f命令执行资源中心中的Hive SQL文件。
4.8.3.1 Hive CLI任务 VS 连接Hive数据源的SQL任务
在DolphinScheduler中,我们有Hive CLI任务插件和使用Hive数据源的SQL插件提供用户在不同场景下使用,您可以根据需要进行选择。
- Hive CLI任务插件直接连接HDFS和Hive Metastore来执行hive类型的任务,所以需要能够访问到对应的服务。 执行任务的worker节点需要有相应的Hive jar包以及Hive和HDFS的配置文件。 但是在生产调度中,Hive CLI任务插件能够提供更可靠的稳定性。
- 使用Hive数据源的SQL插件不需要您在worker节点上有相应的Hive jar包以及Hive和HDFS的配置文件,而且支持 Kerberos认证。 但是在生产调度中,若调度压力很大,使用这种方式可能会遇到HiveServer2服务过载失败等问题。
4.8.3.2 创建任务
- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。
- 工具栏中拖动 Hive小蜜蜂Logo 到画板中,即可完成创建。
4.8.3.3 任务参数
任务参数 描述 Hive Cli 任务类型 Hive Cli任务执行方式,可以选择FROM_SCRIPT或者FROM_FILE。 Hive SQL 脚本 手动填入Hive SQL脚本语句。 Hive Cli 选项 Hive Cli的其他选项,如--verbose。 资源 如果您选择FROM_FILE作为Hive Cli任务类型,您需要在资源中选择Hive SQL文件。 4.8.3.4 任务案例
-
创建Hive独有的Worker组
因为我们Hive通常不是每一台服务器都部署得有,所以将装有Hive的服务器放到一个worker分组中,在运行工作流的时候,就会将任务跑到指定worker组的服务器上,就不会导致hive命令或者相关依赖找不到的问题。
worker分组就创建成功了。
-
下面的案例演示了如何使用Hive CLI任务节点执行Hive SQL脚本语句:
上线并运行:
测试结果如下:
-
下面的案例演示了如何使用Hive CLI任务节点从资源中心的Hive SQL
去资源中心创建文件:
创建hive cli的from_file类型的任务:
上线并运行:
运行状态如下图:
具体每个任务的日志,自己看看即可。
4.8.4 Python
Python 任务类型,用于创建 Python 类型的任务并执行一系列的 Python 脚本。worker 执行该任务的时候,会生成一个临时Python脚本, 并使用与租户同名的 Linux 用户执行这个脚本。
4.8.4.1 创建任务
- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。
- 工具栏中拖动 到画板中,即可完成创建。
4.8.4.2 任务参数
任务参数 描述 脚本 用户开发的PYTHON程序 自定义参数 是PYTHON局部的用户自定义参数,会替换脚本中以${变量}的内容 4.8.4.3 任务案例
-
使用自定义参数
该样例模拟了自定义参数任务,为了更方便的复用已有的任务,或者面对动态的需求时,我们会使用变量保证脚本的复用性。本例中,我们先在自定义脚本 中定义了参数 “param_key”,并将他的值设置为 “param_val”。接着在"脚本"中使用了 print 函数,将参数 “param_key” 打印了出来。当我们保存 并运行任务后,在日志中会看到将参数 “param_key” 对应的值 “param_val” 打印出来。
print("this is a deamo of python task") print("${param_key}")
将上述任务保存,然后上线,执行如下图:
4.8.5 Spark
Spark 任务类型用于执行 Spark 应用。对于 Spark 节点,Worker 支持两个不同类型的 Spark 命令提交任务:
- spark submit 方式提交任务。
- spark sql 方式提交任务。
4.8.5.1 创建任务
- 点击项目管理 -> 项目名称 -> 工作流定义,点击”创建工作流”按钮,进入 DAG 编辑页面:
- 拖动工具栏的 任务节点到画板中。
4.8.5.2 任务参数
- 默认参数说明请参考DolphinScheduler任务参数附录默认任务参数一栏。
- 程序类型:支持 Java、Scala、Python 和 SQL 四种语言。
- Spark 版本:支持 Spark1 和 Spark2。
- 主函数的 Class:Spark 程序的入口 Main class 的全路径。
- 主程序包:执行 Spark 程序的 jar 包(通过资源中心上传)。
- SQL脚本:Spark sql 运行的 .sql 文件中的 SQL 语句。
- 部署方式:(1) spark submit 支持 yarn-clusetr、yarn-client 和 local 三种模式。 (2) spark sql 支持 yarn-client 和 local 两种模式。
- 任务名称(可选):Spark 程序的名称。
- Driver 核心数:用于设置 Driver 内核数,可根据实际生产环境设置对应的核心数。
- Driver 内存数:用于设置 Driver 内存数,可根据实际生产环境设置对应的内存数。
- Executor 数量:用于设置 Executor 的数量,可根据实际生产环境设置对应的内存数。
- Executor 内存数:用于设置 Executor 内存数,可根据实际生产环境设置对应的内存数。
- 主程序参数:设置 Spark 程序的输入参数,支持自定义参数变量的替换。
- 选项参数:支持 --jar、--files、--archives、--conf 格式。
- 资源:如果其他参数中引用了资源文件,需要在资源中选择指定。
- 自定义参数:是 Spark 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容。
4.5.3 任务案例
-
SparkSQL
本案例为创建一个视图表 terms 并写入三行数据和一个格式为 parquet 的表 wc 并判断该表是否存在。程序类型为 SQL。将视图表 terms 的数据插入到格式为 parquet 的表 wc。
SQL代码:
create or replace temporary view terms as select * from values("spark hadoop hadoop flink flink ds hadoop") as tab(content); create table if not exists wc(word string,cnt int) using parquet options(path="/tmp/wc.parquet"); insert overwrite table wc select term,count(term) from ( select explode(split(content,"")) as term from terms ) group by term ; select * from wc;
运行的时候,确保是否配置了Spark的环境变量;
4.8.6 DataX
DataX 任务类型,用于执行 DataX 程序。对于 DataX 节点,worker 会通过执行 ${DATAX_HOME}/bin/datax.py 来解析传入的 json 文件。
4.8.6.1 创建任务
- 点击项目管理 -> 项目名称 -> 工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
- 拖动工具栏的 任务节点到画板中。
4.8.6.2 任务参数
-
默认参数说明请参考DolphinScheduler任务参数附录。
任务参数 描述 json DataX 同步的 json 配置文件 资源 在使用自定义json中如果集群开启了kerberos认证后,datax读取或者写入hdfs、hbase等插件时需要使用相关的keytab,xml文件等,则可使用改选项。资源中心-文件管理上传或创建的文件 自定义参数 sql 任务类型,而存储过程是自定义参数顺序的给方法设置值自定义参数类型和数据类型同存储过程任务类型一样。区别在于SQL任务类型自定义参数会替换 sql 语句中 ${变量} 数据源 选择抽取数据的数据源 sql 语句 目标库抽取数据的 sql 语句,节点执行时自动解析 sql 查询列名,映射为目标表同步列名,源表和目标表列名不一致时,可以通过列别名(as)转换 目标库 选择数据同步的目标库 目标库前置 前置 sql 在 sql 语句之前执行(目标库执行) 目标库后置 后置 sql 在 sql 语句之后执行(目标库执行) 限流(字节数) 限制查询的字节数 限流(记录数) 限制查询的记录数 4.8.6.3 任务案例
该样例演示为从 MySQL数据导入到 HDFS 中。
-
在 DolphinScheduler 中配置 DataX 环境
若生产环境中要是使用到 DataX 任务类型,则需要先配置好所需的环境。配置文件如下:/opt/install/dolphinscheduler-3.1.4/bin/env/dolphinscheduler_env.sh。
vim /opt/install/dolphinscheduler-3.1.4/bin/env/dolphinscheduler_env.sh
export HADOOP_HOME=${HADOOP_HOME:-/usr/local/hadoop-3.3.1} export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/local/hadoop-3.3.1/etc/hadoop} export SPARK_HOME1=${SPARK_HOME1:-/usr/local/spark-3.1.2} export SPARK_HOME2=${SPARK_HOME2:-/usr/local/spark-3.1.2} export PYTHON_HOME=${PYTHON_HOME:-/usr/bin/python} export HIVE_HOME=${HIVE_HOME:-/usr/local/hive-3.1.2} export FLINK_HOME=${FLINK_HOME:-/usr/local/flink-1.14.3} export DATAX_HOME=${DATAX_HOME:-/usr/local/datax} export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel} export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun} export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
当环境配置完成之后,需要重启 DolphinScheduler。
-
配置DataX任务
配置从MySQL中将数据抽取到Hive表(对应HDFS目录)中,具体DataX文件如下:
{ "job": { "setting": { "speed": { "channel": 1 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "123456", "connection": [ { "querySql": ["select * from sexinfo where sid > 0;"], "jdbcUrl": ["jdbc:mysql://qianfeng01:3306/test"] } ] } }, "writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://qianfeng01:9000", "path": "/user/hive/warehouse/sexinfo", "fileName": "sexdata", "column": [ {"name": "sid", "type": "int"}, {"name": "stag", "type": "string"} ], "fileType": "text", "writeMode": "append", "fieldDelimiter": "," } } } ] } }
具体DS中DataX的任务核心配置如下:
注意:
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 执行策略
-
-
-
-
-
-
-
- 详情可参考Master和Worker的logback配置,如下示例:
-
- 其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图
- 具体实现是根据任务实例的json解析优先级,然后把流程实例优先级_流程实例id_任务优先级_任务id信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务
- 按照不同流程实例优先级优先于同一个流程实例优先级优先于同一流程内任务优先级优先于同一流程内任务提交顺序依次从高到低进行任务处理。
-
-
-
- 高扩展性: 支持多租户和在线资源管理。支持每天10万个数据任务的稳定运行。
-