《the MapReduce》 --- lab1

  最近稍微有空,就把割了很久的lab1给做了。在做的过程中磕磕碰碰,碰到了不少坑但是也总结了不少经验。感觉对日常的工作还是有不少帮助的,下面就来介绍本次的lab1。

MapReduce

  简单来说MapReduce就是把一个大任务,拆成若干个小的任务。然后把这些任务分发到不同的worker(服务器)上面去运行,然后再把他们执行的结果聚拢到一起,回复给外界。

  至于原理是什么我就不多说了,有兴趣的可以看看Jeff Dean的论文。 【the MapReduce】

  下面来介绍,如何实现一个简单的MapReduce,这里只介绍master是如何设计,该lab我做下来,感觉最大的难点是在于如何维护metadata。至于worker的设计倒没有什么特别的地方。

Master,Task相关的数据结构

​ 关于task(map和reduce),其实不难,仅需要把关于task的信息都抽象出来即可。如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type MapTaskInfo struct {
MapTaskSeq int
MapTaskState int
MapTaskStartTimeStamp uint64
MapTaskAssignedWorkerId uint64
InputFilePathLocation string
IntermediateFileLocation []string
}

type ReduceTaskInfo struct {
ReduceTaskSeq int
ReduceTaskState int
ReduceTaskStartTimeStamp uint64
ReduceTaskAssignedWorkerId uint64
InterMediatePathLocationList []string
OutputFilePathLocation string
}

// 枚举量
const (
MAP_TASK_STATE_INIT = 0
MAP_TASK_STATE_RUN = 1
MAP_TASK_STATE_FINISH = 2
)

const (
REDUCE_TASK_STATE_BLOCK = -1
REDUCE_TASK_STATE_INIT = 0
REDUCE_TASK_STATE_RUN = 1
REDUCE_TASK_STATE_FINISH = 2
)
  • TaskSeq:task的序列号,用来区分不同的task
  • TaskState:用来描述每个task的状态
  • TaskStartTimeStamp:master每次分发一个task之后,都会记录该task的分发时间,用于后续的超时判断
  • TaskAssignedWorkerId:记录该task被分发到哪个worker上,这个字段和时间戳有助于提高系统的容错
  • InputPath和OutputPath:用来记录文件读取,和文件输出的路径

  这里解释一下,为什么reduce的状态枚举量比map多了一个BLOCK状态。因为,reduce的输入文件,是map的输出。因此,只有某个reduce的输入数据都准备好之后,它才能够运行。因此,在此之前它都是阻塞状态。

  下面就是master的数据结构,在我实现的lab中,所设计的Master的数据结构参考了GFS里面的master。master存储着整个集群的控制信息,以及提供任务分发的接口。

  这是我一开始设计的Master结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 1.最初设计的Master
type Master struct {
// Your definitions here.
initMapTaskTable List.list
processMapTaskTable List.list
doneMapTaskTable List.list

initReduceTaskTable. List.list
processReduceTaskTable List.list
doneReduceTaskTable List.list
}

// 2.后面设计的Master
type Master struct {
// Your definitions here.
tableMapTask map[int]*MapTaskInfo
mapNum int
tableReduceTask map[int]*ReduceTaskInfo
reduceNum int
}

先说一下这两种设计各有什么优劣之处。

(1)从任务分发的角度来看

  第一种设计,采用了三个队列来存储不同状态的task,这样就导致分发task的效率会很高,如果想要把一个空的task分发出去,直接从INIT状态的队列中直接取出来,效率很高,复杂度在O(1)。

  但是,做出同样的操作第二种设计却需要O(n)的时间复杂度。

(2)从数据的完整性来看

  在这个角度来看,第一种设计的数据持久型做的不够好,我觉得它其实跟“分布式”有一点点像,在极端的情况下会数据丢失的。

  第二种结构,所有状态的task都用一个表来进行管理,这种又有点像”单机“,它的原子性和持久性能达到比较好的保障。

  这个的具体原因,在下面的任务分发接口中会提及。

Master的接口

  一开始做这个lab的时候,其实我是有一点懵的。因为MIT给出的实验文件,仅仅只有master提供rpc接口,worker却没有rpc接口。也就是说,master并不能主动向worker分发任务。

  后来通过查阅文档之后发现,该lab的设计是通过,worker在一个loop中不断的向master请求任务,解决任务,然后恢复master。不断执行这个loop。

  这种设计方案,有一个好处,就是能够充分利用worker的性能,尽量减少worker闲置的情况发生。

任务分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
type AssignTaskReq struct {
workerId uint64
}

type AssignTaskReply struct {
replyTaskType int
mapTaskInfo *MapTaskInfo
ReduceTaskInfo *ReduceTaskInfo
}

// 分发任务的接口
func (m *Master) assignTask2Worker(req *AssignTaskReq, rsp *AssignTaskReply) error {

mutexLock.Lock()
defer mutexLock.Unlock()

// 1.check if propre map task for work
mapTask,assignMapSuccess := m.getInitMapTaskForAssign()
if assignMapSuccess {
// 1. change the info of map task
mapTask.MapTaskState = MAP_TASK_STATE_RUN
mapTask.MapTaskStartTimeStamp = uint64(time.Now().Unix())
mapTask.MapTaskAssignedWorkerId = req.workerId

// 2. set reply
rsp.mapTaskInfo = mapTask
rsp.replyTaskType = TASK_TYPE_MAP

return nil
}

// 2.try to assign reduce task
reduceTask,assignReduceSuccess := m.getInitReduceTaskForAssign()
if assignReduceSuccess {
// register reduce task
reduceTask.ReduceTaskState = REDUCE_TASK_STATE_RUN
reduceTask.ReduceTaskStartTimeStamp = uint64(time.Now().Unix())
reduceTask.ReduceTaskAssignedWorkerId = req.workerId

// set reply
rsp.replyTaskType = TASK_TYPE_REDUCE
rsp.ReduceTaskInfo = reduceTask

return nil
}

return nil
}

  先简单说一下分发任务的逻辑。因为,map任务是产生中间数据集,而reduce任务的输入就是map产生的中间数据集。因此,在任务分发的时候,显然map任务有更高的优先级,尽量让map任务先做完。

  以map的分发为例,需要分发的是INIT状态的任务,已经在计算和运行结束的任务不必再次分发。这里通过调用m.getInitMapTaskForAssign()获得一个**能够分发的任务(INIT)**。(如果找不到,说明map已经全部分发完毕,可以把reduce进行分发这个逻辑跟分发map类似)

  后面就是对该任务进行注册(修改状态,添加分发时间,以及分给哪个worker)。

以上就是分发任务接口的实现逻辑,十分简单。下面想讨论,在做这个lab遇到的问题以及细节:

(1)数据的完整性以及分发的原子性

  刚开始做这个lab的时候,我设计的master是用三个队列来存储taskInfo的。这种设计,可以马上在INIT状态的队列里面拿出一个任务进行处理。而如果用table的话,需要遍历整个表。

  但是,这样三队列这种设计,其实它原子性是不高的。这种设计之下,它的分发其实是三个动作:(1)从INIT队列中拿出一个task (2)修改该task的信息 (3)把这个task插入到PROCESSING队列中

  这三个都是独立的操作,即使定期给Master做一个快照,但还是会有数据丢失的情况发生。比如说以下这种场景:

  • 从INIT队列中取出一个task
  • 系统进行快照备份
  • 备份完之后发生了crash

  那么,在这种场景下,从INIT拿出来的这个task就会消失了。因为,发生持久化的时候,该task并不在这三个队列中,持久化的时候并不能把该task写入到硬盘中去。如果没有发生crash则没关系,若是在这个时候发生crash了,这个task是仅仅存在于内存里面的,因此会导致数据丢失。(除非持久化的时候,不仅存储那三个队列,还把内存中的数据一并持久化。但是真要这要做,开销是十分巨大的)

  正是因为这个原因,我选择了把所有task存储在一个table里面,这样就避免了这种弱原子性的问题。并且考虑到,在table里面拿到一个可以分发的task的平均复杂度不会太高,因此这种方案还是能够接受的。

(2)处理逻辑和发送回复的顺序问题

1
2
3
4
5
6
7
8
9
// 1. change the info of map task
mapTask.MapTaskState = MAP_TASK_STATE_RUN
mapTask.MapTaskStartTimeStamp = uint64(time.Now().Unix())
mapTask.MapTaskAssignedWorkerId = req.workerId

// 2. set reply
rsp.mapTaskInfo = mapTask
rsp.replyTaskType = TASK_TYPE_MAP

  简单来说其实就是先处理注册逻辑还是先发送回复的问题。在这个lab中其实不会遇到这个问题,因为太久没用过rpc了,当时以为只要设置了rsp就会把回复发送给调用者。实际上,会等到这个接口调用结束之后,rpc框架才会对rsp进行序列化并通过网络进行传输。

  虽然,这个问题在该lab中没有体现,但是还是有不少参考意义(比如在MQ中这应该会经常碰到)。

  对于这个问题,有两种方案,下面来分析一下它们的优劣势:

1.先发送rsp,再处理metadata(又或者是业务逻辑)

  当时做这个的时候,主要考虑的问题是。当我发送完这个回复请求之后,还没到业务逻辑的处理,这时系统发生宕机了,我该怎么进行处理?

  如果发送rsp之后发生宕机了,假设网络没有任何问题,worker就会收到该请求,然后开始处理这个计算任务,从这个角度来看worker认为master已经把任务分发给他了。

  但是从master的角度来说,它虽然已经发送了请求,但是metadata还没来得及修改,然而master作出的决策也基本是依靠metadata。因此从这个角度master不认为已经把任务分发了,在master的metadata中,该任务仍就是处于INIT状态。

  正是因为上述原因,需要额外的机制来处理这种错误。此时,metadata没有修改,但是worker已经收到分发的任务了,并开始处理。处理完成后,会调用task_finish接口让master处理。但这时候,master就会很困惑了,因为该任务是INIT又被分发到别的worker上

  其实,这个处理也不难,只要回复的workerId 与 master记录的把该task分发到哪个workerId上,这两个对不上,master就不进行处理就好了。不要弄太复杂的逻辑。

2.先处理metadata,再发送请求

  跟上面的问题类似,在这种场景下,从master的角度来看,确实是把任务分发了,因为metadata已经被修改了。但是从worker的角度来看,它没有收到分发的任务。

  这种的处理策略也很简单,我们只需要等超时处理,自动把该任务回收就好了。

  经过综合考虑之后,我决定选择第二种策略去做该lab。

任务结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func(m *Master) finishTaskHandler(req *DoneTaskReq,rsp *DoneTaskRsp) error {

mutexLock.Lock()
defer mutexLock.Unlock()

// get the task
if req.taskType == TASK_TYPE_MAP {
m.finishMapTask(req)
}else if req.taskType == TASK_TYPE_REDUCE {
m.finishReduceTask(req)
}

return nil
}

func(m *Master) finishMapTask(req *DoneTaskReq){

// get the task
taskInfo := m.tableMapTask[req.taskSeq]

if checkMapTaskCanSetFinish(req.workerId, uint64(time.Now().Unix()),taskInfo){
// set the task finish
m.intermediateHandler(req.outputFilePath)
taskInfo.MapTaskState = MAP_TASK_STATE_FINISH
}else{
// if the worker id or the task timeout,don't do anything just let the timeout handler to solve the problem
// it can simplfy the design
}

return
}

func(m *Master) finishReduceTask(req *DoneTaskReq) {

// get the task
reduceTaskInfo := m.tableReduceTask[req.taskSeq]

if checkReduceTaskCanSetFinish(req.workerId,uint64(time.Now().Unix()),reduceTaskInfo) {
reduceTaskInfo.OutputFilePathLocation = req.outputFilePath[0]
reduceTaskInfo.ReduceTaskState = REDUCE_TASK_STATE_FINISH
}

// let the timeout handler the solute this situation

return
}

  当worker处理完任务之后,就会回调该函数,然后让master进行metadata的处理。因为worker能处理map_task和reduce_task,因此在回调的之后,需要指明当前worker是处理的哪个task。

  同时,需要知道的是,有超时机制的存在,导致master有可能把任务再次分发给到了另一个worker。因此,master在处理任务结束的回调,最主要的是处理metadata的信息是否准确。当时有想过,在校验worker信息的时候,同时也检测超时。但是,后来还是觉得应该遵从单一原则,不应该把事情弄得过于复杂。并且,如果在处理结束任务时,实际上该任务已经超时了,但是超时任务还没来得及启动处理。我想,最终是否是超时应该交给超时机制去处理。

任务监控(timeout)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (m *Master) MapCheckingHandler(){

for{
mutexLock.Lock()
defer mutexLock.Unlock()
nowTimeStamp := uint64(time.Now().Unix())
for _,mapTaskInfo := range m.tableMapTask {
if mapTaskInfo.MapTaskState == MAP_TASK_STATE_RUN &&
checkTaskTimeout(nowTimeStamp,mapTaskInfo.MapTaskStartTimeStamp) {

// eraser map-register info
mapTaskInfo.MapTaskState = MAP_TASK_STATE_INIT
mapTaskInfo.MapTaskAssignedWorkerId = NO_WORKER_ID
mapTaskInfo.MapTaskStartTimeStamp = 0
}
}
mutexLock.Unlock()
// sleep 15 sec
time.Sleep(100*time.Millisecond*2)
}

}

func (m *Master) ReduceCheckingHandler(){

for{
mutexLock.Lock()
defer mutexLock.Unlock()
nowTimestamp := uint64(time.Now().Unix())
for _,reduceTask := range m.tableReduceTask{
if reduceTask.ReduceTaskState == REDUCE_TASK_STATE_RUN &&
checkTaskTimeout(nowTimestamp,reduceTask.ReduceTaskStartTimeStamp){
// eraser reduce task register info
reduceTask.ReduceTaskState = REDUCE_TASK_STATE_INIT
reduceTask.ReduceTaskAssignedWorkerId = NO_WORKER_ID
reduceTask.ReduceTaskStartTimeStamp = 0

} else if reduceTask.ReduceTaskState == REDUCE_TASK_STATE_BLOCK &&
checkReduceTaskCanSetInit(reduceTask){
// 检查是否能够把某个reduce-task set init
reduceTask.ReduceTaskState = REDUCE_TASK_STATE_INIT
}
}
mutexLock.Unlock()
time.Sleep(100*time.Millisecond*2)
}

}

  对于管理metadata的节点来说,我觉得监控是十分重要的一个机制。感觉很多容错的措施都是建立在监控的基础上的,比如说运行在某个worker上面的任务,因为宕机等原因,造成该任务没有办法完成。如果没有额外的机制去处理的话,master将会永远等待这个“无法完成”的任务,这显然不合理。

  因此,需要引入一个机制来帮助master筛选那些已经“坏掉的任务”。在这个lab中,我通过超时处理来达成这个目的。一般情况下,worker都能在规定时间内完成分发给它的任务,因为分发下去的任务都是一些子任务,并不会有很大的数据量,除非一些不可抗的因素(网络IO慢,worker负载高,宕机)发生,才会导致任务的超时。

  当一个任务被分发下去的时候,master便会记录该任务分发的时间,以及分发给哪个worker。一开始设计的时候,我仅仅是记录分发的时间,没有记录分发给哪个worker。

  后面验证的时候发现了逻辑漏洞,如果不记录worker的话,当一个任务超时,master会把该任务重新设置为可分发状态。当旧的worker完成任务后,调用task_finish接口的时候,如果此时任务处于INIT状态,master显然能够这道此时的处理的task_finish是超时的任务。但是,如果master把这个超时的任务,重新分发给到了别的worker,那么master旧很难区分到底是谁调用了task_finish因此,我加了一个workerId方便master进行校验。

  把这些该保存的信息,都记录下之后,便开启开启一个协程去处理超时。该协程是一个endless loop,并且在每个周期内都会检查所有

metadata,如果某个任务超时,便将它重新设置为初始化状态。这样即使,master收到已经超时的任务,它也有能力进行处理(一般直接忽略即可)。

  虽然这种机制,能够处理上述的问题,但是也不是那么的完美。最大的问题就是,这个超时时间很难去给出一个很好的界定,首先网络本身就是不可靠的(除非,master和worker都在一个机房内,有专线进行沟通,但这地域容错性不高),还有worker也不是只处理一个任务可能还运行别的任务…..因此,如果把超时时间设置得太小,那么很容易发生超时;如果设置太大,那么超时机制的作用就会降低。

  但是,我个人是比较喜欢设置稍大的超时时间的,主要有以下的考虑:

  • 超时显然是小概率的事件,因此设置稍大的超时时间也未尝不可,只需要在一定的时间内把该任务发现了就行
  • worker应该要有重试机制,一个任务在某个worker上彻底无法重做的概率也是很小的,如果是这种情况确实应该重新分发。但是如果是别的情况,显然让worker进行重做该任务,显然比master重新分发该任务让别的worker执行效率要高。

Done

  这个就不细说了,就是master定时检查,是否所有任务都已经完成,回复给调用的client。

收获

  这个lab虽然不太难,但是做完之后收获还是不小。最大的一个感受就是,像这种依靠一个master进行分发的模型,如果能够维护好metadata的话,很多问题都能避免。

  还有就是,目前做的这个lab,因为时间关系很多方面都比较粗糙,比如说控制并发方面,我用了一个互斥锁就把所有metadata都锁起来。这样的操作导致该demo并发度并不会太高。优化的方案也想过,就是把互斥锁换成读写锁机制,还有学习concurrent_hashMap,在表中加分段加锁,这样就不会一次锁起整个表。

  还有就是,一些持久化操作也做,比如可以定时把master的metadata都持久化到磁盘,这样能够提高系统的容错性…

  ok,就到这里吧,社畜还要接着上班…