Spark笔记
Spark介绍
- Apache Spark 是一个快速、通用、可扩展的大数据处理框架,它提供了分布式数据处理、机器学习和图计算等功能。Spark 最初是由加州大学伯克利分校的AMPLab实验室开发的,于2010年开源,并成为 Apache 软件基金会的顶级项目。
Spark任务架构
- Driver
- Driver 是一个 JVM 进程,负责执行 Spark 任务的 main 方法
- 执行用户提交的代码,创建 SparkContext 或者 SparkSession
- 将用户代码转化为 Spark 任务(Jobs)创建血缘(Lineage),逻辑计划(Logical Plan)和物理计划(Physical Plan)
- 在 Cluster Manager 的辅助下,把 task 任务分发调度出去
- 跟踪任务的执行情况,收集日志
- Spark Context/Session
- 它是由 Spark driver 创建,每个 Spark 应用对应一个
- 程序和集群交互的入口
- 可以连接到 Cluster Manager
- Cluster Manager
- 负责部署整个 Spark 集群
- 包括上面提到的 driver 和 executors
- 具有以下几种常见的部署模式:Standalone、YARN、Mesos、Kubernetes
- Executor
- 一个创建在 worker 节点的进程
- 一个 Executor 有多个 slots(线程)
- 一个 slot 就是一个线程,对应了一个 task
- 可以并发执行多个 tasks
- 负责执行 Spark 任务,把结果返回给 Driver
- 可以将数据缓存到 worker 节点的内存
RDD的五大特性
- RDD由很多partition构成,有多少partition就对应有多少task
- 算子实际上是作用在每一个分区上
- RDD之间有依赖关系,宽依赖和窄依赖,用于切分Stage
- Spark默认是hash分区,ByKey类的算子只能作用在kv格式的rdd上
- Spark为task的计算提供了最佳的计算位置,移动计算而不是移动数据
Spark常用算子
- 转换算子(转换算子并不会触发提交作业,需要由Action算子触发执行 —— 懒执行)
- map
- flatMap
- filter
- union
- groupBy
- groupByKey
- reduceByKey
- sortBy
- join
- 行动算子(该类算子会触发 SparkContext 提交 Job 作业)
- count
- reduce
- foreach
- saveAsTextFile
宽窄依赖
"宽依赖"和"窄依赖"是用来描述RDD之间依赖关系的两个重要概念。这些概念主要涉及到在执行阶段(运行时)中,Spark如何在不同的分区之间进行数据的传递和处理。
- 宽依赖
- 父RDD的每个分区和子RDD的每个分区是一对多的关系,则父RDD和子RDD之间是宽依赖
- 窄依赖:map、union
- 窄依赖
- 父RDD的每个分区和子RDD的每个分区是一一对应的,则父RDD和子RDD之间是窄依赖
- 宽依赖:groupBy、union
Spark缓存和检查点
cache缓存
Spark中对每个RDD执行一个算子操作时,都会重新从源头处计算一遍
如果该RDD被多次使用,则会导致该RDD被重复计算
重复计算,浪费资源,消耗时间,影响整体性能
对多次使用的RDD可以通过cache/persist操作进行缓存
repeatRDD.cache()
默认以仅内存策略对RDD进行缓存
相当于repeatRDD.persist(StorageLevel.MEMORY_ONLY)
缓存级别
级别 使用空间 CPU时间 是否在内存中 是否在磁盘上 备注 MEMORY_ONLY 高 低 是 否 MEMORY_ONLY_2 高 低 是 否 数据存2份 MEMORY_ONLY_SER_2 低 高 是 否 数据序列化,数据存2份 MEMORY_AND_DISK 高 中等 部分 部分 如果数据在内存中放不下,则溢写到磁盘 MEMORY_AND_DISK_2 高 中等 部分 部分 数据存2份 MEMORY_AND_DISK_SER 低 高 部分 部分 MEMORY_AND_DISK_SER_2 低 高 部分 部分 数据存2份 DISK_ONLY 低 高 否 是 DISK_ONLY_2 低 高 否 是 数据存2份 NONE 不缓存 OFF_HEAP 堆外内存 Checkpoint 检查点
Checkpoint 检查点是一种容错容灾机制
- 将某一时刻运行的内存数据和状态进行持久化
- 通常会持久化到磁盘
- 或者是分布式文件系统,例如HDFS
- 在Spark中的使用场景:
- SparkStreaming中进行故障恢复
- 对计算代价过高或者数据链路过长的RDD进行保存
CheckPoint 的执行原理
- 当RDD的job执行完毕后,会从最后一个RDD往前回溯
- 当回溯到某个RDD调用了checkpoint方法后,Spark会启动一个新的job
- 该任务会重新计算该RDD的数据,并持久化到HDFS上
CheckPoint 对比 Cache
Cache Checkpoint 使用场景 重复的RDD进行缓存,提高性能 计算代价大或者数据链路过长的RDD,容错 存储位置 内存(易丢失) 磁盘或HDFS(可靠的文件存储系统) 依赖关系 需要保存RDD之间的血缘依赖关系 切断RDD之间的血缘依赖关系 原理 当每个Partition的数据被计算出来即可保存 等第一个job执行完之后会重新启动一个新任务进行处理 清理策略 Job结束后自动删除 需要手动删除,下一个Driver程序可直接加载 广播变量和累加器
广播变量
-
使用广播变量的原因
算子内部的代码最终会被封装到Task并发送到Executor中执行
如果在算子内部使用了算子外部的变量,变量也会封装到Task中,Task中使用的实际上是外部变量的副本,Task的数量决定了外部变量副本的数量
Task是在Executor中执行的,且Task的数量会远大于Executor的数量
故可将外部变量广播到每个Executor中,减少变量的副本数
进而减少网络中传输的数据量,提升运行效率
-
广播变量介绍
广播变量(Broadcast Variables)是Spark中一种用于高效分发较大数据集给所有工作节点的机制。在分布式计算中,当需要在所有节点上使用相同的只读数据时,可以使用广播变量来减少数据传输开销,提高性能。
通常情况下,Spark会在集群的每个工作节点上复制一份任务的变量。但是,对于相对较大的变量,这样的传输开销可能是昂贵的。广播变量通过在集群上的各个节点上缓存变量的只读副本,从而避免了多次传输相同的数据。
累加器
-
应用场景
- 例如如异常监控,调试,记录符合某特性的数据的数目
-
累加器作用
如果变量不被声明为累加器,那么它被改变时不会在Driver端进行全局汇总
即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值
- 当这个变量被声明为累加器后,该变量就会有分布式计数的功能
-
广播变量获取流程
- Task会向Executor申请获取广播变量
- 若Executor暂无数据,则Executor首先会向同机架的其他Executor获取
- 若获取不到再向跨机架的Executor获取
- 如果还是获取不到,则向Driver端获取数据
广播变量获取后会优先放入内存中,由BlockManager管理维护
后续Task可直接从MemoryStore中获取使用
BlockManager(Spark的分布式存储系统)
主从结构:BlockManagerMaster/BlockManager(Slave)
- BlockManagerMaster
- 在Driver端启动
- 负责接受Executor上的BlockManager的注册
- 管理BlockManager的元数据信息
- BlockManager
- 在每个Executor中启动
- 负责管理所在节点上的数据
主要构成
- MemoryStore:负责对内存上的数据进行存储和读写
- DiskStore:负责对磁盘上的数据进行存储和读写
- BlockTransferService:负责建立网络连接
- BlockManagerWorker:负责对其他的BlockManager的数据进行读写
BlockManager主要维护以下三类数据
- Cache缓存的数据
- 广播变量和累加器
- Shuffle产生的数据
Spark任务调度和资源申请流程
-
Yarn-client模式
-
在 YARN Client 模式下,spark-submit提交 Spark Job之后,就会提交的本地机器上启动Driver端
-
Driver 启动后会与 ResourceManager (RM)建立通讯并发起启动 ApplicationMaster(AM) 请求
-
RM接收到这个 Job 时,会在集群中选一个合适的 NodeManager (NM)并分配一个 Container(具有计算资源的一个容器),然后启动 ApplicationMaster(初始化SparkContext)
-
AM的功能相当于一个 ExecutorLaucher (Executor启动器),负责向 RM申请 Container 资源 ,RM收到请求后便会与 NM通信,启动 Container
-
AM对RM指定 NM分配的 Container 发出启动 Executor 进程请求
-
Executor进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行执行 Job 任务
-
Driver 中的 SparkContext 分配 Task 给 Executor 执行,Executor 运行 Task 并向 Driver 汇报运行的状态、进度、以及最终的计算结果;让 Driver 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务
-
应用程序运行完成后,AM向 RM申请注销并关闭自己。
-
-
Yarn Cluster模式
- 在 YARN Cluster 模式下,Spark 任务提交之后会与 RM建立通讯,并发出申请启动 AM请求
- RM接收到这个 Job 时,会在集群中选一个合适的 NodeManager 并分配一个 Container,然后启动 AM,此时的 AM不仅负责ExecutorLauncher,还兼顾 Driver的作用
- AM启动后向 RM申请资源启动Executor,RM接到 AM的资源申请后会在合适(有资源的情况下)的 NodeManager 中分配 Container
- AM对RM指定 NodeManager 分配的 Container 发出启动 Executor 进程请求
- Executor 进程启动后会向 AM(Driver)反向注册,Executor 全部注册完成后,开始执行执行 Job 任务
- AM中的 SparkContext 分配 Task 给 Executor 执行,Executor 运行 Task 并向AM(Driver)汇报运行状态、进度、以及最终结果;让 AM(Driver)随时掌握各任务的运行状态,从而可在任务失败时重新启动任务
- 应用程序运行完成后,ApplicationMaster 向 ResourceManager 申请注销并关闭自己;
Client模式 对比 Cluster模式
- Client模式
- Driver端在任务提交所在节点本地创建
- 主要用于Job的调试,上线前的测试
- 便于在本地查看日志
- 当多用户同时提交多个任务时,Driver 会与 Executor 进行大量的通信,会占用大量IO,导致网卡流量激增而被SA警告
- Cluster模式
- AM兼顾Driver端的作用,在某个NM中创建
- 适用于任务真正上线
- 由于AM(Driver)端是在任意某个NM中创建,故不会造成单节点流量激增,也不会导致网卡风暴
- 无法直接查看日志,需要通过命令或者在WEB界面查看
Spark优化
- 避免创建重复RDD
- 尽可能复用同一个RDD
- 对于多次使用的RDD进行持久化
- 使用广播变量(算子中使用了外部变量时,默认情况下每个task都会有一个变量副本,使用广播变量时,每个Executor保留一个变量副本,减少了网络传输和内存开销)
- 使用数据序列化,比如Kryo
-
- BlockManagerMaster
-
-
- 将某一时刻运行的内存数据和状态进行持久化
- 宽依赖
- 转换算子(转换算子并不会触发提交作业,需要由Action算子触发执行 —— 懒执行)
- Driver
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章