0%

pyspark学习心得

持久化

​ Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。

​ 因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作

​ spark中的持久化操作主要分为两种:persist和cachecache相当于使用MEMORY_ONLY级别的persist操作,而persist更灵活可以任意指定persist的级别。

如何选择一种最合适的持久化策略

  • 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常
  • 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。
  • 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘
  • 通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

提高性能的算子

使用filter之后进行coalesce操作

通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。

Shuffle

​ 大多数spark作业的性能主要就消耗在shuffle环节,因为shuffle中包含了大量的磁盘IO、序列化、网络数据传输等操作。但是影响一个spark作业性能的主要因素还是代码开发、资源参数、以及数据倾斜,shuffle调优在优化spark作业性能中只能起较小的作用。

shuffle操作速度慢的原因

Pyspark使用过程中的一些小Tips:

1、RDD.repartition(n)可以在最初对RDD进行分区操作,这个操作实际上是一个shuffle,可能比较耗时,但是如果之后的action比较多的话,可以减少下面操作的时间。其中的n值看cpu的个数,一般大于2倍cpu,小于1000。

2、Action不能够太多,每一次的action都会将以上的taskset划分一个job,这样当job增多,而其中task并不释放,会占用更多的内存,使得gc拉低效率。

3、在shuffle前面进行一个过滤,减少shuffle数据,并且过滤掉null值,以及空值

4、groupBy尽量通过reduceBy替代。reduceBy会在work节点做一次reduce,在整体进行reduce,相当于做了一次hadoop中的combine操作,而combine操作和reduceBy逻辑一致,这个groupBy不能保证。

5、做join的时候,尽量用小RDD去join大RDD.

6、避免collect的使用。因为collect如果数据集超大的时候,会通过各个work进行收集,io增多,拉低性能,因此当数据集很大时要save到HDFS。

7、RDD如果后面使用迭代,建议cache,但是一定要估计好数据的大小,避免比cache设定的内存还要大,如果大过内存就会删除之前存储的cache,可能导致计算错误,如果想要完全的存储可以使用persist(MEMORY_AND_DISK),因为cache就是persist(MEMORY_ONLY)。

8、设置spark.cleaner.ttl,定时清除task,因为job的原因可能会缓存很多执行过去的task,所以定时回收可能避免集中gc操作拉低性能。