Spark调优核心参数设置
num-executors 该参数一定被设置, 为当前Application生产指定个数的Executors 实际生产环境分配80个左右的Executors
executor-memory 与JVM OOM(内存溢出)紧密相关,很多时候甚至决定了spark运行的性能 实际生产环境下建议8GB左右 若运行在yarn上,内存占用量不超过yarn的内存资源的50%
excutor-cores 决定了在Executor中能够并行执行的Task的个数 实际生产环境建议2~3个driver-memory 作为驱动,默认是1GB 生产环境一般设置4GB
spark.default.parallelism 建议至少设置100个,官方推荐是num-executors*excutor-cores的2~3倍
spark.storage.memoryFraction 用于存储的比例默认占用60%,如果计算比较依赖于历史数据,则可以适当调高该参数,如果计算严重依赖于shuffle,则需要降低该比例
spark.shuffle.memoryFraction 用于shuffle的内存比例,默认占用20% 如果计算严重依赖于shuffle,则需要提高该比例
spark生态的主要组件:
spark core,任务调度,内存管理,错误恢复
spark sql,结构化数据处理
spark streaming,流式计算
spark MLlib,机器学习库
GraphX,图计算
spark运行模式:
- local模式
- standalone模式,构建master+slave集群
- Spark on Yarn模式
- Spark on Mesos模式
宽窄依赖
1.窄依赖是1对1或1对多,宽依赖是多对1
2.窄依赖前一步map没有完全完成也可以进行下一步,在一个线程里完成不划分stage;宽依赖下一步需要依赖前一步的结果,划分stage
4.在传输上,窄依赖之间在一个stage内只需要做pipline,每个父RDD的分区只会传入到一个子RDD分区中,通常可以在一个节点内完成转换;宽依赖在stage做shuffle,需要在运行过程中将同一个父RDD的分区传入到不同的子RDD分区中,中间可能涉及多个节点之间的数据传输
3.容错上,窄依赖只需要重新计算子分区对应的负分区的RDD即可;宽依赖,在极端情况下所有负分区的RDD都要被计算
mapreduce中数据倾斜的原因?应该如何处理?
如何处理spark中的数据倾斜?
原因:在物理执行期间,RDD会被分为一系列的分区,每个分区都是整个数据集的子集。当spark调度并运行任务的时候,Spark会为每一个分区中的数据创建一个任务。大部分的任务处理的数据量差不多,但是有少部分的任务处理的数据量很大,因而Spark作业会看起来运行的十分的慢,从而产生数据倾斜
处理方式:
1.使用需要进行shuffle人工指定参数并行度
2.进行数据的清洗,把发生倾斜的刨除,用单独的程序去算倾斜的key
3.join的时候使用小数据join大数据时,换用map join
- 尽量减少shuffle的次数
Spark分区数设置
1、分区数越多越好吗?
不是的,分区数太多意味着任务数太多(一个partion对应一个任务),每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。
2、分区数太少会有什么影响?
分区数太少的话,会导致一些结点没有分配到任务;另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。
3、合理的分区数是多少?如何设置?
总核数=executor-cores * num-executor
一般合理的分区数设置为总核数的2~3倍
Worker、Master、Executor、Driver 4大组件
1.master和worker节点
搭建spark集群的时候我们就已经设置好了master节点和worker节点,一个集群有一个master节点和多个worker节点。
master节点常驻master守护进程,负责管理worker节点,我们从master节点提交应用。
worker节点常驻worker守护进程,与master节点通信,并且管理executor进程。
2.driver和executor进程
driver进程就是应用的main()函数并且构建sparkContext对象,当我们提交了应用之后,便会启动一个对应的driver进程,driver本身会根据我们设置的参数占有一定的资源(主要指cpu core和memory)。下面说一说driver和executor会做哪些事。
driver可以运行在master上,也可以运行worker上(根据部署模式的不同)。driver首先会向集群管理者(standalone、yarn,mesos)申请spark应用所需的资源,也就是executor,然后集群管理者会根据spark应用所设置的参数在各个worker上分配一定数量的executor,每个executor都占用一定数量的cpu和memory。在申请到应用所需的资源以后,driver就开始调度和执行我们编写的应用代码了。driver进程会将我们编写的spark应用代码拆分成多个stage,每个stage执行一部分代码片段,并为每个stage创建一批tasks,然后将这些tasks分配到各个executor中执行。
executor进程宿主在worker节点上,一个worker可以有多个executor。每个executor持有一个线程池,每个线程可以执行一个task,executor执行完task以后将结果返回给driver,每个executor执行的task都属于同一个应用。此外executor还有一个功能就是为应用程序中要求缓存的 RDD 提供内存式存储,RDD 是直接缓存在executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
driver进程会将我们编写的spark应用代码拆分成多个stage,每个stage执行一部分代码片段,并为每个stage创建一批tasks,然后将这些tasks分配到各个executor中执行。
Spark是如何进行资源管理的?
1)资源的管理和分配
资源的管理和分配,由Master和Worker来完成。
Master给Worker分配资源, Master时刻知道Worker的资源状况。
客户端向服务器提交作业,实际是提交给Master。
2)资源的使用
资源的使用,由Driver和Executor。程序运行时候,向Master请求资源。
Spark和mapreduce点的区别
优点:
1.最大的区别在于.spark把用到的中间数据放入内存,而mapreduce需要通过HDFS从磁盘中取数据。
2.spark算子多,mapreduce只有map和reduce两种操作
缺点:
spark过度依赖内存计算,如果参数设置不当,内存不够时就会因频繁GC导致线程等待
什么是RDD
RDD是一个只读的分布式弹性数据集,是spark的基本抽象
主要特性:
1.分布式。由多个partition组成,可能分布于多台机器,可并行计算
2.高效的容错(弹性)。通过RDD之间的依赖关系重新计算丢失的分区
3.只读。不可变
RDD在spark中的运行流程?
- 创建RDD对象
- sparkContext负责计算RDD之间的依赖关系,构建DAG
- DAGScheduler负责把DAG分解成多个stage(shuffle stage和final stage),每个stage中包含多个task,每个task会被TAskScheduler分发给WORKER上的Executor执行
spark任务执行流程:
- Driver端提交任务,向Master申请资源
- Master与Worker进行RPC通信,让Work启动Executor
- Executor启动反向注册Driver,通过Driver—Master—Worker—Executor得到Driver在哪里
- Driver产生Task,提交给Executor中启动Task去真正的做计算
spark是如何容错的?
主要采用Lineage(血统)机制来进行容错,但在某些情况下也需要使用RDD的checkpoint
对于窄依赖,只计算父RDD相关数据即可,窄依赖开销较小
对于宽依赖,需计算所有依赖的父RDD相关数据,会产生冗余计算,宽依赖开销较大。
在两种情况下,RDD需要加checkpoint
1.DAG中的Lineage过长,如果重算,开销太大
2.在宽依赖上Cheakpoint的收益更大
一个RDD的task数量是又什么决定?一个job能并行多少个任务是由什么决定的?
task由分区决定,读取时候其实调用的是hadoop的split函数,根据HDFS的block来决定
每个job的并行度由core决定
cache与checkpoint的区别
cache 和 checkpoint 之间有一个重大的区别,cache 将 RDD 以及 RDD 的血统(记录了这个RDD如何产生)缓存到内存中,当缓存的 RDD 失效的时候(如内存损坏),
它们可以通过血统重新计算来进行恢复。但是 checkpoint 将 RDD 缓存到了 HDFS 中,同时忽略了它的血统(也就是RDD之前的那些依赖)。为什么要丢掉依赖?因为可以利用 HDFS 多副本特性保证容错!
reduceByKey和groupByKey的区别?
如果能用reduceByKey,那就用reduceByKey.因为它会在map端,先进行本地combine,可以大大减少要传输到reduce端的数据量,减小网络传输的开销。
groupByKey的性能,相对来说要差很多,因为它不会在本地进行聚合,而是原封不动,把ShuffleMapTask的输出,拉取到ResultTask的内存中,所以这样的话,就会导致,所有的数据,都要进行网络传输从而导致网络传输性能开销非常大!
map和mapPartition的区别?
1.map是对rdd中的每一个元素进行操作;mapPartitions则是对rdd中的每个分区的迭代器进行操作
如果是普通的map,比如一个partition中有1万条数据。ok,那么你的function要执行和计算1万次。使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。2.如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素
创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。3.SparkSql或DataFrame默认会对程序进行mapPartition的优化。
mapPartition缺点:
一次性读入整个分区全部内容,分区数据太大会导致内存OOM
详细说明一下GC对spark性能的影响?优化
GC会导致spark的性能降低。因为spark中的task运行时是工作线程,GC是守护线程,守护线程运行时,会让工作线程停止,所以GC运行的时候,会让Task停下来,这样会影响spark
程序的运行速度,降低性能。
默认情况下,Executor的内存空间分60%给RDD用来缓存,只分配40%给Task运行期间动态创建对象,这个内存有点小,很可能会发生full gc,因为内存小就会导致创建的对象很快把内存填满,然后就会GC了,就是JVM尝试找到不再被使用的对象进行回收,清除出内存空间。所以如果Task分配的内存空间小,就会频繁的发生GC,从而导致频繁的Task工作线程的停止,从而降低Spark应用程序的性能。优化方式:
1.增加executor内存
2.可以用通过调整executor比例,比如将RDD缓存空间占比调整为40%,分配给Task的空间变为了60%,这样的话可以降低GC发生的频率 spark.storage.memoryFraction
2.使用Kryo序列化类库进行序列化
为什么要使用广播变量?
当RDD的操作要使用driver中定义的变量时,每次都要把变量发送给worker节点一次,如果这个变量的数据很大的话,会产生很高的负载,导致执行效率低;
使用广播变量可以高效的使一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输