Skip to content

Commit

Permalink
raft: persist Lead into the HardState
Browse files Browse the repository at this point in the history
Previously, leader information for a given term was stored as volatile
state in raft. This change persists it durably by moving it to
HardState. This is done in service of leader leases, where we want to
ensure a follower doesn't violate its promise of support to the leader
even after a restart.

Fixes #124421

Release note: None
  • Loading branch information
arulajmani committed Jul 17, 2024
1 parent 84b9dd2 commit e60feee
Show file tree
Hide file tree
Showing 43 changed files with 466 additions and 383 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestBelowRaftProtosDontChange(t *testing.T) {
Term uint64
Vote raftpb.PeerID
Commit uint64
Lead raftpb.PeerID
}
// Conversion fails if new fields are added to `HardState`, in which case this method
// and the expected sums should be updated.
Expand All @@ -64,6 +65,7 @@ func TestBelowRaftProtosDontChange(t *testing.T) {
Term: n % 3,
Vote: raftpb.PeerID(n % 7),
Commit: n % 11,
Lead: raftpb.PeerID(n % 13),
}
},
func(r *rand.Rand) protoutil.Message {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (s *LogStore) storeEntriesAndCommitBatch(
Term: m.Term,
Vote: m.Vote,
Commit: m.Commit,
Lead: m.Lead,
}
if !raft.IsEmptyHardState(hs) {
// NB: Note that without additional safeguards, it's incorrect to write
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/logstore/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,23 @@ func (sl StateLoader) SynthesizeHardState(
return errors.Newf("can't decrease HardState.Commit from %d to %d",
redact.Safe(oldHS.Commit), redact.Safe(newHS.Commit))
}

// TODO(arul): This function can be called with an empty OldHS. In all other
// cases, where a term is included, we should be able to assert that the term
// isn't regressing (i.e. oldHS.Term >= newHS.Term).

if oldHS.Term > newHS.Term {
// The existing HardState is allowed to be ahead of us, which is
// relevant in practice for the split trigger. We already checked above
// that we're not rewinding the acknowledged index, and we haven't
// updated votes yet.
newHS.Term = oldHS.Term
}
// If the existing HardState voted in this term, remember that.
// If the existing HardState voted in this term and knows who the leader is,
// remember that.
if oldHS.Term == newHS.Term {
newHS.Vote = oldHS.Vote
newHS.Lead = oldHS.Lead
}
err := sl.SetHardState(ctx, readWriter, newHS)
return errors.Wrapf(err, "writing HardState %+v", &newHS)
Expand Down
52 changes: 24 additions & 28 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}

var hasReady bool
var softState *raft.SoftState
var outboundMsgs []raftpb.Message
var msgStorageAppend, msgStorageApply raftpb.Message
r.mu.Lock()
Expand Down Expand Up @@ -836,7 +835,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(

logRaftReady(ctx, syncRd)
asyncRd := makeAsyncReady(syncRd)
softState = asyncRd.SoftState
outboundMsgs, msgStorageAppend, msgStorageApply = splitLocalStorageMsgs(asyncRd.Messages)
}
// We unquiesce if we have a Ready (= there's work to do). We also have
Expand Down Expand Up @@ -876,24 +874,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return stats, nil
}

refreshReason := noReason
if softState != nil && leaderID != roachpb.ReplicaID(softState.Lead) {
// Refresh pending commands if the Raft leader has changed. This is usually
// the first indication we have of a new leader on a restarted node.
//
// TODO(peter): Re-proposing commands when SoftState.Lead changes can lead
// to wasteful multiple-reproposals when we later see an empty Raft command
// indicating a newly elected leader or a conf change. Replay protection
// prevents any corruption, so the waste is only a performance issue.
if log.V(3) {
log.Infof(ctx, "raft leader changed: %d -> %d", leaderID, softState.Lead)
}
if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
refreshReason = reasonNewLeader
}
leaderID = roachpb.ReplicaID(softState.Lead)
}

r.traceMessageSends(outboundMsgs, "sending messages")
r.sendRaftMessages(ctx, outboundMsgs, pausedFollowers, true /* willDeliverLocal */)

Expand Down Expand Up @@ -941,7 +921,28 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
}

refreshReason := noReason
if hasMsg(msgStorageAppend) {
// Leadership changes, if any, are communicated through MsgStorageAppends.
// Check if that's the case here.
if msgStorageAppend.Lead != raft.None && leaderID != roachpb.ReplicaID(msgStorageAppend.Lead) {
// Refresh pending commands if the Raft leader has changed. This is
// usually the first indication we have of a new leader on a restarted
// node.
//
// TODO(peter): Re-proposing commands when the Raft leader changes can lead
// to wasteful multiple-reproposals when we later see an empty Raft command
// indicating a newly elected leader or a conf change. Replay protection
// prevents any corruption, so the waste is only a performance issue.
if log.V(3) {
log.Infof(ctx, "raft leader changed: %d -> %d", leaderID, msgStorageAppend.Lead)
}
if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
refreshReason = reasonNewLeader
}
leaderID = roachpb.ReplicaID(msgStorageAppend.Lead)
}

if msgStorageAppend.Snapshot != nil {
if inSnap.Desc == nil {
// If we didn't expect Raft to have a snapshot but it has one
Expand Down Expand Up @@ -969,6 +970,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
Term: msgStorageAppend.Term,
Vote: msgStorageAppend.Vote,
Commit: msgStorageAppend.Commit,
Lead: msgStorageAppend.Lead,
}
if len(msgStorageAppend.Entries) != 0 {
log.Fatalf(ctx, "found Entries in MsgStorageAppend with non-empty Snapshot")
Expand Down Expand Up @@ -1204,11 +1206,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// All fields in asyncReady are read-only.
// TODO(nvanbenschoten): move this into go.etcd.io/raft.
type asyncReady struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*raft.SoftState

// Messages specifies outbound messages to other peers and to local storage
// threads. These messages can be sent in any order.
//
Expand All @@ -1220,8 +1217,7 @@ type asyncReady struct {
// makeAsyncReady constructs an asyncReady from the provided Ready.
func makeAsyncReady(rd raft.Ready) asyncReady {
return asyncReady{
SoftState: rd.SoftState,
Messages: rd.Messages,
Messages: rd.Messages,
}
}

Expand Down Expand Up @@ -2023,7 +2019,7 @@ func (r *Replica) hasOutstandingSnapshotInFlightToStore(

// HasRaftLeader returns true if the raft group has a raft leader currently.
func HasRaftLeader(raftStatus *raft.Status) bool {
return raftStatus != nil && raftStatus.SoftState.Lead != 0
return raftStatus != nil && raftStatus.HardState.Lead != 0
}

// pendingCmdSlice sorts by increasing MaxLeaseIndex.
Expand Down
22 changes: 13 additions & 9 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func upToDateRaftStatus(repls []roachpb.ReplicaDescriptor) *raft.Status {
}
return &raft.Status{
BasicStatus: raft.BasicStatus{
HardState: raftpb.HardState{Commit: 100},
SoftState: raft.SoftState{Lead: 1, RaftState: raft.StateLeader},
HardState: raftpb.HardState{Commit: 100, Lead: 1},
SoftState: raft.SoftState{RaftState: raft.StateLeader},
},
Progress: prs,
}
Expand Down Expand Up @@ -8918,7 +8918,7 @@ func TestReplicaMetrics(t *testing.T) {
} else {
status.SoftState.RaftState = raft.StateFollower
}
status.SoftState.Lead = lead
status.HardState.Lead = lead
return status
}
desc := func(ids ...int) roachpb.RangeDescriptor {
Expand Down Expand Up @@ -10117,9 +10117,7 @@ func TestFollowerQuiesceOnNotify(t *testing.T) {
HardState: raftpb.HardState{
Term: 5,
Commit: 10,
},
SoftState: raft.SoftState{
Lead: 1,
Lead: 1,
},
},
},
Expand Down Expand Up @@ -11481,7 +11479,9 @@ func TestReplicaShouldCampaignOnWake(t *testing.T) {
raftStatus: raft.BasicStatus{
SoftState: raft.SoftState{
RaftState: raft.StateFollower,
Lead: 2,
},
HardState: raftpb.HardState{
Lead: 2,
},
},
livenessMap: livenesspb.IsLiveMap{
Expand Down Expand Up @@ -11599,7 +11599,9 @@ func TestReplicaShouldCampaignOnLeaseRequestRedirect(t *testing.T) {
raftStatus: raft.BasicStatus{
SoftState: raft.SoftState{
RaftState: raft.StateFollower,
Lead: 2,
},
HardState: raftpb.HardState{
Lead: 2,
},
},
livenessMap: livenesspb.IsLiveMap{
Expand Down Expand Up @@ -11710,7 +11712,9 @@ func TestReplicaShouldForgetLeaderOnVoteRequest(t *testing.T) {
raftStatus: raft.BasicStatus{
SoftState: raft.SoftState{
RaftState: raft.StateFollower,
Lead: 2,
},
HardState: raftpb.HardState{
Lead: 2,
},
},
livenessMap: livenesspb.IsLiveMap{
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/stateloader/initial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSynthesizeHardState(t *testing.T) {
eng := storage.NewDefaultInMemForTesting()
stopper.AddCloser(eng)

tHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4}
tHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4, Lead: 5}

testCases := []struct {
TruncTerm kvpb.RaftTerm
Expand All @@ -45,10 +45,10 @@ func TestSynthesizeHardState(t *testing.T) {
// Can't wind back the committed index of the new HardState.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit - 1), Err: "can't decrease HardState.Commit"},
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit), NewHS: tHS},
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit + 1), NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1}},
// Higher Term is picked up, but vote isn't carried over when the term
// changes.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit), TruncTerm: 11, NewHS: raftpb.HardState{Term: 11, Vote: 0, Commit: tHS.Commit}},
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit + 1), NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1, Lead: 5}},
// Higher Term is picked up, but Vote and Lead aren't carried over when the
// term changes.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit), TruncTerm: 11, NewHS: raftpb.HardState{Term: 11, Vote: 0, Commit: tHS.Commit, Lead: 0}},
}

for i, test := range testCases {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
5484201021249543052
2027450491469602084
5 changes: 2 additions & 3 deletions pkg/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ var (
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
Lead pb.PeerID // must use atomic operations to access; keep 64-bit aligned.
RaftState StateType
}

func (a *SoftState) equal(b *SoftState) bool {
return a.Lead == b.Lead && a.RaftState == b.RaftState
return a.RaftState == b.RaftState
}

// Ready encapsulates the entries and messages that are ready to read,
Expand Down Expand Up @@ -112,7 +111,7 @@ type Ready struct {
}

func isHardStateEqual(a, b pb.HardState) bool {
return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit && a.Lead == b.Lead
}

// IsEmptyHardState returns true if the given HardState is empty.
Expand Down
14 changes: 7 additions & 7 deletions pkg/raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestNodePropose(t *testing.T) {
rd := <-n.Ready()
s.Append(rd.Entries)
// change the step function to appendStep until this raft becomes leader
if rd.SoftState.Lead == r.id {
if rd.HardState.Lead == r.id {
r.step = appendStep
n.Advance()
break
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestNodeProposeConfig(t *testing.T) {
rd := <-n.Ready()
s.Append(rd.Entries)
// change the step function to appendStep until this raft becomes leader
if rd.SoftState.Lead == r.id {
if rd.HardState.Lead == r.id {
r.step = appendStep
n.Advance()
break
Expand Down Expand Up @@ -366,7 +366,7 @@ func TestNodeProposeWaitDropped(t *testing.T) {
rd := <-n.Ready()
s.Append(rd.Entries)
// change the step function to dropStep until this raft becomes leader
if rd.SoftState.Lead == r.id {
if rd.HardState.Lead == r.id {
r.step = dropStep
n.Advance()
break
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestNodeStart(t *testing.T) {
require.NoError(t, err)
wants := []Ready{
{
HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0, Lead: 0},
Entries: []raftpb.Entry{
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
},
Expand All @@ -452,13 +452,13 @@ func TestNodeStart(t *testing.T) {
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1},
HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1, Lead: 1},
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 2, Data: nil}},
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1, Lead: 1},
Entries: nil,
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
MustSync: false,
Expand Down Expand Up @@ -647,7 +647,6 @@ func TestSoftStateEqual(t *testing.T) {
we bool
}{
{&SoftState{}, true},
{&SoftState{Lead: 1}, false},
{&SoftState{RaftState: StateLeader}, false},
}
for i, tt := range tests {
Expand All @@ -664,6 +663,7 @@ func TestIsHardStateEqual(t *testing.T) {
{raftpb.HardState{Vote: 1}, false},
{raftpb.HardState{Commit: 1}, false},
{raftpb.HardState{Term: 1}, false},
{raftpb.HardState{Lead: 1}, false},
}

for i, tt := range tests {
Expand Down
Loading

0 comments on commit e60feee

Please sign in to comment.