etcd中raft协议的消息(三) ——MspApp和MsgAppResp消息
MsgApp消息
上小结我们看到节点发起选举的相关消息,当候选人通过选举成为新的Leader后,首先会调用becomeLeader方法初始化相关状态,然后会调用bcastAppend方法,该方法主要向集群中其他节点广播MsgApp消息。消息的简略流程如下:
becomeLeader方法的主要功能:
作用:节点赢得选举变成Leader节点
1.检测当前节点如果是Follower则panic
2.将step设置成stepLeader
3.重置raft实例的Term、心跳计时器、选举计时器、选举超时时间等字段
4.将Leader设置成当前节点的id
5.更新当前节点的角色
6.保守地将pendingConfIndex设置为日志中的最后一个索引
7.向当前节点的raftLog中追加一条空的Entry记录
func (r *raft) becomeLeader() {
// TODO(xiangli) remove the panic when the raft implementation is stable
//检测当前节点的状态,禁止从Follower状态切换成Leader状态
if r.state == StateFollower {
panic("invalid transition [follower -> leader]")
}
//将step字段设置成stepLeader
r.step = stepLeader
r.reset(r.Term) //重置raft实例的Term等字段
r.tick = r.tickHeartbeat //tick字段设置成tickHeartbeat方法
r.lead = r.id //将leader字段当前节点的id
r.state = StateLeader //更新当前节点的角色
ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
}
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
/*
保守地将pendingConfIndex设置为日志中的最后一个索引。
可能有也可能没有挂起的配置更改,但是在我们提交所有挂起的日志条目之前延迟任何未来的提议是安全的,
并且扫描日志的整个尾部可能很昂贵。
*/
if len(ents) > 0 {
r.pendingConfIndex = ents[len(ents)-1].Index
}
//向当前节点的raftLog中追加一条空的Entry记录
r.appendEntry(pb.Entry{Data: nil})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
bcastAppend方法
该方法的主要作用是向集群中的其他节点发送MsgApp(或MsgSnap)消息,这里我们先忽略MsgSnap消息,后面会详细介绍。
func (r *raft) bcastAppend() {
r.forEachProgress(func(id uint64, _ *Progress) {
if id == r.id {
return
}
r.sendAppend(id)
})
}
sendAppend方法的处理流程
1.获取目标节点的Progress,如果是暂停状态,则返回 2.创建待发送的消息实体m,并设置目标节点的ID 3.获取Next-1对应的记录的Term值,获取需要发送的Entries 4.上述两次raftLog查找出现异常时,就会形成MsgSnap消息,将快照数据发送到指定节点。 5.否则,发送MsgSnap类型的消息 6.发送前面创建的MsgApp或MsgSnap类型的消息,raft.send()会设置MsgApp消息的Term值,并将其追加到raft.msgs中等待发送。
func (r *raft) sendAppend(to uint64) {
pr := r.getProgress(to) //获取目标节点的Progress
if pr.IsPaused() { //检测当前节点是否可以向目标节点发送消息
return
}
m := pb.Message{} //创建待发送的消息
m.To = to //设置目标节点的ID
term, errt := r.raftLog.term(pr.Next - 1) //pr.Next是下个待复制Entry的索引位置,获取Next索引对应的记录的Term值
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) //获取需要发送的Entry记录(ents)
if errt != nil || erre != nil { // 上述两次raftLog查找出现异常时,就会形成MsgSnap消息,将快照数据发送到指定节点。
/*
1.上述两次raftLog查找出现异常时(获取不到需要发送的Entry记录),就会形成MsgSnap消息,将快照数据发送到指定节点。
2.向该节点发送MsgSnap类型的消息
3.将目标Follower节点对应的Progress切换成ProgressStateSnapshot状态
*/
if !pr.RecentActive { //如果该节点已经不存活,则退出(RecentActive为true表示该节点存活)
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return
}
m.Type = pb.MsgSnap //将消息设置成MsgSnap,为后续发送快照做准备
snapshot, err := r.raftLog.snapshot() //获取快照数据
if err != nil { //异常检测,如果获取快照数据异常,则终止整个程序
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
m.Snapshot = snapshot //设置MsgSnap消息的Snapshot字段
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term //获取快照的相关信息
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.becomeSnapshot(sindex)//将目标Follower节点对应的Progress切换成ProgressStateSnapshot状态,其中会用Progress.PendingSnapshot字段记录快照数据信息
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp //设置消息类型
m.Index = pr.Next - 1 //设置MsgApp消息的Index字段
m.LogTerm = term //设置MsgApp消息的LogTerm字段
m.Entries = ents //设置消息携带的Entry记录集合
//设置消息的Commit字段,即当前节点的raftLog中最后一条已提交的记录索引值
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
/*
Progress.State:
ProgressStateSnapshot状态:表示Leader节点正在向目标节点发送快照数据。
ProgressStateProbe状态:表示节点一次不能向目标节点发送多条消息,只能待一条消息被响应之后,才能发送下一条消息
ProgressStateReplicate状态:表示正常的Entry记录复制状态,Leader节点向目标节点发送完消息之后,无需等待响应,即可开始后续消息发送。
*/
switch pr.State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate:
last := m.Entries[n-1].Index
pr.optimisticUpdate(last)
pr.ins.add(last)
case ProgressStateProbe:
pr.pause()
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
//发送前面创建的MsgApp消息,raft.send()会设置MsgApp消息的Term值,并将其追加到raft.msgs中等待发送。
r.send(m)
}
其他节点收到MsgApp消息后的处理流程
1)将当前节点的状态切换成Follower
当节点(无论是什么角色,包括上一届Leader,Follower,Candidate)收到Term比自己任期号大,并且消息类型是MsgApp、MsgHeartbeat、MsgSnap类型的消息都会调用becomeFollower(m.Term,m.From),都会将当前节点的状态切换成Follower,并进行相关状态的初始化。相关代码如下:
func (r *raft) Step(m pb.Message) error {
// Handle the message term, which may result in our stepping down to a follower.
switch { //首先根据消息的Term值进行分类处理
case m.Term == 0: //对本地消息并没有做什么处理
// local message
case m.Term > r.Term: //例如参与选举的Term值会比当前未参与的值大
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote { //处理MsgVote和MsgPreVote
//根据消息的Context字段判断收到的MsgPreVote(或MsgVote)消息是否为Leader
//节点转移场景下产生的,如果是,则强制当前节点参与本次预选(或选举)
force := bytes.Equal(m.Context, []byte(campaignTransfer))
//检测集群是否开启CheckQuorum模式,当前节点是否有已知的Lead节点,以及其选举计时器的时间
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease { //满足此条件,该节点不参与此次选举
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
return nil
}
}
switch {//在这个switch中,当前节点会根据消息类型决定是否切换状态
case m.Type == pb.MsgPreVote: //收到MsgPreVote消息时,不会引起当前节点的状态切换
// Never change our term in response to a PreVote
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
}
becomeFollower方法的主要流程
1.将step(行为)切换成 stepFollower
2.重置当前raft r的相关状态
3.切换tick(作用推动逻辑时钟)
4.重置leader
5.重置leader
6.设置当前raft的状态为StateFollower
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
reset方法进行相关状态的初始化
1.将raft的Term设置为term,Vote置为空
2.清空leader字段
3.重置选举计时器和心跳计时器,并重置选举计时器electionElapsed
4.重置votes字段,votes是收到的选票
5.重置prs和learners
6.重置pendingConfIndex
6.重置readOnly 只读请求相关的设置
func (r *raft) reset(term uint64) {
if r.Term != term { //重置term和Vote字段
r.Term = term
r.Vote = None
}
r.lead = None //清空lead字段
r.electionElapsed = 0 //重置选举计时器和心跳计时器
r.heartbeatElapsed = 0
r.resetRandomizedElectionTimeout() //重置选举计时器的过期时间 electionElapsed到2*electionElapsed-1之间
r.abortLeaderTransfer() //清空leadTransferee
r.votes = make(map[uint64]bool) //重置votes字段
r.forEachProgress(func(id uint64, pr *Progress) { //重置prs
*pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
if id == r.id {
pr.Match = r.raftLog.lastIndex()
}
})
r.pendingConfIndex = 0 //重置pendingConfIndex
r.readOnly = newReadOnly(r.readOnly.option) //重置只读请求的相关设置
}
2)Follower处理MsgApp消息
1.重置选举计时器
2.保存当前的Leader节点ID
3.调用handleAppendEntries(m)方法 将MsgApp消息中携带的Entry记录追加到raftLog中,并且向Leader节点发送MsgAppResp消息,响应此次MsgApp消息
func stepFollower(r *raft, m pb.Message) error {
//其他类型消息的处理略
......
case pb.MsgApp:
r.electionElapsed = 0 //重置选举计时器,防止当前Follower发起新一轮选举
r.lead = m.From //保存当前的Leader节点ID
r.handleAppendEntries(m) //将MsgApp消息中携带的Entry记录追加到raftLog中,并且向Leader节点发送MsgAppResp消息,响应此次MsgApp消息
}
调用handleAppendEntries方法尝试将消息追加到当前节点的日志中
首先检测MsgApp消息中携带的Entry记录是否合法,然后将这些Entry记录追加到raftLog中,最后创建相应的MsgAppResp消息
可能有以下三种情况
1.消息的索引值小于当前节点已提交的值,则返回MsgAppResp消息类型,并返回已提交的位置
2.如果消息追加成功,则返回MsgAppResp消息类型,并返回最后一条记录的索引值
3.如果追加失败,则Reject设为true,并返回raftLog中最后一条记录的索引
func (r *raft) handleAppendEntries(m pb.Message) {
//m.Index表示leader发送给follower的上一条日志的索引位置
//如果Follower节点在Index位置的Entry记录已经提交过了
//则不能进行追加操作,已提交的记录不能被覆盖
//Follower会将其提交位置committed通过MsgAppResp消息通知Leader节点
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
//尝试将携带的Entry记录追加到raftLog中
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {//如果追加成功,则将最后一条记录的索引值通过MsgAppResp消息返回Leader,这样Leader就可以根据其对应的Next和Match值
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {//如果追加失败,则会Reject:true 拒绝此次追加Entry的消息,并返回raftLog中最后一条记录的索引。
r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
MsgAppResp消息
当Follower节点收到MsgApp消息进行相关的处理,最后会给Leader响应MsgAppResp消息,接下来我们看一下Leader是如何处理MsgAppResp消息的。
Progress的三个状态
ProgressStateProbe ProgressStateType = iota //表示Leader节点一次不能向目标节点发送多条消息,只能待一次消息被响应之后,才能发送下一条消息。 当刚刚复制完快照数据、上次MsgApp消息被拒绝(或发送失败)或者Leader节点初始化时,都会导致目标节点的Progress切换成该状态
ProgressStateReplicate //正常的Entry记录复制状态,Leader节点向目标节点发送完消息之后,无需等待响应,即可开始后续消息的发送
ProgressStateSnapshot //表示Leader节点正在向目标节点发送快照数据
Leader处理MsgAppResp消息在stepLeader方法中
处理其他节点返回的MsgAppResp类型消息
1.将pr的RecentActive设置为true,表明该节点存活
2.如果该节点拒绝了日志追加,根据RejectHint重新设置其Next值,并将该节点的状态设置成ProgressStateProbe
3.再次向Follower节点发送消息(调整Next)
4.如果追加成功
尝试更新该节点的Match和Next值,如果更新成功则会设置State的相关状态
尝试更新committed,如果超过半数的节点成功复制了消息,则更新Leader的commit值,并发送给集群中的其他消息
如果不能更新committed并且当前节点之前是暂停状态,在上述代码已经重置了相应状态,则可以继续发送MsgApp消息
//用于处理Leader接受到的消息
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgAppResp:
pr.RecentActive = true //将RecentActive设置成true,表明该节点可以收发消息(存活)
if m.Reject { //MsgApp消息被拒绝
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
//通过MsgAppResp消息携带的信息及对应的Progress状态,
if pr.maybeDecrTo(m.Index, m.RejectHint) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From) //再次向Follower节点发送消息(调整Next)
}
} else { //消息接受成功(Entry记录被成功追加)
oldPaused := pr.IsPaused()
if pr.maybeUpdate(m.Index) { //更新对应的Match和Next值
switch {
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index) //清空指定索引之前的消息
}
if r.maybeCommit() { //尝试更新committed,因为有些Entry记录可能在此次复制中被保存到了半数以上的节点中
//向所有节点发送MsgApp消息,此次的Commit字段与上次MsgApp消息不同
r.bcastAppend()
} else if oldPaused { //之前是pause状态,现在可以发送消息
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r.sendAppend(m.From)
}
//当收到目标Follower节点的MsgAppResp消息并且两者的raftLog完全匹配,则发送MsgTimeoutNow消息
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
}
}
}
}