《the MapReduce》 --- lab1
  最近稍微有空,就把割了很久的lab1给做了。在做的过程中磕磕碰碰,碰到了不少坑但是也总结了不少经验。感觉对日常的工作还是有不少帮助的,下面就来介绍本次的lab1。
MapReduce
  简单来说MapReduce就是把一个大任务,拆成若干个小的任务。然后把这些任务分发到不同的worker(服务器)上面去运行,然后再把他们执行的结果聚拢到一起,回复给外界。
  至于原理是什么我就不多说了,有兴趣的可以看看Jeff Dean的论文。 【the MapReduce】
  下面来介绍,如何实现一个简单的MapReduce,这里只介绍master是如何设计,该lab我做下来,感觉最大的难点是在于如何维护metadata。至于worker的设计倒没有什么特别的地方。
Master,Task相关的数据结构
关于task(map和reduce),其实不难,仅需要把关于task的信息都抽象出来即可。如下
1 |
|
- TaskSeq:task的序列号,用来区分不同的task
- TaskState:用来描述每个task的状态
- TaskStartTimeStamp:master每次分发一个task之后,都会记录该task的分发时间,用于后续的超时判断
- TaskAssignedWorkerId:记录该task被分发到哪个worker上,这个字段和时间戳有助于提高系统的容错
- InputPath和OutputPath:用来记录文件读取,和文件输出的路径
  这里解释一下,为什么reduce的状态枚举量比map多了一个BLOCK状态。因为,reduce的输入文件,是map的输出。因此,只有某个reduce的输入数据都准备好之后,它才能够运行。因此,在此之前它都是阻塞状态。
  下面就是master的数据结构,在我实现的lab中,所设计的Master的数据结构参考了GFS里面的master。master存储着整个集群的控制信息,以及提供任务分发的接口。
  这是我一开始设计的Master结构体。
1 |
|
先说一下这两种设计各有什么优劣之处。
(1)从任务分发的角度来看
  第一种设计,采用了三个队列来存储不同状态的task,这样就导致分发task的效率会很高,如果想要把一个空的task分发出去,直接从INIT状态的队列中直接取出来,效率很高,复杂度在O(1)。
  但是,做出同样的操作第二种设计却需要O(n)的时间复杂度。
(2)从数据的完整性来看
  在这个角度来看,第一种设计的数据持久型做的不够好,我觉得它其实跟“分布式”有一点点像,在极端的情况下会数据丢失的。
  第二种结构,所有状态的task都用一个表来进行管理,这种又有点像”单机“,它的原子性和持久性能达到比较好的保障。
  这个的具体原因,在下面的任务分发接口中会提及。
Master的接口
  一开始做这个lab的时候,其实我是有一点懵的。因为MIT给出的实验文件,仅仅只有master提供rpc接口,worker却没有rpc接口。也就是说,master并不能主动向worker分发任务。
  后来通过查阅文档之后发现,该lab的设计是通过,worker在一个loop中不断的向master请求任务,解决任务,然后恢复master。不断执行这个loop。
  这种设计方案,有一个好处,就是能够充分利用worker的性能,尽量减少worker闲置的情况发生。
任务分发
1 |
|
  先简单说一下分发任务的逻辑。因为,map任务是产生中间数据集,而reduce任务的输入就是map产生的中间数据集。因此,在任务分发的时候,显然map任务有更高的优先级,尽量让map任务先做完。
  以map的分发为例,需要分发的是INIT状态的任务,已经在计算和运行结束的任务不必再次分发。这里通过调用m.getInitMapTaskForAssign()
获得一个**能够分发的任务(INIT)**。(如果找不到,说明map已经全部分发完毕,可以把reduce进行分发这个逻辑跟分发map类似)
  后面就是对该任务进行注册(修改状态,添加分发时间,以及分给哪个worker)。
以上就是分发任务接口的实现逻辑,十分简单。下面想讨论,在做这个lab遇到的问题以及细节:
(1)数据的完整性以及分发的原子性
  刚开始做这个lab的时候,我设计的master是用三个队列来存储taskInfo的。这种设计,可以马上在INIT状态的队列里面拿出一个任务进行处理。而如果用table的话,需要遍历整个表。
  但是,这样三队列这种设计,其实它原子性是不高的。这种设计之下,它的分发其实是三个动作:(1)从INIT队列中拿出一个task (2)修改该task的信息 (3)把这个task插入到PROCESSING队列中
  这三个都是独立的操作,即使定期给Master做一个快照,但还是会有数据丢失的情况发生。比如说以下这种场景:
- 从INIT队列中取出一个task
- 系统进行快照备份
- 备份完之后发生了crash
  那么,在这种场景下,从INIT拿出来的这个task就会消失了。因为,发生持久化的时候,该task并不在这三个队列中,持久化的时候并不能把该task写入到硬盘中去。如果没有发生crash则没关系,若是在这个时候发生crash了,这个task是仅仅存在于内存里面的,因此会导致数据丢失。(除非持久化的时候,不仅存储那三个队列,还把内存中的数据一并持久化。但是真要这要做,开销是十分巨大的)
  正是因为这个原因,我选择了把所有task存储在一个table里面,这样就避免了这种弱原子性的问题。并且考虑到,在table里面拿到一个可以分发的task的平均复杂度不会太高,因此这种方案还是能够接受的。
(2)处理逻辑和发送回复的顺序问题
1 |
|
  简单来说其实就是先处理注册逻辑还是先发送回复的问题。在这个lab中其实不会遇到这个问题,因为太久没用过rpc了,当时以为只要设置了rsp就会把回复发送给调用者。实际上,会等到这个接口调用结束之后,rpc框架才会对rsp进行序列化并通过网络进行传输。
  虽然,这个问题在该lab中没有体现,但是还是有不少参考意义(比如在MQ中这应该会经常碰到)。
  对于这个问题,有两种方案,下面来分析一下它们的优劣势:
1.先发送rsp,再处理metadata(又或者是业务逻辑)
  当时做这个的时候,主要考虑的问题是。当我发送完这个回复请求之后,还没到业务逻辑的处理,这时系统发生宕机了,我该怎么进行处理?
  如果发送rsp之后发生宕机了,假设网络没有任何问题,worker就会收到该请求,然后开始处理这个计算任务,从这个角度来看worker认为master已经把任务分发给他了。
  但是从master的角度来说,它虽然已经发送了请求,但是metadata还没来得及修改,然而master作出的决策也基本是依靠metadata。因此从这个角度master不认为已经把任务分发了,在master的metadata中,该任务仍就是处于INIT状态。
  正是因为上述原因,需要额外的机制来处理这种错误。此时,metadata没有修改,但是worker已经收到分发的任务了,并开始处理。处理完成后,会调用task_finish
接口让master处理。但这时候,master就会很困惑了,因为该任务是INIT又被分发到别的worker上。
  其实,这个处理也不难,只要回复的workerId 与 master记录的把该task分发到哪个workerId上,这两个对不上,master就不进行处理就好了。不要弄太复杂的逻辑。
2.先处理metadata,再发送请求
  跟上面的问题类似,在这种场景下,从master的角度来看,确实是把任务分发了,因为metadata已经被修改了。但是从worker的角度来看,它没有收到分发的任务。
  这种的处理策略也很简单,我们只需要等超时处理,自动把该任务回收就好了。
  经过综合考虑之后,我决定选择第二种策略去做该lab。
任务结束
1 |
|
  当worker处理完任务之后,就会回调该函数,然后让master进行metadata的处理。因为worker能处理map_task和reduce_task,因此在回调的之后,需要指明当前worker是处理的哪个task。
  同时,需要知道的是,有超时机制的存在,导致master有可能把任务再次分发给到了另一个worker。因此,master在处理任务结束的回调,最主要的是处理metadata的信息是否准确。当时有想过,在校验worker信息的时候,同时也检测超时。但是,后来还是觉得应该遵从单一原则,不应该把事情弄得过于复杂。并且,如果在处理结束任务时,实际上该任务已经超时了,但是超时任务还没来得及启动处理。我想,最终是否是超时应该交给超时机制去处理。
任务监控(timeout)
1 |
|
  对于管理metadata的节点来说,我觉得监控是十分重要的一个机制。感觉很多容错的措施都是建立在监控的基础上的,比如说运行在某个worker上面的任务,因为宕机等原因,造成该任务没有办法完成。如果没有额外的机制去处理的话,master将会永远等待这个“无法完成”的任务,这显然不合理。
  因此,需要引入一个机制来帮助master筛选那些已经“坏掉的任务”。在这个lab中,我通过超时处理来达成这个目的。一般情况下,worker都能在规定时间内完成分发给它的任务,因为分发下去的任务都是一些子任务,并不会有很大的数据量,除非一些不可抗的因素(网络IO慢,worker负载高,宕机)发生,才会导致任务的超时。
  当一个任务被分发下去的时候,master便会记录该任务分发的时间,以及分发给哪个worker。一开始设计的时候,我仅仅是记录分发的时间,没有记录分发给哪个worker。
  后面验证的时候发现了逻辑漏洞,如果不记录worker的话,当一个任务超时,master会把该任务重新设置为可分发状态。当旧的worker完成任务后,调用task_finish
接口的时候,如果此时任务处于INIT状态,master显然能够这道此时的处理的task_finish
是超时的任务。但是,如果master把这个超时的任务,重新分发给到了别的worker,那么master旧很难区分到底是谁调用了task_finish
。因此,我加了一个workerId方便master进行校验。
  把这些该保存的信息,都记录下之后,便开启开启一个协程去处理超时。该协程是一个endless loop,并且在每个周期内都会检查所有
metadata,如果某个任务超时,便将它重新设置为初始化状态。这样即使,master收到已经超时的任务,它也有能力进行处理(一般直接忽略即可)。
  虽然这种机制,能够处理上述的问题,但是也不是那么的完美。最大的问题就是,这个超时时间很难去给出一个很好的界定,首先网络本身就是不可靠的(除非,master和worker都在一个机房内,有专线进行沟通,但这地域容错性不高),还有worker也不是只处理一个任务可能还运行别的任务…..因此,如果把超时时间设置得太小,那么很容易发生超时;如果设置太大,那么超时机制的作用就会降低。
  但是,我个人是比较喜欢设置稍大的超时时间的,主要有以下的考虑:
- 超时显然是小概率的事件,因此设置稍大的超时时间也未尝不可,只需要在一定的时间内把该任务发现了就行
- worker应该要有重试机制,一个任务在某个worker上彻底无法重做的概率也是很小的,如果是这种情况确实应该重新分发。但是如果是别的情况,显然让worker进行重做该任务,显然比master重新分发该任务让别的worker执行效率要高。
Done
  这个就不细说了,就是master定时检查,是否所有任务都已经完成,回复给调用的client。
收获
  这个lab虽然不太难,但是做完之后收获还是不小。最大的一个感受就是,像这种依靠一个master进行分发的模型,如果能够维护好metadata的话,很多问题都能避免。
  还有就是,目前做的这个lab,因为时间关系很多方面都比较粗糙,比如说控制并发方面,我用了一个互斥锁就把所有metadata都锁起来。这样的操作导致该demo并发度并不会太高。优化的方案也想过,就是把互斥锁换成读写锁机制,还有学习concurrent_hashMap,在表中加分段加锁,这样就不会一次锁起整个表。
  还有就是,一些持久化操作也做,比如可以定时把master的metadata都持久化到磁盘,这样能够提高系统的容错性…
  ok,就到这里吧,社畜还要接着上班…
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!