`

apark 运行和调度

 
阅读更多

拷贝文档 备用理解

 

图2显示了Spark程序的运行场景。它由客户端启动,分两个阶段:第一阶段记录变换算子序列、增量构建DAG图;第二阶段由行动算子触 发,DAGScheduler把DAG图转化为作业及其任务集。Spark支持本地单节点运行(开发调试有用)或集群运行。对于后者,客户端运行于 master节点上,通过Cluster manager把划分好分区的任务集发送到集群的worker/slave节点上执行。

 

Spark 传统上与Mesos“焦不离孟”,也可支持Amazon EC2和YARN。底层任务调度器的基类是个trait,它的不同实现可以混入实际的执行。例如,在Mesos上有两种调度器实现,一种把每个节点的所有 资源分给Spark,另一种允许Spark作业与其他作业一起调度、共享集群资源。worker节点上有任务线程(task thread)真正运行DAGScheduler生成的任务;还有块管理器(block manager)负责与master上的block manager master通信(完美使用了Scala的Actor模式),为任务线程提供数据块。

最有趣的部分是DAGScheduler。下面详解它的工作过程。RDD的数据结构里很重要的一个域是对父RDD的依赖。如图3所示,有两类依赖:窄(Narrow)依赖和宽(Wide)依赖。

 

窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,和两个父RDD的分区对应于一个子RDD 的分区。图3中,map/filter和union属于第一类,对输入进行协同划分(co-partitioned)的join属于第二类。

宽依赖指子RDD的分区依赖于父RDD的所有分区,这是因为shuffle类操作,如图3中的groupByKey和未经协同划分的join。

窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个fork/join(此join非上文的join算子,而是指同步多个并行任务的barrier): 把计算fork到每个分区,算完后join,然后fork/join下一个RDD的算子。如果直接翻译到物理实现,是很不经济的:一是每一个RDD(即使 是中间结果)都需要物化到内存或存储中,费时费空间;二是join作为全局的barrier,是很昂贵的,会被最慢的那个节点拖死。如果子RDD的分区到 父RDD的分区是窄依赖,就可以实施经典的fusion优化,把两个fork/join合为一个;如果连续的变换算子序列都是窄依赖,就可以把很多个 fork/join并为一个,不但减少了大量的全局barrier,而且无需物化很多中间结果RDD,这将极大地提升性能。Spark把这个叫做流水线 (pipeline)优化。

变换算子序列一碰上shuffle类操作,宽依赖就发生了,流水线优化终止。在具体实现 中,DAGScheduler从当前算子往前回溯依赖图,一碰到宽依赖,就生成一个stage来容纳已遍历的算子序列。在这个stage里,可以安全地实 施流水线优化。然后,又从那个宽依赖开始继续回溯,生成下一个stage。

要深究两个问题:一,分区如何划分;二,分区该放到集群内哪个节点。这正好对应于RDD结构中另外两个域:分区划分器(partitioner)和首选位置(preferred locations)。

分区划分对于shuffle类操作很关键,它决定了该操作的父RDD和子RDD之间的依赖类型。上文提到,同一个join算子,如果协同划分的话,两个父 RDD之间、父RDD与子RDD之间能形成一致的分区安排,即同一个key保证被映射到同一个分区,这样就能形成窄依赖。反之,如果没有协同划分,导致宽 依赖。

所谓协同划分,就是指定分区划分器以产生前后一致的分区安排。Pregel和HaLoop把这个作为系统内置的一部分;而Spark 默认提供两种划分器:HashPartitioner和RangePartitioner,允许程序通过partitionBy算子指定。注意,HashPartitioner能够发挥作用,要求key的hashCode是有效的,即同样内容的key产生同样的hashCode。这对 String是成立的,但对数组就不成立(因为数组的hashCode是由它的标识,而非内容,生成)。这种情况下,Spark允许用户自定义 ArrayHashPartitioner。

第二个问题是分区放置的节点,这关乎数据本地性:本地性好,网络通信就少。有些RDD产生时就 有首选位置,如HadoopRDD分区的首选位置就是HDFS块所在的节点。有些RDD或分区被缓存了,那计算就应该送到缓存分区所在的节点进行。再不 然,就回溯RDD的lineage一直找到具有首选位置属性的父RDD,并据此决定子RDD的放置。

宽/窄依赖的概念不止用在调度中,对容错也很有用。如果一个节点宕机了,而且运算是窄依赖,那只要把丢失的父RDD分区重算即可,跟其他节点没有依赖。而宽依赖需要父RDD的所有分区都存在, 重算就很昂贵了。所以如果使用checkpoint算子来做检查点,不仅要考虑lineage是否足够长,也要考虑是否有宽依赖,对宽依赖加检查点是最物 有所值的。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics