上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Spark学习笔记

guduadmin13小时前

Spark笔记

Spark介绍

  • Apache Spark 是一个快速、通用、可扩展的大数据处理框架,它提供了分布式数据处理、机器学习和图计算等功能。Spark 最初是由加州大学伯克利分校的AMPLab实验室开发的,于2010年开源,并成为 Apache 软件基金会的顶级项目。

    Spark任务架构

    • Driver
      • Driver 是一个 JVM 进程,负责执行 Spark 任务的 main 方法
      • 执行用户提交的代码,创建 SparkContext 或者 SparkSession
      • 将用户代码转化为 Spark 任务(Jobs)创建血缘(Lineage),逻辑计划(Logical Plan)和物理计划(Physical Plan)
      • 在 Cluster Manager 的辅助下,把 task 任务分发调度出去
      • 跟踪任务的执行情况,收集日志
      • Spark Context/Session
        • 它是由 Spark driver 创建,每个 Spark 应用对应一个
        • 程序和集群交互的入口
        • 可以连接到 Cluster Manager
        • Cluster Manager
          • 负责部署整个 Spark 集群
          • 包括上面提到的 driver 和 executors
          • 具有以下几种常见的部署模式:Standalone、YARN、Mesos、Kubernetes
          • Executor
            • 一个创建在 worker 节点的进程
            • 一个 Executor 有多个 slots(线程)
            • 一个 slot 就是一个线程,对应了一个 task
            • 可以并发执行多个 tasks
            • 负责执行 Spark 任务,把结果返回给 Driver
            • 可以将数据缓存到 worker 节点的内存

              RDD的五大特性

              • RDD由很多partition构成,有多少partition就对应有多少task
              • 算子实际上是作用在每一个分区上
              • RDD之间有依赖关系,宽依赖和窄依赖,用于切分Stage
              • Spark默认是hash分区,ByKey类的算子只能作用在kv格式的rdd上
              • Spark为task的计算提供了最佳的计算位置,移动计算而不是移动数据

                Spark常用算子

                • 转换算子(转换算子并不会触发提交作业,需要由Action算子触发执行 —— 懒执行)
                  • map
                  • flatMap
                  • filter
                  • union
                  • groupBy
                  • groupByKey
                  • reduceByKey
                  • sortBy
                  • join
                  • 行动算子(该类算子会触发 SparkContext 提交 Job 作业)
                    • count
                    • reduce
                    • foreach
                    • saveAsTextFile

                      宽窄依赖

                      "宽依赖"和"窄依赖"是用来描述RDD之间依赖关系的两个重要概念。这些概念主要涉及到在执行阶段(运行时)中,Spark如何在不同的分区之间进行数据的传递和处理。

                      • 宽依赖
                        • 父RDD的每个分区和子RDD的每个分区是一对多的关系,则父RDD和子RDD之间是宽依赖
                        • 窄依赖:map、union
                        • 窄依赖
                          • 父RDD的每个分区和子RDD的每个分区是一一对应的,则父RDD和子RDD之间是窄依赖
                          • 宽依赖:groupBy、union

                            Spark缓存和检查点

                            cache缓存

                            Spark中对每个RDD执行一个算子操作时,都会重新从源头处计算一遍

                            如果该RDD被多次使用,则会导致该RDD被重复计算

                            重复计算,浪费资源,消耗时间,影响整体性能

                            对多次使用的RDD可以通过cache/persist操作进行缓存

                            repeatRDD.cache()

                            默认以仅内存策略对RDD进行缓存

                            相当于repeatRDD.persist(StorageLevel.MEMORY_ONLY)

                            缓存级别

                            级别使用空间CPU时间是否在内存中是否在磁盘上备注
                            MEMORY_ONLY
                            MEMORY_ONLY_2数据存2份
                            MEMORY_ONLY_SER_2数据序列化,数据存2份
                            MEMORY_AND_DISK中等部分部分如果数据在内存中放不下,则溢写到磁盘
                            MEMORY_AND_DISK_2中等部分部分数据存2份
                            MEMORY_AND_DISK_SER部分部分
                            MEMORY_AND_DISK_SER_2部分部分数据存2份
                            DISK_ONLY
                            DISK_ONLY_2数据存2份
                            NONE不缓存
                            OFF_HEAP堆外内存

                            Checkpoint 检查点

                            Checkpoint 检查点是一种容错容灾机制

                            • 将某一时刻运行的内存数据和状态进行持久化
                              • 通常会持久化到磁盘
                              • 或者是分布式文件系统,例如HDFS
                              • 在Spark中的使用场景:
                                • SparkStreaming中进行故障恢复
                                • 对计算代价过高或者数据链路过长的RDD进行保存

                                  CheckPoint 的执行原理

                                  • 当RDD的job执行完毕后,会从最后一个RDD往前回溯
                                  • 当回溯到某个RDD调用了checkpoint方法后,Spark会启动一个新的job
                                  • 该任务会重新计算该RDD的数据,并持久化到HDFS上

                                    CheckPoint 对比 Cache

                                    CacheCheckpoint
                                    使用场景重复的RDD进行缓存,提高性能计算代价大或者数据链路过长的RDD,容错
                                    存储位置内存(易丢失)磁盘或HDFS(可靠的文件存储系统)
                                    依赖关系需要保存RDD之间的血缘依赖关系切断RDD之间的血缘依赖关系
                                    原理当每个Partition的数据被计算出来即可保存等第一个job执行完之后会重新启动一个新任务进行处理
                                    清理策略Job结束后自动删除需要手动删除,下一个Driver程序可直接加载

                                    广播变量和累加器

                                    广播变量

                                    • 使用广播变量的原因

                                      算子内部的代码最终会被封装到Task并发送到Executor中执行

                                      如果在算子内部使用了算子外部的变量,变量也会封装到Task中,Task中使用的实际上是外部变量的副本,Task的数量决定了外部变量副本的数量

                                      Task是在Executor中执行的,且Task的数量会远大于Executor的数量

                                      故可将外部变量广播到每个Executor中,减少变量的副本数

                                      进而减少网络中传输的数据量,提升运行效率

                                    • 广播变量介绍

                                      广播变量(Broadcast Variables)是Spark中一种用于高效分发较大数据集给所有工作节点的机制。在分布式计算中,当需要在所有节点上使用相同的只读数据时,可以使用广播变量来减少数据传输开销,提高性能。

                                      通常情况下,Spark会在集群的每个工作节点上复制一份任务的变量。但是,对于相对较大的变量,这样的传输开销可能是昂贵的。广播变量通过在集群上的各个节点上缓存变量的只读副本,从而避免了多次传输相同的数据。

                                      累加器

                                      • 应用场景

                                        • 例如如异常监控,调试,记录符合某特性的数据的数目
                                        • 累加器作用

                                          如果变量不被声明为累加器,那么它被改变时不会在Driver端进行全局汇总

                                          即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值

                                          • 当这个变量被声明为累加器后,该变量就会有分布式计数的功能
                                          • 广播变量获取流程

                                            • Task会向Executor申请获取广播变量
                                            • 若Executor暂无数据,则Executor首先会向同机架的其他Executor获取
                                            • 若获取不到再向跨机架的Executor获取
                                            • 如果还是获取不到,则向Driver端获取数据

                                              广播变量获取后会优先放入内存中,由BlockManager管理维护

                                              后续Task可直接从MemoryStore中获取使用

                                              BlockManager(Spark的分布式存储系统)

                                              主从结构:BlockManagerMaster/BlockManager(Slave)

                                              • BlockManagerMaster
                                                • 在Driver端启动
                                                • 负责接受Executor上的BlockManager的注册
                                                • 管理BlockManager的元数据信息
                                                • BlockManager
                                                  • 在每个Executor中启动
                                                  • 负责管理所在节点上的数据

                                                    主要构成

                                                    • MemoryStore:负责对内存上的数据进行存储和读写
                                                    • DiskStore:负责对磁盘上的数据进行存储和读写
                                                    • BlockTransferService:负责建立网络连接
                                                    • BlockManagerWorker:负责对其他的BlockManager的数据进行读写

                                                      BlockManager主要维护以下三类数据

                                                      • Cache缓存的数据
                                                      • 广播变量和累加器
                                                      • Shuffle产生的数据

                                                        Spark任务调度和资源申请流程

                                                        • Yarn-client模式

                                                          1. 在 YARN Client 模式下,spark-submit提交 Spark Job之后,就会提交的本地机器上启动Driver端

                                                          2. Driver 启动后会与 ResourceManager (RM)建立通讯并发起启动 ApplicationMaster(AM) 请求

                                                          3. RM接收到这个 Job 时,会在集群中选一个合适的 NodeManager (NM)并分配一个 Container(具有计算资源的一个容器),然后启动 ApplicationMaster(初始化SparkContext)

                                                          4. AM的功能相当于一个 ExecutorLaucher (Executor启动器),负责向 RM申请 Container 资源 ,RM收到请求后便会与 NM通信,启动 Container

                                                          5. AM对RM指定 NM分配的 Container 发出启动 Executor 进程请求

                                                          6. Executor进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行执行 Job 任务

                                                          7. Driver 中的 SparkContext 分配 Task 给 Executor 执行,Executor 运行 Task 并向 Driver 汇报运行的状态、进度、以及最终的计算结果;让 Driver 随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务

                                                          8. 应用程序运行完成后,AM向 RM申请注销并关闭自己。

                                                        • Yarn Cluster模式

                                                          1. 在 YARN Cluster 模式下,Spark 任务提交之后会与 RM建立通讯,并发出申请启动 AM请求
                                                          2. RM接收到这个 Job 时,会在集群中选一个合适的 NodeManager 并分配一个 Container,然后启动 AM,此时的 AM不仅负责ExecutorLauncher,还兼顾 Driver的作用
                                                          3. AM启动后向 RM申请资源启动Executor,RM接到 AM的资源申请后会在合适(有资源的情况下)的 NodeManager 中分配 Container
                                                          4. AM对RM指定 NodeManager 分配的 Container 发出启动 Executor 进程请求
                                                          5. Executor 进程启动后会向 AM(Driver)反向注册,Executor 全部注册完成后,开始执行执行 Job 任务
                                                          6. AM中的 SparkContext 分配 Task 给 Executor 执行,Executor 运行 Task 并向AM(Driver)汇报运行状态、进度、以及最终结果;让 AM(Driver)随时掌握各任务的运行状态,从而可在任务失败时重新启动任务
                                                          7. 应用程序运行完成后,ApplicationMaster 向 ResourceManager 申请注销并关闭自己;

                                                          Client模式 对比 Cluster模式

                                                          • Client模式
                                                            • Driver端在任务提交所在节点本地创建
                                                            • 主要用于Job的调试,上线前的测试
                                                            • 便于在本地查看日志
                                                            • 当多用户同时提交多个任务时,Driver 会与 Executor 进行大量的通信,会占用大量IO,导致网卡流量激增而被SA警告
                                                            • Cluster模式
                                                              • AM兼顾Driver端的作用,在某个NM中创建
                                                              • 适用于任务真正上线
                                                              • 由于AM(Driver)端是在任意某个NM中创建,故不会造成单节点流量激增,也不会导致网卡风暴
                                                              • 无法直接查看日志,需要通过命令或者在WEB界面查看

                                                                Spark优化

                                                                • 避免创建重复RDD
                                                                • 尽可能复用同一个RDD
                                                                • 对于多次使用的RDD进行持久化
                                                                • 使用广播变量(算子中使用了外部变量时,默认情况下每个task都会有一个变量副本,使用广播变量时,每个Executor保留一个变量副本,减少了网络传输和内存开销)
                                                                • 使用数据序列化,比如Kryo

网友评论

搜索
最新文章
热门文章
热门标签