一、简介
Apache Spark 是用于大规模数据处理的统一分析引擎。 它提供了 Java、Scala、Python 和 R 中的高级 API,以及优化的引擎,该引擎支持用于数据分析的通用计算图。 它还支持丰富的高级工具集,包括用于 SQL 和 DataFrames 的 Spark SQL,用于机器学习的 MLlib,用于图形处理的 GraphX 和用于流处理的 Spark Streaming。
1.1 Spark 的特点
高效性
与Hadoop的MapReduce相比,Spark基于内存的运算要比MapReduce运算快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。计算的中间结果是存在于内存中的。
易用性
不同于MapReduce仅支持Map和Reduce两种编程算子,Spark提供了超过80种不同的Transformation和Action算子,如map,reduce,filter,groupByKey,sortByKey,foreach等,并且采用函数式编程风格,实现相同的功能需要的代码量极大缩小。
Spark支持Java、Python和Scala的API,还支持超过80种高级算子,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
通用性
Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。
兼容性
Spark能够跟很多开源工程兼容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且Spark可以读取多种数据源,如HDFS、HBase、MySQL等。
二、Spark 架构原理
2.1 Spark 专业术语定义
RDD:是弹性分布式数据集(Resilient Distributed Dataset)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。
Application:用户编写的Spark应用程序,一个Application包含多个Job。
DAG:是Directed Acyclic Graph(有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程)。
DAGScheduler:有向无环图调度器,基于DAG划分Stage 并以TaskSet的形势提交Stage给TaskScheduler;负责将作业拆分成不同阶段的具有依赖关系的多批任务;最重要的任务之一就是:计算作业和任务的依赖关系,制定调度逻辑。在SparkContext初始化的过程中被实例化,一个SparkContext对应创建一个DAGScheduler。
Driver:驱动程序,Spark中的Driver即运行上述Application的Main()函数并且创建SparkContext,其中创建SparkContext的目的是为了准备Spark应用程序的运行环境。
Driver Program:控制程序,负责为Application构建DAG图。
Cluster Manager:集群资源管理中心,负责分配计算资源。目前有三种类型:
- Standalon : spark原生的资源管理,由Master负责资源的分配
- Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架
- Hadoop Yarn: 主要是指Yarn中的ResourceManager
Worker:计算节点,集群中任何可以运行Application代码的节点,类似于Yarn中的NodeManager节点。
Executor:执行器,是运行在工作节点(Worker Node)上的一个进程,负责运行Task,并为应用程序存储数据。
Job:作业,一个Job包含多个RDD及作用于相应RDD上的各种操作。
Stage:阶段,是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为阶段。
TaskScheduler:任务调度器,将Taskset提交给worker(集群)运行并回报结果;负责每个具体任务的实际物理调度。
TaskSet:任务集,由一组关联的,但相互之间没有Shuffle依赖关系的任务所组成的任务集。
Task:任务,运行在Executor上的工作单元,单个分区数据集上的最小处理流程单元,是Executor中的一个线程。
总体如图所示
2.2 Spark 运行流程
- 构建Spark Application的运行环境,启动SparkContext;
- SparkContext向资源管理器(可以是Standalone,Mesos,Yarn)申请运行Executor资源,并启动StandaloneExecutorbackend;
- Executor向SparkContext申请Task;
- SparkContext将应用程序分发给Executor;
- SparkContext构建成DAG图,将DAG图分解成Stage、将Taskset发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行;
- Task在Executor上运行,运行完释放所有资源;
每个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行Task。这种Application隔离机制是有优势的,无论是从调度角度看(每个Driver调度他自己的任务),还是从运行角度看(来自不同Application的Task运行在不同JVM中),当然这样意味着Spark Application不能跨应用程序共享数据,除非将数据写入外部存储系统
Spark与资源管理器无关,只要能够获取executor进程,并能保持相互通信就可以了
提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack里,因为Spark Application运行过程中SparkContext和Executor之间有大量的信息交换
2.3 Spark 调度原理
2.3.1 Spark 集群整体运行架构
- Spark 集群分为 Master 节点和 Worker 节点,相当于 Hadoop 的 Master 和 Slave 节点。
- Master 节点上常驻 Master 守护进程,负责管理全部的 Worker 节点。
- Worker 节点上常驻 Worker 守护进程,负责与 Master 节点通信并管理 Executors。
- Driver 为用户编写的 Spark 应用程序所运行的进程。Driver 程序可以运行在 Master 节点上,也可运行在 Worker 节点上,还可运行在非 Spark 集群的节点上。
2.3.2 Spark 调度器
Spark 中主要有两种调度器:DAGScheduler 和 TaskScheduler,DAGScheduler 主要是把一个 Job 根据 RDD 间的依赖关系,划分为多个 Stage,对于划分后的每个 Stage 都抽象为一个由多个 Task 组成的任务集(TaskSet),并交给 TaskScheduler 来进行进一步的任务调度。TaskScheduler 负责对每个具体的 Task 进行调度。
DAGScheduler
当创建一个 RDD 时,每个 RDD 中包含一个或多个分区,当执行 Action 操作时,相应的产生一个 Job,而一个 Job 会根据 RDD 间的依赖关系分解为多个 Stage,每个 Stage 由多个 Task 组成(即 TaskSet),每个 Task 处理 RDD 中的一个 Partition。一个 Stage 里面所有分区的任务集合被包装为一个 TaskSet 交给 TaskScheduler 来进行任务调度。这个过程由是由 DAGScheduler 来完成的。DAGScheduler 对 RDD 的调度过程如下图所示:
TaskScheduler
DAGScheduler 将一个 TaskSet 交给 TaskScheduler 后,TaskScheduler 会为每个 TaskSet 进行任务调度,Spark 中的任务调度分为两种:FIFO(先进先出)调度和 FAIR(公平调度)调度。
FIFO 调度:即谁先提交谁先执行,后面的任务需要等待前面的任务执行。这是 Spark 的默认的调度模式。
FAIR 调度:支持将作业分组到池中,并为每个池设置不同的调度权重,任务可以按照权重来决定执行顺序。
在 Spark 中使用哪种调度器可通过配置spark.scheduler.mode参数来设置,可选的参数有 FAIR 和 FIFO,默认是 FIFO。
2.3.3 Spark RDD 调度过程
如下图所示,Spark 对 RDD 执行调度的过程,创建 RDD 并生成 DAG,由 DAGScheduler 分解 DAG 为包含多个 Task(即 TaskSet)的 Stages,再将 TaskSet 发送至 TaskScheduler,由 TaskScheduler 来调度每个 Task,并分配到 Worker 节点上执行,最后得到计算结果。
2.4 Spark RDD 常用函数
2.4.1 Transformation
下表列出了 Spark 支持的一些常见转换。有关详细信息,请参阅 RDD API 文档(Scala、 Java、 Python、 R)和配对 RDD 函数文档(Scala、 Java)
方法 | 描述 |
---|---|
map(func) | 对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是做工的数据集 |
filter(func) | 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD |
flatMap(func) | 和map差不多,但是flatMap生成的是多个结果 |
mapPartitions(func) | 和map很像,但是map是每个element,而mapPartitions是每个partition |
mapPartitionsWithIndex(func) | 和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index |
sample(withReplacement, fraction, seed) | 使用给定的随机数生成器种子对数据的一小部分进行采样,无论是否有替换 |
union(otherDataset) | 求并集 |
intersection(otherDataset) | 求交集 |
distinct([numPartitions])) | 数据去重,numPartitions为可选参数,表示分配多少个partition |
groupByKey([numPartitions]) | 分组函数,对类型为(K, V)数据集根据K进行分组,返回(K, lterable<V>)类型的数据集。numPartitions为可选参数,表示分配多少个partition |
reduceByKey(func, [numPartitions]) | 按照key分组然后聚合 |
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | 当在 (K, V) 对的数据集上调用时,返回 (K, U) 对的数据集,其中每个键的值使用给定的组合函数和中性零值聚合。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。与 in 一样groupByKey ,reduce 任务的数量可通过可选的第二个参数进行配置 |
sortByKey([ascending], [numPartitions]) | 当在 K 实现 Ordered 的 (K, V) 对数据集上调用时,返回 (K, V) 对的数据集,按布尔ascending 参数中指定的键升序或降序排序 |
join(otherDataset, [numPartitions]) | 当有两个KV的dataset(K, V) 和 (K, W),返回是 (K, (V, W)) 的dataset,numTasks为并发的任务数 |
cogroup(otherDataset, [numPartitions]) | 当有两个KV的dataset(K, V) 和 (K, W) ,返回的是 (K, (Seq[V], Seq[W])) 的dataset,numTasks为并发的任务数 |
cartesian(otherDataset) | 当调用类型为 T 和 U 的数据集时,返回一个 (T, U) 对(所有元素对)的数据集 |
pipe(command, [envVars]) | 通过 shell 命令(例如 Perl 或 bash 脚本)对 RDD 的每个分区进行管道传输。RDD 元素被写入进程的标准输入,输出到标准输出的行作为字符串的 RDD 返回 |
coalesce(numPartitions) | 将 RDD 中的分区数减少到 numPartitions。对过滤大型数据集后更有效地运行操作很有用 |
repartition(numPartitions) | 随机重组 RDD 中的数据以创建更多或更少的分区并在它们之间进行平衡。这总是在网络上打乱所有数据 |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的分区器对 RDD 进行重新分区,并在每个结果分区内,按键对记录进行排序。这比repartition 在每个分区内调用然后排序更有效,因为它可以将排序下推到 shuffle 机器中 |
2.4.2 Actions
方法 | 描述 |
---|---|
reduce(func) | 使用函数func(它接受两个参数并返回一个)聚合数据集的元素。该函数应该是可交换的和关联的 |
collect() | 在驱动程序中将数据集的所有元素作为数组返回。这通常在filter或其他返回足够小的数据子集的操作之后很有用 |
count() | 返回数据集中元素的数量 |
first() | 返回数据集的第一个元素(类似于 take(1)) |
take(n) | 返回一个包含数据集前 n 个元素的数组 |
takeSample(withReplacement, num, [seed]) | 随机取num个样本,withReplacement是否允许重复样本,num抽样个数,seed随机数种子 |
takeOrdered(n, [ordering]) | 使用自然顺序或自定义比较器返回RDD的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 以将其转换为文件中的一行文本 |
saveAsSequenceFile(path) (Java and Scala) | 将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径中。这在实现 Hadoop 的 Writable 接口的键值对的 RDD 上可用。在 Scala 中,它也可用于隐式转换为 Writable 的类型(Spark 包括对 Int、Double、String 等基本类型的转换) |
saveAsObjectFile(path) (Java and Scala) | 使用 Java 序列化以简单格式编写数据集的元素,然后可以使用 SparkContext.objectFile() |
countByKey() | 仅适用于 (K, V) 类型的 RDD。返回 (K, Int) 对的哈希图,其中包含每个键的计数 |
foreach(func) | 对数据集的每个元素运行函数func |
三、Spark 运行模式
Spark 有多种运行模式,由图中可以看到 Spark 支持本地运行模式(Local 模式)、独立运行模式(Standalone 模式)、Mesos、YARN(Yet Another Resource Negotiator)、Kubernetes 模式等。
本地运行模式是 Spark 中最简单的一种模式,也可称作伪分布式模式。
独立运行模式为 Spark 自带的一种集群管理模式,Mesos 及 YARN 两种模式也是比较常用的集群管理模式。相比较 Mesos 及 YARN 两种模式而言,独立运行模式是最简单,也最容易部署的一种集群运行模式。
Kubernetes 是一个用于自动化部署、扩展和管理容器化应用程序的开源系统。
Spark 底层还支持多种数据源,能够从其它文件系统读取数据,如 HDFS、Amazon S3、Hypertable、HBase 等。Spark 对这些文件系统的支持,同时也丰富了整个 Spark 生态的运行环境。
3.1 Spark Local 模式
Spark Local模式被称为Local[N]模式,是用单机的多个线程来模拟Spark分布式计算,直接运行在本地,便于调试,通常用来验证开发出来的应用程序逻辑上有没有问题,其中N代表可以使用N个线程,每个线程拥有一个core。如果不指定N,则默认是1个线程(该线程有1个core),如果是local[*],则代表 Run Spark locally with as many worker threads as logical cores on your machine.
即运行的线程数与CPU的核数一样。通常,Local模式用于完成开发出来的分布式程序的测试工作,并不用于实际生产。
3.2 Spark Standalone 模式
Standalone模式是Spark实现的资源调度框架,其自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。主要的节点有Client节点、Master节点和Worker节点。其中Driver既可以运行在Master节点上中,也可以运行在本地Client端。
- 当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;
- 当使用spark-submit工具提交Job或者在Eclipse、IDEA等开发平台上使用
new SparkConf().setMaster(“spark://master:7077”)
方式运行Spark任务时,Driver是运行在本地Client端上的。
Spark Standalone运行流程
1.SparkContext连接到Master,向Master注册并申请资源(CPU Core 和Memory);
2.Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,然后启动StandaloneExecutorBackend;
3.StandaloneExecutorBackend向SparkContext注册;
4.SparkContext将Applicaiton代码发送给StandaloneExecutorBackend;并且SparkContext解析Applicaiton代码,构建DAG图,并提交给DAG Scheduler分解成Stage(当碰到Action操作时,就会催生Job;每个Job中含有1个或多个Stage,Stage一般在获取外部数据和shuffle之前产生),DAG Scheduler将TaskSet提交给Task Scheduler,Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;
5.StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,直至Task完成;
6.所有Task完成后,SparkContext向Master注销,释放资源。
3.3 Spark on Mesos 模式
Spark On Mesos模式是很多公司采用的模式,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。在Spark On Mesos环境中,用户可选择粗粒度模式和细粒度模式两种调度模式之一运行自己的应用程序。其中细粒度模式于Spark2.0.0版本开始不再使用。
Spark执行程序的大小取决于以下配置变量:
执行器内存:spark.executor.memory
执行器核心:spark.executor.cores
执行者核数:spark.cores.max/spark.executor.cores
粗粒度模式 (Coarse-grained Mode)
粗粒度模式下,每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个slot)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。举个例子,比如你提交应用程序时,指定使用5个executor运行你的应用程序,每个executor占用5GB内存和5个CPU,每个executor内部设置了5个slot,则Mesos需要先为executor分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中,mesos的master和slave并不知道executor内部各个task的运行情况,executor直接将任务状态通过内部的通信机制汇报给Driver,从一定程度上可以认为,每个应用程序利用mesos搭建了一个虚拟集群自己使用。
细粒度模式 (Fine-grained Mode)
鉴于粗粒度模式会造成大量资源浪费,Spark On YARN还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。
3.4 Spark on YARN模式
Spark On YARN模式。这是一种最有前景的部署模式。但限于YARN自身的发展,目前仅支持粗粒度模式(Coarse-grained Mode)。这是由于YARN上的Container资源是不可以动态伸缩的,一旦Container启动之后,可使用的资源不能再发生变化,不过这个已经在YARN计划(具体参考:https://issues.apache.org/jira/browse/YARN-1197 )中了。
Spark on YARN模式根据Driver在集群中的位置分为两种模式:一种是YARN-Client模式,另一种是YARN-Cluster(或称为YARN-Standalone模式)。
YARN-Client 模式
Yarn-Client模式中,Driver在客户端本地运行,这种模式可以使得Spark Application和客户端进行交互,因为Driver在客户端,所以可以通过webUI访问Driver的状态,默认是 http://hadoop1:4040 访问,而YARN通过 http://hadoop1:8088 访问。
YARN-client的工作流程
1.Spark Yarn Client向YARN的ResourceManager申请启动Application Master。同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler等,由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;
2.ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派;
3.Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);
4.一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;
5.Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
6.应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。
YARN-Cluster 模式
在YARN-Cluster模式中,当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动;第二个阶段是由ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成。
YARN-cluster的工作流程
1.Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等;
2.ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;
3.ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;
4.一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度,其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等;
5.ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
6.应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。
YARN-Client 与 YARN-Cluster 区别
YARN-Client模式下,Application Master仅仅向YARN请求Executor,Client会和请求的Container通信来调度他们工作,也就是说Client不能离开。
YARN-Cluster模式下,Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,因而YARN-Cluster模式不适合运行交互类型的作业;