Commit 66c2f60 1 parent daa3819 commit 66c2f60 Copy full SHA for 66c2f60
File tree 1 file changed +24
-0
lines changed
1 file changed +24
-0
lines changed Original file line number Diff line number Diff line change
1
+ ## Magnet
2
+ 先从框架开始,这篇论文主要讲述的是从一个分布式数据处理框架(名叫spark)中优化shuffle操作的。业界中spark和hadoop这些数据处理框架一直都很知名,他们都是基于mapreduce的方法已经使用大量的机器作为集群,然后这些框架都已经展现出了好的可扩展性和广泛的适用性,在数据分析和机器学习领域都广泛的应用了。Spark利用sql优化器优化用户写的计算逻辑,优化器会产生一个优化好的plan,在这里是类似数据库里的先进行filter然后进行join,然后把这个plan转换成几个job去执行,而每一个job都包含一个状态的DAG图,表示这些数据是如何转换的以及当前的job的最后状态是怎样的。而在这些状态之间的中间数据的传输就是shuffle操作。
3
+ ***
4
+ 而spark中的shuffle操作是如何进行的呢。在领英,就是那个知名职场社交平台,他们用YARN来部署spark然后调用的外部shuffle服务,这样的话就与数据处理部分进行了一定的解耦,这种方法很多公司都在使用。图2就是shuffle过程的具体的步骤,每个spark的执行器启动时都会去跟处于同一个节点的ESS(就是那个外部的shuffle服务)注册一下,这样位于同一节点的shuffle服务就可以去知道map后的数据的绝对位置,就可以直接获取。然后Spark执行器启动之后其中的map执行完任务时,他会产生一对文件,一个是要进行shuffle的数据文件,一个是定位这个文件中的每一个shuffle block的一个索引文件,在此过程中,map任务会根据shuffle的划分的key值的哈希值进行排序,如果在内存中做不完这些操作的话,将会把数据挪到磁盘上做。生成的shuffle文件中属于一个shuffle分区的都处于同一个shuffle block中。到了reduce这边启动时,他会询问spark他们的要接受的输入的shuffle block在哪儿,然后建立与ess的连接来获取数据。Ess接收到请求就从磁盘中读取相应的shuffle block,返回给reduce任务。然而这一过程是存在一些会使得shuffle这个操作成为整个数据处理过程中的瓶颈的issues的。
5
+ ***
6
+ 其中之一是IO读写这一块儿,因为ESS对于每一个请求都只读一个shuffle的block,shuffle block的大小的平均大小也就是每次从磁盘读的数据量大小,领英又主要使用的是HDD(主要原因是便宜),而对于HDD来说,服务这种大量的随机读取shuffle block(每个shuffle block又比较小),是会受限于HDD的IOPS 也就是每秒钟的IO操作,而且每个块一般依旧读一次而且每次都的顺序都会随机(不知道reduce调用ess服务的顺序),所以用cache就是没什么大作用。HDD的IOPS的限制和访问shuffle data的模式相结合就导致了磁盘读取的低吞吐量。图中也可以看出导致fetch操作长延迟的基本上都是低block大小的。
7
+ 下一个问题就是可靠性,对于有S个shuffle 服务和E个spark executors的集群,至少S* E个连接是需要被建立的,而他们的集群中S和E都可以多达1000个,集群中还会节点间歇性可用问题,节点也不一定是一直为可用状态。Spark ESS是一个共享的服务,所以它可能会在高峰期间处理大量的流量,从而导致性能下降。还有一点就是当ESS和reduce的任务连接失败时,会导致整个shuffle reduce stage失败,从而重新生成没有被fetch的shuffle data。
8
+ ***
9
+ 最后一个问题是我们应当将reduce task放置于哪一个位置。因为IOPS的限制,即使我们的网速很快,我们也无法经常使我们的网络带宽能够用满,而且reduce任务的输入分散在map任务中,所以将shuffle block放置于与reduce任务的同一节点也是不大有作用的。
10
+ ***
11
+ 为了解决这些问题,他们实现了magnet,替代了spark中的shuffle 操作,这是他的结构图。首先,他将shuffle操作改成了用这种push-merge的操作,先将mapper产生的数据推给magnet,magnet根据属于同一shuffle分区的合并为同一块,这样magnet就可以将那些从小的读取变为以MB为数量级的大块的顺序读取,而且push这个操作和mapper是解耦的,所以push的时间不算入map的运行时间内,也不会导致map操作失败。Push的这一具体过程分两步:第一步是准备要被push的数据,第二步是在magnet上进行合并操作。准备合并操作时会先将mapper的任务中的产生的shuffle的file中的blocks先合并成大小为MB数量级的几个chunks再发给magnet,右边是这一合并过程的算法。这算法保证了每一个chunk只包含相邻的几个块然后每一个magnet服务分到的chunk的数量较为平均,而且来自不同mapper的属于同一分区的blocks会被推送到同一个magnet服务。合并完chunk后为了减少在同一时间内发给同一个magnet服务的属于同一分区的blocks的可能性,map会随机性的发送这些chunk给对应的magnet。然后magnet服务得到blocks之后,会将每个block给添加到对应的合并文件的末尾,同时维护一个metadata,用来处理出错情况。
12
+ ***
13
+ 这是magnet从map到reduce的过程中与hadoop和spark原先的方法的时间线的对比图。Hadoop用的是一种slow start的方法,也就是reduce可以在其他的map任务还没结束时开始,spark原先的方法呢为了提高并发使用了一对线程,一个作为producer一个作为consumer,fetch一个block然后喂给reduce task。Magnet不仅延续了这一方法,还使用了线程池来解耦了push和map task,同时他将magnet中的大的一个合并文件分解成几个MB大小的slice,来提高这种produce和consume的并发量,总的来说相比原来那种每次fetch只fetch一个小block的方式已经好了很多。
14
+ ***
15
+ Magnet的实现也增强了可靠性,因为magnet保留了fetch操作,所以对于一个不完整的merge file被获取后,reduce任务还会继续fetch那些没有被合并进该文件的其他shuffle block,对于map失败的情况下,会直接重启map,当push失败后,如果几次重试都失败,则reduce到时会直接fetch没有被成功push的blocks。当合并的文件产生问题(比如有重复,或者数据异常),reduce也会直接fetch所需的没有被合并的块。当reduce尝试去获得merged-file失败时也同理。而且,就像图中第六步一样,当spark driver通知magnet去对于一个给定的shuffle划分停止合并时会获取一个mergeStatus的列表,而map任务完成时也会向driver发一个mapstatus的列表,这些数据能告知driver没一个unmerged的block和merged的file的位置和大小,这样,driver就可以根据这些信息启动reduce任务,并绑定到特定的magnet服务进行数据获取。这一过程是由unmerged的block和merged的file的,所以也相当于保存了数据的两份副本,而当一个spark应用程序结束后,则会删除这些shuffle中间数据,所以不用担心数据量过多而保存不下。
16
+ ***
17
+ 在实际环境中,还会有掉队和数据倾斜这样的事发生,当mapper中所有务做完了,而有的push操作没做完时就会发生掉队得情况。Magnet允spark给一个时间限制,超过这个时间,push操作就会停止,magnet会收来自spark的通知从而停止合并新块,这样那些block又会以未合并的方被fetch。当某一个shuffle划分明显的大于其他的划分时就会产生数据斜,当magnet检测到这样的事情发生时,他会将这个划分分割成多个划来分摊计算压力。
18
+ ***
19
+ 剩下的evaluation在这里,对push操作和fetch操作的时间,fetch的block的大小越大,则延时越短,而因为push操作中map使用了那个算法每次推mb数量级的chunk给magnet,所以push操作反而不受block的大小的影响。同时他们还对比了HDD和SSD中fetch操作的延迟,也验证了HDD的小块读写很慢的这个事实。
20
+ 他们为了比较shuffle 服务能有多快的读写block的数据,还根据block的大小测量了磁盘读写量,fetch操作主要是读磁盘,push操作主要是写磁盘。如图中block的大小主要影响的是读的吞吐量。Push相比于fetch更加稳定,不像那种每次小的随机读取,写操作会有多层的cache比如页表,磁盘的缓冲层等等这些地方。
21
+ ***
22
+ 最后有一个使用和不使用magnet的运行时间的区别。Workload1中shuffle的数据小于100GB,workload2处理了大概400GB的数据,而且其中的任务都是与CPU联系紧密地,就是跟CPU密切相关,workload3 跟IO读写密切相关,而且处理了800GB左右的数据。其中2和3结合的情况更符合实际的情况,而结果也表明magnet在这种情况下更加有效。
23
+ ***
24
+ 总结一下,magnet用了基于这种push的操作使得从原来的small随机阅读变为了顺序读取MB大小的块,提高了HDD下的磁盘读取性能。保留了原有的对未合并的block的fetch操作从而提高了容错性。加入了一些新的机制使得整个系统能够正确的平衡掉队和数据倾斜的问题。
You can’t perform that action at this time.
0 commit comments