绿色健康小清新

耐得住寂寞,守得住繁华

MIT6.824-lab2D-日志压缩(log compaction)

2D(日志压缩)

做到现在为止,我们重启的服务器会重放完整的 Raft 日志以恢复其状态。但是,长期运行的服务永远记住完整的 Raft 日志是不切实际的。因此,我们可以做一个快照,快照存储了在某一个index之前的所有日志的一个状态, Raft 就可以看丢弃快照之前的日志条目。结果是更少量的持久数据和更快的重启。

任务

主要是实现四个API:

  • sendInstallSnapshotToPeer(server int) :向指定的节点发送快照;
  • InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply):节点接收到快照;
  • Snapshot(index int, snapshot []byte):index是快照所包含的最后一个日志的index,snapshot是已经处理好的快照字节流,用来生成一次快照;
  • CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool:当follower收到leader的快照并判断接收后,要生成一个快照命令,应用该快照命令时会调用CondInstallSnapshot()函数进行处理;

这里要注意的点就是:

  • log日志中0位置存储的是快照,其实0位置真正存储的是一个空命令,可以理解为是快照的一个标识,lastSnapshotTerm和lastSnapshotIndex分别表示当前快照的term和index,也就是快照所包含的最后一个日志的term和index;
  • 快照命令并不算做日志的一项,该命令和日志所记录的命令是分开进行处理的。当follower判断接收了一个快照后,就会生成一个快照命令并发送到applyChan来进行处理。
  • 定时处理snapshot/初始化时读取快照。follower会接收到leader发送的snapshot,生成命令进行处理,但是当节点crash重启后,没法生成命令进行处理,因此需要手动调用CondInstallSnapshot函数应用一次快照,否则lastApplied属性的内容会有错,导致crash后,数据始终无法进行同步。(这点千万不要遗漏,虽然后来想到确实要这么写,但是当是当局者迷了,找了很久的问题才将最后一个测试通过)

任务须知

如果系统设置了快照,那么raft会使用applierSnap来处理我们要应用的命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func make_config(t *testing.T, n int, unreliable bool, snapshot bool) *config {
...
applier := cfg.applier
if snapshot {
//如果开启了快照,则使用这个applier
applier = cfg.applierSnap
}
// create a full set of Rafts.
for i := 0; i < cfg.n; i++ {
cfg.logs[i] = map[int]interface{}{}
cfg.start1(i, applier) //启动一个raft节点
}
...
}

func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
...
go applier(i, applyCh) //创建一个协成不断运行
...
}

具体的运行函数applierSnap:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
cfg.mu.Lock()
rf := cfg.rafts[i]
cfg.mu.Unlock()
if rf == nil {
return // ???
}

for m := range applyCh {
err_msg := ""
//如果是一个快照命令
if m.SnapshotValid {
//如果是一个快照命令,代表着接收了leader发来的新快照,调用CondInstallSnapshot()进行处理
if rf.CondInstallSnapshot(m.SnapshotTerm, m.SnapshotIndex, m.Snapshot) {
cfg.mu.Lock()
err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex)
cfg.mu.Unlock()
}
//如果是一个普通命令(也就是日志存储的命令)
} else if m.CommandValid {
...
//每有10条日志,就调用Snapshot()进行一次快照生成(其实是9条,因为0位置存储的是快照)
if (m.CommandIndex+1)%SnapShotInterval == 0 {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(m.CommandIndex)
var xlog []interface{}
for j := 0; j <= m.CommandIndex; j++ {
xlog = append(xlog, cfg.logs[i][j])
}
e.Encode(xlog)
rf.Snapshot(m.CommandIndex, w.Bytes())
}
} else {
// Ignore other types of ApplyMsg.
}
if err_msg != "" {
log.Fatalf("apply error: %v", err_msg)
cfg.applyErr[i] = err_msg
}
}
}

根据以上的源码,我们可以知道:

  • raft会根据日志的数量,调用Snapshot()来进行一次快照生成;
  • 当follower收到leader的快照并判断接收后,要生成一个快照命令,应用该快照命令时会调用CondInstallSnapshot()函数进行处理;

代码

rpc参数和回复

1
2
3
4
5
6
7
8
9
10
11
12
13
type InstallSnapshotArgs struct {
Term int
LeaderId int
LastIncludedIndex int
LastIncludedTerm int
//Offset int
Data []byte
//Done bool
}

type InstallSnapshotReply struct {
Term int
}

startApplyLogs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//处理要应用的日志,快照的命令比较特殊,不在这里提交
func (rf *Raft) startApplyLogs() {
defer rf.applyTimer.Reset(ApplyInterval)

rf.mu.Lock()
var msgs []ApplyMsg
if rf.lastApplied < rf.lastSnapshotIndex {
//此时要安装快照,命令在接收到快照时就发布过了,等待处理
msgs = make([]ApplyMsg, 0)
rf.mu.Unlock()
//读取快照
rf.CondInstallSnapshot(rf.lastSnapshotTerm, rf.lastSnapshotIndex, rf.persister.snapshot)
return
} else if rf.commitIndex <= rf.lastApplied {
// snapShot 没有更新 commitidx
msgs = make([]ApplyMsg, 0)
} else {
msgs = make([]ApplyMsg, 0, rf.commitIndex-rf.lastApplied)
for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
msgs = append(msgs, ApplyMsg{
CommandValid: true,
Command: rf.logs[rf.getStoreIndexByLogIndex(i)].Command,
CommandIndex: i,
})
}
}
rf.mu.Unlock()

for _, msg := range msgs {
rf.applyCh <- msg
rf.mu.Lock()
rf.lastApplied = msg.CommandIndex
rf.mu.Unlock()
}
}

这个定时任务中主要是定期处理已提交的日志放入applyCh中,加一个判断:如果发现了rf.lastApplied < rf.lastSnapshotIndex,就要进行一次快照读取。主要是应对crash后快照的读取问题,在raft初始化时可能也可以进行处理。

sendInstallSnapshotToPeer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//向指定节点发送快照
func (rf *Raft) sendInstallSnapshotToPeer(server int) {
rf.mu.Lock()
args := InstallSnapshotArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
LastIncludedIndex: rf.lastSnapshotIndex,
LastIncludedTerm: rf.lastSnapshotTerm,
Data: rf.persister.ReadSnapshot(),
}
rf.mu.Unlock()

//用于调用超时
timer := time.NewTimer(RPCTimeout)
defer timer.Stop()
DPrintf("%v role: %v, send snapshot to peer,%v,args = %+v,reply = %+v", rf.me, rf.role, server, args)

//发送rpc
for {
timer.Stop()
timer.Reset(RPCTimeout)

ch := make(chan bool, 1)
reply := &InstallSnapshotReply{}
go func() {
ok := rf.peers[server].Call("Raft.InstallSnapshot", &args, reply)
if !ok {
time.Sleep(time.Millisecond * 10)
}
ch <- ok
}()

select {
case <-rf.stopCh:
return
case <-timer.C:
DPrintf("%v role: %v, send snapshot to peer %v TIME OUT!!!", rf.me, rf.role, server)
continue
case ok := <-ch:
if !ok {
continue
}
}

//对结果进行处理
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.role != Role_Leader || args.Term != rf.currentTerm {
return
}
if reply.Term > rf.currentTerm {
rf.changeRole(Role_Follower)
rf.currentTerm = reply.Term
rf.resetElectionTimer()
rf.persist()
return
}

if args.LastIncludedIndex > rf.matchIndex[server] {
rf.matchIndex[server] = args.LastIncludedIndex
}
if args.LastIncludedIndex+1 > rf.nextIndex[server] {
rf.nextIndex[server] = args.LastIncludedIndex + 1
}
return
}
}

大体流程分为三步:

  • 根据当前节点的状态生成InstallSnapshotArgs;
  • 在rpc超时时间以内,不断地向指定节点发送rpc,直到发送成功;
  • 根据发送获取的结果,进行判断,更新相应的数据结构,特别是matchIndex和nextindex。

InstallSnapshot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

reply.Term = rf.currentTerm
if rf.currentTerm > args.Term {
return
}

if args.Term > rf.currentTerm || rf.role != Role_Follower {
rf.changeRole(Role_Follower)
rf.votedFor = -1
rf.currentTerm = args.Term
rf.resetElectionTimer()
rf.persist()
}

//如果自身快照包含的最后一个日志>=leader快照包含的最后一个日志,就没必要接受了
if rf.lastSnapshotIndex >= args.LastIncludedIndex {
return
}

//接收发来的快照,并提交一个命令处理
rf.applyCh <- ApplyMsg{
SnapshotValid: true,
Snapshot: args.Data,
SnapshotTerm: args.LastIncludedTerm,
SnapshotIndex: args.LastIncludedIndex,
}
}

大概分为三步:

  • 当节点收到快照后,如果自身的term大于发送方的term,就拒绝;否者更新自身的term和role等信息;
  • 如果自身快照包含的最后一个日志>=leader快照包含的最后一个日志,就没必要接受了;
  • 否则,接收发来的快照,并提交一个命令处理。

Snapshot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//生成一次快照,实现很简单,删除掉对应已经被压缩的 raft log 即可
//index是当前要压缩到的index,snapshot是已经帮我们压缩好的数据
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()
snapshotIndex := rf.lastSnapshotIndex
//如果当前快照包含的最后一条log的index≥参数中快照的index,则没必要进行压缩
if snapshotIndex >= index {
DPrintf("{Node %v} rejects replacing log with snapshotIndex %v as current snapshotIndex %v is larger in term %v", rf.me, index, snapshotIndex, rf.currentTerm)
return
}
oldLastSnapshotIndex := rf.lastSnapshotIndex
rf.lastSnapshotTerm = rf.logs[rf.getStoreIndexByLogIndex(index)].Term
rf.lastSnapshotIndex = index
//删掉index前的所有日志
rf.logs = rf.logs[index-oldLastSnapshotIndex:]
//0位置就是快照命令
rf.logs[0].Term = rf.lastSnapshotTerm
rf.logs[0].Command = nil
rf.persister.SaveStateAndSnapshot(rf.getPersistData(), snapshot)
DPrintf("{Node %v}'s state is {role %v,term %v,commitIndex %v,lastApplied %v} after replacing log with snapshotIndex %v as old snapshotIndex %v is smaller", rf.me, rf.role, rf.currentTerm, rf.commitIndex, rf.lastApplied, index, snapshotIndex)
}

CondInstallSnapshot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
rf.mu.Lock()
defer rf.mu.Unlock()
// Your code here (2D).
//判断接收快照后,logs中还有没有多余的日志,并进行不同的处理
_, lastIndex := rf.getLastLogTermAndIndex()
if lastIncludedIndex > lastIndex {
rf.logs = make([]LogEntry, 1)
} else {
installLen := lastIncludedIndex - rf.lastSnapshotIndex
rf.logs = rf.logs[installLen:]
rf.logs[0].Command = nil
}
//0处是空日志,代表了快照日志的标记
rf.logs[0].Term = lastIncludedTerm

//其实接下来可以读入快照的数据进行同步,这里可以不写

rf.lastSnapshotIndex, rf.lastSnapshotTerm = lastIncludedIndex, lastIncludedTerm
rf.lastApplied, rf.commitIndex = lastIncludedIndex, lastIncludedIndex
//保存快照和状态
rf.persister.SaveStateAndSnapshot(rf.getPersistData(), snapshot)
return true
}

测试结果

-------------本文结束感谢您的阅读-------------
六经蕴籍胸中久,一剑十年磨在手

欢迎关注我的其它发布渠道