diff --git a/raft.go b/raft.go index 5a150562..de58f014 100644 --- a/raft.go +++ b/raft.go @@ -375,6 +375,11 @@ type raft struct { // the leader id lead uint64 + // logSynced is true if this node's log is guaranteed to be a prefix of the + // leader's log at this term. Always true for the leader. Always false for a + // candidate. For a follower, becomes true the first time a MsgApp append to + // the log succeeds. + logSynced bool // leadTransferee is id of the leader transfer target when its value is not zero. // Follow the procedure defined in raft thesis 3.10. leadTransferee uint64 @@ -763,6 +768,7 @@ func (r *raft) reset(term uint64) { r.Vote = None } r.lead = None + r.logSynced = false r.electionElapsed = 0 r.heartbeatElapsed = 0 @@ -908,6 +914,7 @@ func (r *raft) becomeLeader() { r.reset(r.Term) r.tick = r.tickHeartbeat r.lead = r.id + r.logSynced = true // the leader's log is in sync with itself r.state = StateLeader // Followers enter replicate mode when they've been successfully probed // (perhaps after having received a snapshot as a result). The leader is @@ -1735,6 +1742,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { return } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + r.logSynced = true // from now on, the log is a prefix of the leader's log r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1770,7 +1778,11 @@ func (r *raft) handleAppendEntries(m pb.Message) { } func (r *raft) handleHeartbeat(m pb.Message) { - r.raftLog.commitTo(m.Commit) + // If our log is not a prefix of the leader's log, it is unsafe to advance the + // commit index, because the entries at this index may mismatch. + if r.logSynced { + r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex())) + } r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) }