绿色健康小清新

耐得住寂寞,守得住繁华

MIT6.824-lab2C-持久化(persistence)

2c(持久化)

如果一个基于 Raft 的服务器重新启动,它应该从中断的地方恢复服务。这要求 Raft 在重启后保持持久状态。

一个真正的实现是将 Raft 的持久状态在每次更改时写入磁盘,并在重新启动后重新启动时从磁盘读取状态。 但是在我们的实现中,并不会真正的在磁盘中进行操作; 相反,我们将从 Persister 对象保存和恢复持久状态(请参阅 persister.go)。 Persister 最初保存了 Raft 最近持久化的状态(如果有的话)。 Raft 应该从那个 Persister 初始化它的状态,并且应该在每次状态改变时使用它来保存它的持久状态。 使用 Persister 的 ReadRaftState() 和 SaveRaftState() 方法。

任务

我们的任务就是通过添加代码来保存和恢复持久状态:

  1. 完成 raft.go 中的 persist()readPersist() 函数:readPersist(),当节点重启时,会重新读取状态恢复;persist(),将状态存储下来(实际工作中会写到硬盘,Lab里写入内存);
  2. 在状态改变的时候,调用persist()函数存储状态。

我们需要将状态编码(或“序列化”)为字节数组,以便将其传递给 Persister;使用labgob编码器; 具体的使用方法可以查看官方给出的persist() 和 readPersist() 中的注释。

Lab2C真正难的地方在于它严苛的测试,涉及到节点的反复选举、宕机与重新上线、状态恢复、网络断开、日志未提交等等问题。2A和2B中很难测出的错误很有可能在2C的测试里暴露出来。(我是深有体会呀)

代码

代码部分的其实很简单,首先来看下persist这个数据结构:

1
2
3
4
5
type Persister struct {
mu sync.Mutex
raftstate []byte //存储当前raft的状态
snapshot []byte //存储当前的快照
}

一共是三个属性:互斥锁、当前raft的状态、当前的快照。以这一个构造体来存储持久化数据,代替了实际上的磁盘IO。

主要就是persist()和readPersist()这两个函数;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//获取持久化的数据
func (rf *Raft) getPersistData() []byte {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.commitIndex)
e.Encode(rf.lastSnapshotTerm)
e.Encode(rf.lastSnapshotIndex)
e.Encode(rf.logs)
data := w.Bytes()
return data
}

// 保存持久化状态
func (rf *Raft) persist() {
data := rf.getPersistData()
rf.persister.SaveRaftState(data)
}

//读取持久化数据
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)

var (
currentTerm int
votedFor int
logs []LogEntry
commitIndex, lastSnapshotTerm, lastSnapshotIndex int
)

if d.Decode(&currentTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&commitIndex) != nil ||
d.Decode(&lastSnapshotIndex) != nil || d.Decode(&lastSnapshotTerm) != nil || d.Decode(&logs) != nil {
log.Fatal("rf read persist err!")
} else {
rf.currentTerm = currentTerm
rf.votedFor = votedFor
rf.commitIndex = commitIndex
rf.lastSnapshotIndex = lastSnapshotIndex
rf.lastSnapshotTerm = lastSnapshotTerm
rf.logs = logs
}
}

在我们持久化的状态任何一个改变的时候,我们就需要调用persist()来进行持久化,这里涉及之处很多,就不一一列出了,举几个例子:

(1)在接收到RequestVote rpc时,如果rcp的term大于当前节点的term,就要修改状态,并且如果要投票,也要修改一次状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if rf.currentTerm < args.Term {
rf.currentTerm = args.Term
rf.changeRole(Role_Follower)
rf.votedFor = -1
reply.Term = rf.currentTerm
rf.persist() //持久化
}

//判断日志完整性
if lastLogTerm > args.LastLogTerm || (lastLogTerm == args.LastLogTerm && lastLogIndex > args.LastLogIndex) {
return
}

rf.votedFor = args.CandidateId
reply.VoteGranted = true
rf.changeRole(Role_Follower)
rf.resetElectionTimer()
rf.persist() //持久化
DPrintf("%v, role:%v,voteFor: %v", rf.me, rf.role, rf.votedFor)

(2)在发送AppendEntries rpc,收到回复,要修改回复节点的nextIndex、matchIndex,可能还要修改commitIndex,因此要进行持久化一次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//响应:成功了,即:发送的数据全部接收了,或者根本没有数据
if reply.Success {
if reply.NextLogIndex > rf.nextIndex[peerId] {
rf.nextIndex[peerId] = reply.NextLogIndex
rf.matchIndex[peerId] = reply.NextLogIndex - 1
}
if len(args.Entries) > 0 && args.Entries[len(args.Entries)-1].Term == rf.currentTerm {
//每个leader只能提交自己任期的日志
rf.tryCommitLog()
}
rf.persist() //持久化
rf.mu.Unlock()
return
}

而在当前节点启动后,也要读取一次持久化的状态来进行恢复,具体的调用点并不固定,只需要在持久化相关属性初始化后就可以了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
...
rf.commitIndex = 0
rf.lastApplied = 0
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))
//读取持久化数据
rf.readPersist(persister.ReadRaftState())

rf.electionTimer = time.NewTimer(rf.getElectionTimeout())
rf.appendEntriesTimers = make([]*time.Timer, len(rf.peers))
...
}

测试结果

-------------本文结束感谢您的阅读-------------
六经蕴籍胸中久,一剑十年磨在手

欢迎关注我的其它发布渠道