绿色健康小清新

耐得住寂寞,守得住繁华

MIT6.824-lab1-MapReduce

MapReduce

MapReduce执行概述

通过将Map调用的输入数据自动分割为M个数据片段的集合,Map调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理。使用分区函数将Map调用产生的中间key值分成R个不同分区(例如,hash(key) mod R),Reduce调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定。

流程:

  1. 用户程序首先调用的MapReduce库将输入文件分成M个数据分片,每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)。

  2. 然后用户程序在机群中创建大量的程序副本,这些程序有一个是master程序,其它的程序都是worker程序,由master分配任务(map/reduce)。有M个Map任务和R个Reduce任务将被分配,master将一个Map任务或Reduce任务分配给一个空闲的worker。(在真正实现的时候,可以每个worker主动去master处请求任务)

  3. 被分配了map任务的worker程序读取相关的输入数据分片,从输入的数据片段中解析出key/value pair,然后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出的中间key/value pair,并保存在内存中(可以进行一次合并)。

  4. 内存中的key/value pair通过分区函数分成R个区域,之后周期性的写入到本地磁盘上。key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。

  5. 当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取相应分区缓存数据。当Reduce worker读取了所有的中间数据后,进行数据合并,通过对key进行排序后使得具有相同key值的数据聚合在一起(由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序),形成一个key/interator pair。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。

  6. Reduce worker程序遍历排序后的中间数据,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件(存储在全局文件系统上)。

  7. 当所有的Map和Reduce任务都完成之后,master唤醒用户程序。在这个时候,在用户程序里的对MapReduce调用才返回。

在成功完成任务之后,MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这R个输出文件合并成一个文件,他们经常把这些文件作为另外一个MapReduce的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。

一般来说reduce的数量和分区数量相同,①如果ReduceTask数>分区数,则会多产生几个空的输出文件;②如果1<ReduceTask的数量<分区数,则有一部分分区数据无处安放,会Exception;③如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件

Master数据结构

Master持有一些数据结构,它存储每一个Map和Reduce任务的状态(空闲、工作中或完成),以及Worker机器(非空闲任务的机器)的标识。

Master就像一个数据管道,中间文件存储区域的位置信息通过这个管道从Map传递到Reduce。因此,对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置。当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的Reduce任务。

容错机制

Master失效:一个简单的解决办法是让master周期性的将上面描述的数据结构写入磁盘,即检查点(checkpoint)。如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程。然而,由于只有一个master进程,master失效后再恢复是比较麻烦的,因此我们现在的实现是如果master失效,就中止MapReduce运算。客户可以检查到这个状态,并且可以根据需要重新执行MapReduce操作。

Worker失效: 周期性的ping每个worker。如果在一个约定的时间范围内没有收到worker返回的信息,master将把这个worker标记为失效。所有由这个失效的worker完成的Map任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的worker。 同样的,worker失效时正在运行的Map或Reduce任务也将被重新置为空闲状态,等待重新调度。

**当worker故障时,由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,因此必须重新执行。**而已经完成的Reduce任务的输出存储在全局文件系统上,因此不需要再次执行。

当一个Map任务首先被worker A执行,之后由于worker A失效了又被调度到worker B执行,这个“重新执行”的动作会被通知给所有执行Reduce任务的worker。任何还没有从worker A读取数据的Reduce任务将从worker B读取数据。

**MapReduce可以处理大规模worker失效的情况。**比如,在一个MapReduce操作执行期间,在正在运行的集群上进行网络维护引起80台机器在几分钟内不可访问了,MapReduce master只需要简单的再次执行那些不可访问的worker完成的工作,之后继续执行未完成的任务,直到最终完成这个MapReduce操作。

存储位置

GFS把每个文件按64MB一个Block分隔,每个Block保存在多台机器上,环境中就存放了多份拷贝(一般是3个拷贝)。MapReduce的master在调度Map任务时会考虑输入文件的位置信息,**尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务。**当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。

任务粒度

我们有M个Map任务,R个Reduce任务,一般来说,M和R数量应该比Worker的机器数量多,一般的比例是:M:R:W:=200:5:2

任务备份

影响一个MapReduce的总执行时间最通常的因素是“落伍者”。出现“落伍者”的原因非常多。比如:如果一个机器的硬盘出了问题,在读取的时候要经常的进行读取纠错操作,导致读取数据的速度从30M/s降低到1M/s。如果cluster的调度系统在这台机器上又调度了其他的任务,由于CPU、内存、本地硬盘和网络带宽等竞争因素的存在,导致执行MapReduce代码的执行效率更加缓慢。

当一个MapReduce操作接近完成的时候,master调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。无论是最初的执行进程、还是备用(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成。

改进

  1. 分区函数。一个默认的分区函数是使用hash方法(比如,hash(key) mod R)进行分区。hash方法能产生非常平衡的分区。然而,有的时候,其它的一些分区函数对key值进行的分区将非常有用。比如,输出的key值是URLs,我们希望每个主机的所有条目保持在同一个输出文件中。为了支持类似的情况,MapReduce库的用户需要提供专门的分区函数。例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数就可以把所有来自同一个主机的URLs保存在同一个输出文件中。
  2. 合并函数。 Map函数产生的中间key值的重复数据会占很大的比重,用户可以自定义一个combiner函数,将相同key值的数据进行合并,然后存储在本地供reduce来取。一般情况下,Combiner和Reduce函数是一样的。Combiner函数和Reduce函数之间唯一的区别是MapReduce库怎样控制函数的输出。Reduce函数的输出被保存在最终的输出文件里,而Combiner函数的输出被写到中间文件里,等待reduce的获取。
  3. **输入输出类型。**MapReduce库支持几种不同的格式的输入数据。比如,文本模式的输入数据的每一行被视为是一个key/value pair。key是文件的偏移量,value是那一行的内容。另外一种常见的格式是以key进行排序来存储的key/value pair的序列。我们可以通过提供一个简单的Reader接口实现一个新的输入类型,Reader并非一定要从文件中读取数据,可以从数据库或者内存中的数据结构读取。
  4. 跳过糟糕记录。有时候,用户程序中的bug导致Map或者Reduce函数在处理某些记录的时候crash掉,MapReduce操作无法顺利完成,这是可以跳过。每个worker进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。在执行Map或者Reduce操作之前,MapReduce库通过全局变量保存记录序号。如果用户程序触发了一个系统信号,消息处理函数将用“最后一口气”通过UDP包向master发送处理的最后一条记录的序号。当master看到在处理某条特定记录不止失败一次时,master就标志着条记录需要被跳过,并且在下次重新执行相关的Map或者Reduce任务的时候跳过这条记录。
  5. 计数器。MapReduce库提供了计数器机制,它能用来统计不同活动的发生次数。例如,统计已经处理过的单词个数或者引用的德语文档的数量,等等。每个worker都有一个counter,每隔一段时间传给master,当MapReduce操作完成时,master会将这些已经成功完成的map和reduce任务中返回的counter的值聚合在一起,并将它们返回给用户代码,聚合时会去掉重复执行的map和reduce的counter。

任务

测试时,启动一个master和多个worker,也就是运行一次mrmaster.go、运行多次mrworker.go

master进程启动一个rpc服务器,每个worker进程通过rpc机制向Master要任务。任务可能是map任务和reduce任务,具体如何给worker分配取决于master

每个单词和它出现的次数以key-value键值对形式出现。map进程将每个出现的单词机械地分离出来,并给每一次出现标记为1次。很多单词在电子书中重复出现,也就产生了很多相同键值对。还没有对键值对进行合并,故此时产生的键值对的都是1。此过程在下图中mapper伸出箭头表示。

已经分离出的单词以键值对形式分配给特定reduce进程,reduce进程个数远小于单词个数,每个reduce进程都处理一定量单词。相同的单词应由相同的reduce进程处理。处理的方式和上面描述的算法类似,对单词排序,令单词在数组中处在相邻位置,再统计单词个数(也可以简单的通过一个map收集数据而省略了排序这一过程)。最终,每个reduce进程都有一个输出,合并这些输出,就是Word Count结果。此过程在下图中箭头进入reducer、以及后面的合并表示。

代码解析

总览

在这个项目中,第1部分的mrcoordinator.go和mrworker.go是程序的两个入口程序,mrsequential.go是官方提供的一个wc小案列,可以用于测试环境和借鉴一些思想。第2部分以pg-*命名的文件是map任务要读入的文件。第3部分是测试文件。第4部分是我们要实现的部分,我们也只要修改这一部分代码就可以了。第5部分是进行测试时程序要读入的插件文件,我们可以查看此文件查看map和reduce task的执行过程,需要将此文件构建为对应的so文件并加载,eg:wc.go->wc.so

coordinator

数据结构

coordinator就是master,对于lab1而言,一大难点就在于数据结构的设计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
type TaskPhase int  //任务阶段
type TaskStatus int //任务状态

//任务阶段
const (
TaskPhase_Map TaskPhase = 0
TaskPhase_Reduce TaskPhase = 1
)

//任务状态
const (
TaskStatus_New TaskStatus = 0 //还没有创建
TaskStatus_Ready TaskStatus = 1 //进入队列
TaskStatus_Running TaskStatus = 2 //已经分配,正在运行
TaskStatus_Terminated TaskStatus = 3 //运行结束
TaskStatus_Error TaskStatus = 4 //运行出错
)

const (
ScheduleInterval = time.Millisecond * 500 //扫描任务状态的间隔时间
MaxTaskRunningTime = time.Second * 5 //每个任务的最大执行时间,用于判断是否超时
)

//任务
type Task struct {
FileName string //当前任务的文件名
Phase TaskPhase //当前任务状态
Seq int //当前的任务序列
NMap int //map任务/file的数量
NReduce int //reduce任务/分区的数量
Alive bool //是否存活
}

//任务状态
type TaskState struct {
Status TaskStatus //任务状态
WorkerId int //执行当前Task的workerid
StartTime time.Time //任务开始执行的时间
}

type Coordinator struct {
files []string //存储要处理的文件
nReduce int //reduce/分区数量
taskPhase TaskPhase //任务阶段
taskStates []TaskState //任务的状态
taskChan chan Task //任务队列
workerSeq int //worker序列
done bool //是否做完
muLock sync.Mutex //互斥锁

}

在coordinatro中,我定义了三种数据结构,分别是Coordinator,Task,TaskState。

Coordinator:

字段类型备注
files[]string存储要处理的文件,在构建coordinator时会传入
nReduceintreduce/分区的数量,在构建coordinator时会传入
taskPhaseTaskPhase任务阶段。MapReduce我们可以看做是两个阶段:Map和Reduce,而每个阶段Master要分配的任务也是不一样的。Map阶段的所有任务完成了才能开启Reduce阶段,Reduce阶段的任务全部完成,整个过程才算是结束了。(具体的阶段切换过程可以看scanTaskState()函数)
taskStates[]TaskState任务状态。存储了每个任务的状态,Master可以根据每个任务的状态进行不同的处理,将具体的任务和任务的状态进行了分离。(具体的不同状态的处理可以看scanTaskState()函数)
taskChanchan Task任务队列。用于存储任务,Master在定期扫描任务的状态时,发现有任务还没创建、任务处理超时、任务处理失败就会创建任务并放进任务队列。Worker可以通过RPC请求在任务队列中获取任务
workerSeqintWorker序列。用于存储已经注册的worker的数量,Worker创建后调用RPC请求来获取WorkerId
donebool是否做完。只有在Map和Reduce task全部做完done才能是true
muLocksync.Mutex互斥锁。在Coordinator的很多逻辑中,需要通过锁来保证共享数据的安全性。

TaskState:

字段类型备注
StatusTaskStatus任务状态,5中状态:TaskStatus_New,TaskStatus_Ready,TaskStatus_Running,TaskStatus_Terminated,TaskStatus_Error
WorkerIdint执行当前任务worker的id
StartTimetime.Time当前任务开始执行的时间,用于在扫描任务状态时判断是否超时

Task:

字段类型备注
FileNamestring当前任务的文件名。在Map阶段是要处理的每个文件的文件名;在Reduce阶段为空,因为Reduce阶段处理的文件是Map阶段生成的文件,有一定的命名规则,可以根据该规则获取文件名
PhaseTaskPhase当前任务的阶段
Seqint当前的任务序列。此序列是生成/获取中间文件名的一部分
NMapintmap任务/file的数量。用于在Reduce task的遍历获取中间文件
NReduceintreduce任务/分区的数量。用于在Map task的遍历获取中间文件
Aliveint是否存活。其实在本lab中并没有真正去将该值修改为false,但还是应该要有一个。

而另外的两个枚举和常量看意思应该就懂了吧。

初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
files: files,
nReduce: nReduce,
taskPhase: TaskPhase_Map,
taskStates: make([]TaskState, len(files)),
workerSeq: 0,
done: false,
}
if len(files) > nReduce {
c.taskChan = make(chan Task, len(files))
} else {
c.taskChan = make(chan Task, nReduce)
}

go c.schedule()
c.server()
DPrintf("master init")

return &c
}

调用MakeCoordinator方法会进行Coordinator的初始化,直接进入Map阶段;开启自身逻辑,进行定时任务;并且开启RPC服务器

1
2
3
4
5
6
7
8
9
10
11
12
func (c *Coordinator) server() {
rpc.Register(c) // 注册 RPC 服务
rpc.HandleHTTP() // 将 RPC 服务绑定到 HTTP 服务中去
//l, e := net.Listen("tcp", ":1234")
sockname := coordinatorSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
自身逻辑

coordinator协程真正要做的事情就是定时的扫描每个任务的状态,并进行相应的处理;以及进行任务阶段的切换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
//创建一个task
func (c *Coordinator) NewOneTask(seq int) Task {
task := Task{
FileName: "",
Phase: c.taskPhase,
NMap: len(c.files),
NReduce: c.nReduce,
Seq: seq,
Alive: true,
}

DPrintf("m:%+v, taskseq:%d, lenfiles:%d, lents:%d", c, seq, len(c.files), len(c.taskStates))

if task.Phase == TaskPhase_Map {
task.FileName = c.files[seq]
}
return task
}

//扫描任务状态并适当更新
func (c *Coordinator) scanTaskState() {
DPrintf("scanTaskState...")
c.muLock.Lock()
defer c.muLock.Unlock()

//这里不能使用函数Done(),因为此时已经上锁
if c.done {
return
}

allDone := true
//循环每个任务的状态
for k, v := range c.taskStates {
switch v.Status {
case TaskStatus_New:
allDone = false
c.taskStates[k].Status = TaskStatus_Ready
c.taskChan <- c.NewOneTask(k)
case TaskStatus_Ready:
allDone = false
case TaskStatus_Running:
allDone = false
//超时重新分配该任务
if time.Now().Sub(v.StartTime) > MaxTaskRunningTime {
c.taskStates[k].Status = TaskStatus_Ready
c.taskChan <- c.NewOneTask(k)
}
case TaskStatus_Terminated:
case TaskStatus_Error:
allDone = false
c.taskStates[k].Status = TaskStatus_Ready
c.taskChan <- c.NewOneTask(k)
default:
panic("t. status err in schedule")
}
}

if allDone {
if c.taskPhase == TaskPhase_Map {
//进入Reduce阶段
DPrintf("init ReduceTask")
c.taskPhase = TaskPhase_Reduce
c.taskStates = make([]TaskState, c.nReduce)
} else {
log.Println("finish all tasks!!!😊")
c.done = true
}
}
}

//定时更新状态
func (c *Coordinator) schedule() {
for !c.Done() {
c.scanTaskState()
time.Sleep(ScheduleInterval)
}
}

func (c *Coordinator) Done() bool {
c.muLock.Lock()
defer c.muLock.Unlock()

return c.done
}
RPC方法

每个Worker可以调用Coordinator提供的三个方法:

  • RegWorker:Worker在初始化的时候就要调用此方法来注册并获取自身的WorkerId
  • GetOneTask:通过此方法可以获取任务
  • ReportTask:通过此方法报告每个任务的执行情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//处理Rpc请求:获取任务
func (c *Coordinator) GetOneTask(args *TaskArgs, reply *TaskReply) error {
task := <-c.taskChan
reply.Task = &task

if task.Alive {
//修改状态
c.muLock.Lock()
if task.Phase != c.taskPhase {
return errors.New("GetOneTask Task phase neq")
}
c.taskStates[task.Seq].WorkerId = args.WorkerId
c.taskStates[task.Seq].Status = TaskStatus_Running
c.taskStates[task.Seq].StartTime = time.Now()
c.muLock.Unlock()
}

DPrintf("in get one Task, args:%+v, reply:%+v", args, reply)
return nil
}

//处理Rpc请求:注册worker
func (c *Coordinator) RegWorker(args *RegArgs, reply *RegReply) error {
DPrintf("worker reg!")
c.muLock.Lock()
defer c.muLock.Unlock()
c.workerSeq++
reply.WorkerId = c.workerSeq
return nil
}

//处理Rpc请求:worker响应task完成情况
func (c *Coordinator) ReportTask(args *ReportTaskArgs, reply *ReportTaskReply) error {
c.muLock.Lock()
defer c.muLock.Unlock()

DPrintf("get report task: %+v, taskPhase: %+v", args, c.taskPhase)

//如果发现阶段不同或者当前任务已经分配给了其它worker就不修改当前任务状态
if c.taskPhase != args.Phase || c.taskStates[args.Seq].WorkerId != args.WorkerId {
DPrintf("in report task,workerId=%v report a useless task=%v", args.WorkerId, args.Seq)
return nil
}

if args.Done {
c.taskStates[args.Seq].Status = TaskStatus_Terminated
} else {
c.taskStates[args.Seq].Status = TaskStatus_Error
}

go c.scanTaskState()
return nil
}

worker

数据结构
1
2
3
4
5
6
7
8
9
10
type KeyValue struct {
Key string
Value string
}

type worker struct {
worerId int
mapF func(string, string) []KeyValue
reduceF func(string, []string) string
}

KeyValue是已经给我们定义好的,主要用来存储中间的key-value。而worker的定义也很简单,仅仅存储了worker的id和使用的map、reduce函数。

初始化

初始化时,除了初始化worker的数据结构,还要进行rpc请求注册当前worker,初始化worker的id,然后就可以运行worker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

// Your worker implementation here.
worker := worker{
mapF: mapf,
reduceF: reducef,
}

worker.register()
worker.run()
// uncomment to send the Example RPC to the coordinator.
// CallExample()

}
自身逻辑

Worker要做的事情很简单,就是不断的发送rpc请求获取任务,并根据任务的不同阶段进行Map或Reduce处理。

Map任务的处理流程:

  1. 通过任务的fileName读取文件内容
  2. 对读取的数据进行map处理生成中间key-value对
  3. 对相同的key数据进行压缩(省略)
  4. 创建一个二维数组,将map处理后生成的中间key-value对通过分区函数存储到不同的分区数组中
  5. 为每个分区创建一个文件,并将当前分区的数据全部存入文件

Reduce任务的处理流程:

  1. 通过文件命名规则读取每个Map task生成的当前分区的数据
  2. 对数据进行排序(这里是通过一个map集合进行存储数据,而省略了排序过程)
  3. 每个key进行reduce处理生成最终结果并存入集合中
  4. 将最终数据存入分区文件中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
func (w *worker) run() {
DPrintf("run")
for {
task, err := w.getTask()
if err != nil {
DPrintf(err.Error())
continue
}
if !task.Alive {
DPrintf("worker get task not alive, exit")
return
}
w.doTask(*task)
}
}

//开始做任务
func (w *worker) doTask(task Task) {
switch task.Phase {
case TaskPhase_Map:
w.doMapTask(task)
case TaskPhase_Reduce:
w.doReduceTask(task)
default:
panic(fmt.Sprintf("task phase err: %v", task.Phase))
}
}

func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}

//map任务时获取要输出的文件名
func (w *worker) getReduceName(mapId, partitionId int) string {
return fmt.Sprintf("mr-kv-%d-%d", mapId, partitionId)
}

//reduce任务时获取要输出的文件名
func (w *worker) getMergeName(partitionId int) string {
return fmt.Sprintf("mr-out-%d", partitionId)
}

//做map任务
func (w *worker) doMapTask(task Task) {
DPrintf("%v start read file %v", w.worerId, task.FileName)
cont, err := ioutil.ReadFile(task.FileName)
if err != nil {
DPrintf("%v", err)
w.reportTask(task, false)
return
}

kvs := w.mapF(task.FileName, string(cont))
partions := make([][]KeyValue, task.NReduce)
for _, kv := range kvs {
pid := ihash(kv.Key) % task.NReduce
partions[pid] = append(partions[pid], kv)
}

for k, v := range partions {
fileName := w.getReduceName(task.Seq, k)
file, err := os.Create(fileName)
if err != nil {
DPrintf("create file-%v fail in doMapTask. %v", fileName, err)
w.reportTask(task, false)
return
}
encoder := json.NewEncoder(file)
for _, kv := range v {
if err := encoder.Encode(&kv); err != nil {
DPrintf("encode kvs to file-%v fail in doMapTask. %v", fileName, err)
w.reportTask(task, false)
}
}
if err := file.Close(); err != nil {
DPrintf("close file-%v fail in doMapTask. %v", fileName, err)
w.reportTask(task, false)
}
}
w.reportTask(task, true)
}

//做reduce任务
func (w *worker) doReduceTask(task Task) {
maps := make(map[string][]string)

for i := 0; i < task.NMap; i++ {
fileName := w.getReduceName(i, task.Seq)
file, err := os.Open(fileName)
if err != nil {
DPrintf("open file-%v fail in doReduceTask. %v", fileName, err)
w.reportTask(task, false)
return
}
decoder := json.NewDecoder(file)
for {
var kv KeyValue
if err := decoder.Decode(&kv); err != nil {
break
}
if _, ok := maps[kv.Key]; !ok {
maps[kv.Key] = make([]string, 0)
}
maps[kv.Key] = append(maps[kv.Key], kv.Value)
}
}

res := make([]string, 0)
for k, v := range maps {
len := w.reduceF(k, v)
res = append(res, fmt.Sprintf("%v %v\n", k, len))
}

fileName := w.getMergeName(task.Seq)
if err := ioutil.WriteFile(fileName, []byte(strings.Join(res, "")), 0600); err != nil {
DPrintf("write file-%v in doReduceTask. %v", fileName, err)
w.reportTask(task, false)
}

w.reportTask(task, true)
}
RPC方法

和coordinator对应,也是三个方法:

  • register:Worker在初始化的时候就要调用此方法来注册并获取自身的WorkerId,在coordinator中使用一个seq来记录worker个数
  • getTask:通过此方法可以获取任务
  • reportTask:通过此方法报告任务的执行情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//rpc请求:注册worker
func (w *worker) register() {
DPrintf("reg")
args := &RegArgs{}
reply := &RegReply{}

if err := call("Coordinator.RegWorker", args, reply); !err {
log.Fatal("worker register error!", err)
}
w.worerId = reply.WorkerId
}

//rpc请求:请求获取任务
func (w *worker) getTask() (*Task, error) {
args := TaskArgs{WorkerId: w.worerId}
reply := TaskReply{}

if err := call("Coordinator.GetOneTask", &args, &reply); !err {
return nil, errors.New("worker getTask error!")
}
DPrintf("worker get task:%+v", reply.Task)
return reply.Task, nil
}

//rpc请求:报告任务状态
func (w *worker) reportTask(task Task, done bool) {
args := ReportTaskArgs{
WorkerId: w.worerId,
Phase: task.Phase,
Seq: task.Seq,
Done: done,
}
reply := ReportTaskReply{}
if ok := call("Coordinator.ReportTask", &args, &reply); !ok {
DPrintf("report task fail:%+v", args)
}
}

//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := coordinatorSock()
conn, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer conn.Close()

err = conn.Call(rpcname, args, reply) //rpcname = 结构体名.方法名
if err == nil {
return true
}

fmt.Println(err)
return false
}

RPC

主要是用于定义RPC通信时调用方法传入的参数和结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
type ExampleArgs struct {
X int
}

type ExampleReply struct {
Y int
}

// Add your RPC definitions here.

//用于获取任务
type TaskArgs struct {
WorkerId int
}

type TaskReply struct {
Task *Task
}

//用于worker创建后的注册
type RegArgs struct {
}

type RegReply struct {
WorkerId int
}

//用于worker响应任务
type ReportTaskArgs struct {
WorkerId int
Phase TaskPhase
Seq int
Done bool
}

type ReportTaskReply struct {
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
s := "/var/tmp/824-mr-"
s += strconv.Itoa(os.Getuid())
return s
}

common

主要用于定义一些通用的动作,在这里仅定义了打印日志。在debug下是直接输出,在运行状态下会输出都log文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
const Debug = false

var file *os.File

func init() {
rand.Seed(10)
f, err := os.Create("log-" + strconv.Itoa(int(time.Now().Unix()+rand.Int63n(100))) + ".txt")
if err != nil {
DPrintf("log create file fail!")
}
file = f
}

//debug下打印日志
func DPrintf(format string, value ...interface{}) {
now := time.Now()
info := fmt.Sprintf("%v-%v-%v %v:%v:%v: ", now.Year(), int(now.Month()), now.Day(), now.Hour(), now.Minute(), now.Second()) + fmt.Sprintf(format+"\n", value...)

if Debug {
log.Printf(info)
} else {
file.WriteString(info)
}
}

运行

  1. 删除文件

    1
    2
    3
    rm -f log*
    rm -f mr-kv*
    rm -f mr-out*
  2. 运行coordinator

    1
    go run -race mrcoordinator.go pg-*.txt
  3. 运行worker

    1
    2
    go build -race -buildmode=plugin ../mrapps/wc.go
    go run -race mrworker.go wc.so

测试

当你测试的时候,如果是在linux中运行,会报错:

1
2
3
4
5
6
test-mr.sh:行2: $'\r': 未找到命令
test-mr.sh:行6: $'\r': 未找到命令
test-mr.sh:行9: $'\r': 未找到命令
test-mr.sh:行10: 条件表达式中有语法错误
' 附近有语法错误 `]]
'est-mr.sh:行10: `if [[ "$OSTYPE" = "darwin"* ]]

这是因为命令直接从windows 复制过来导致的,我们可以安装一个dos2unix

1
yum install dos2unix

安装后然后:

1
dos2unix test-mr.sh

然后运行就可以了:

1
bash test-mr.sh

输出文件

在运行时会创建一个mr-tmp文件夹,文件都在此文件夹中:

log就是我们的输出日志,mr-kv-*是map处理后输出的中间key-value对文件,mr-out-*是reduce处理后输出的分区文件。

其他的文件就是进行测试后生成的文件

注意

注:我的实现有一个测试用例并没有通过,early exit test这里会FAIL,经过一段时间的排错后,感觉没什么问题,应该是什么细节漏掉了,但也无伤大雅。

参考

-------------本文结束感谢您的阅读-------------
六经蕴籍胸中久,一剑十年磨在手

欢迎关注我的其它发布渠道