Skip to content

Commit

Permalink
raft: persist LeadEpoch in HardState
Browse files Browse the repository at this point in the history
Closes cockroachdb#124497

Release note: None
  • Loading branch information
arulajmani committed Jul 17, 2024
1 parent 1e96b20 commit 9288cbf
Show file tree
Hide file tree
Showing 14 changed files with 66 additions and 39 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ go_test(
"//pkg/raft",
"//pkg/raft/confchange",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/raft/tracker",
"//pkg/roachpb",
"//pkg/rpc",
Expand Down
19 changes: 11 additions & 8 deletions pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
Expand Down Expand Up @@ -51,21 +52,23 @@ func TestBelowRaftProtosDontChange(t *testing.T) {
},
func(r *rand.Rand) protoutil.Message {
type expectedHardState struct {
Term uint64
Vote raftpb.PeerID
Commit uint64
Lead raftpb.PeerID
Term uint64
Vote raftpb.PeerID
Commit uint64
Lead raftpb.PeerID
LeadEpoch raftstoreliveness.StoreLivenessEpoch
}
// Conversion fails if new fields are added to `HardState`, in which case this method
// and the expected sums should be updated.
var _ = expectedHardState(raftpb.HardState{})

n := r.Uint64()
return &raftpb.HardState{
Term: n % 3,
Vote: raftpb.PeerID(n % 7),
Commit: n % 11,
Lead: raftpb.PeerID(n % 13),
Term: n % 3,
Vote: raftpb.PeerID(n % 7),
Commit: n % 11,
Lead: raftpb.PeerID(n % 13),
LeadEpoch: raftstoreliveness.StoreLivenessEpoch(n % 17),
}
},
func(r *rand.Rand) protoutil.Message {
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,11 @@ func (s *LogStore) storeEntriesAndCommitBatch(
}

hs := raftpb.HardState{
Term: m.Term,
Vote: m.Vote,
Commit: m.Commit,
Lead: m.Lead,
Term: m.Term,
Vote: m.Vote,
Commit: m.Commit,
Lead: m.Lead,
LeadEpoch: m.LeadEpoch,
}
if !raft.IsEmptyHardState(hs) {
// NB: Note that without additional safeguards, it's incorrect to write
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/logstore/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (sl StateLoader) SynthesizeHardState(
if oldHS.Term == newHS.Term {
newHS.Vote = oldHS.Vote
newHS.Lead = oldHS.Lead
newHS.LeadEpoch = oldHS.LeadEpoch
}
err := sl.SetHardState(ctx, readWriter, newHS)
return errors.Wrapf(err, "writing HardState %+v", &newHS)
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,10 +967,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(

snap := *msgStorageAppend.Snapshot
hs := raftpb.HardState{
Term: msgStorageAppend.Term,
Vote: msgStorageAppend.Vote,
Commit: msgStorageAppend.Commit,
Lead: msgStorageAppend.Lead,
Term: msgStorageAppend.Term,
Vote: msgStorageAppend.Vote,
Commit: msgStorageAppend.Commit,
Lead: msgStorageAppend.Lead,
LeadEpoch: msgStorageAppend.LeadEpoch,
}
if len(msgStorageAppend.Entries) != 0 {
log.Fatalf(ctx, "found Entries in MsgStorageAppend with non-empty Snapshot")
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/stateloader/initial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSynthesizeHardState(t *testing.T) {
eng := storage.NewDefaultInMemForTesting()
stopper.AddCloser(eng)

tHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4, Lead: 5}
tHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4, Lead: 5, LeadEpoch: 6}

testCases := []struct {
TruncTerm kvpb.RaftTerm
Expand All @@ -45,9 +45,9 @@ func TestSynthesizeHardState(t *testing.T) {
// Can't wind back the committed index of the new HardState.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit - 1), Err: "can't decrease HardState.Commit"},
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit), NewHS: tHS},
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit + 1), NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1, Lead: 5}},
// Higher Term is picked up, but Vote and Lead aren't carried over when the
// term changes.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit + 1), NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1, Lead: 5, LeadEpoch: 6}},
// Higher Term is picked up, but Vote, Lead, and LeadEpoch aren't carried
// over when the term changes.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit), TruncTerm: 11, NewHS: raftpb.HardState{Term: 11, Vote: 0, Commit: tHS.Commit, Lead: 0}},
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
2027450491469602084
9497879862544146727
1 change: 1 addition & 0 deletions pkg/raft/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_test(
deps = [
"//pkg/raft/quorum",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/raft/rafttest",
"//pkg/raft/tracker",
"@com_github_cockroachdb_datadriven//:datadriven",
Expand Down
12 changes: 8 additions & 4 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ type raft struct {

// the leader id
lead pb.PeerID
// TODO(arul): This should be populated when responding to a MsgFortify.
leadEpoch raftstoreliveness.StoreLivenessEpoch
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee pb.PeerID
Expand Down Expand Up @@ -462,10 +464,11 @@ func (r *raft) softState() SoftState { return SoftState{RaftState: r.state} }

func (r *raft) hardState() pb.HardState {
return pb.HardState{
Term: r.Term,
Vote: r.Vote,
Commit: r.raftLog.committed,
Lead: r.lead,
Term: r.Term,
Vote: r.Vote,
Commit: r.raftLog.committed,
Lead: r.lead,
LeadEpoch: r.leadEpoch,
}
}

Expand Down Expand Up @@ -2010,6 +2013,7 @@ func (r *raft) loadState(state pb.HardState) {
r.Term = state.Term
r.Vote = state.Vote
r.lead = state.Lead
r.leadEpoch = state.LeadEpoch
}

// pastElectionTimeout returns true if r.electionElapsed is greater
Expand Down
5 changes: 4 additions & 1 deletion pkg/raft/raftpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ go_proto_library(
importpath = "github.com/cockroachdb/cockroach/pkg/raft/raftpb",
proto = ":raftpb_proto",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto"],
deps = [
"//pkg/raft/raftstoreliveness", #keep
"@com_github_gogo_protobuf//gogoproto",
],
)

go_library(
Expand Down
18 changes: 11 additions & 7 deletions pkg/raft/raftpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ message Message {
repeated Entry entries = 7 [(gogoproto.nullable) = false];
optional uint64 commit = 8 [(gogoproto.nullable) = false];
optional uint64 lead = 16 [(gogoproto.nullable) = false, (gogoproto.casttype) = "PeerID"];
optional uint64 leadEpoch = 17 [(gogoproto.nullable) = false,
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness.StoreLivenessEpoch"];
// (type=MsgStorageAppend,vote=5,term=10) means the local node is voting for
// peer 5 in term 10. For MsgStorageAppends, the term, vote, lead, and commit
// fields will either all be set (to facilitate the construction of a
// HardState) if any of the fields have changed or will all be unset if none
// peer 5 in term 10. For MsgStorageAppends, the term, vote, lead, leadEpoch,
// and commit fields will either all be set (to facilitate the construction of
// a HardState) if any of the fields have changed or will all be unset if none
// of the fields have changed.
optional uint64 vote = 13 [(gogoproto.nullable) = false, (gogoproto.casttype) = "PeerID"];
// snapshot is non-nil and non-empty for MsgSnap messages and nil for all other
Expand Down Expand Up @@ -125,10 +127,12 @@ message Message {
}

message HardState {
optional uint64 term = 1 [(gogoproto.nullable) = false];
optional uint64 vote = 2 [(gogoproto.nullable) = false, (gogoproto.casttype) = "PeerID"];
optional uint64 commit = 3 [(gogoproto.nullable) = false];
optional uint64 lead = 4 [(gogoproto.nullable) = false, (gogoproto.casttype) = "PeerID"];
optional uint64 term = 1 [(gogoproto.nullable) = false];
optional uint64 vote = 2 [(gogoproto.nullable) = false, (gogoproto.casttype) = "PeerID"];
optional uint64 commit = 3 [(gogoproto.nullable) = false];
optional uint64 lead = 4 [(gogoproto.nullable) = false, (gogoproto.casttype) = "PeerID"];
optional uint64 lead_epoch = 5 [(gogoproto.nullable) = false,
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness.StoreLivenessEpoch"];
}

// ConfChangeTransition specifies the behavior of a configuration change with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ func (env *InteractionEnv) ProcessAppendThread(idx int) error {
env.Output.WriteString("Processing:\n")
env.Output.WriteString(raft.DescribeMessage(m, defaultEntryFormatter) + "\n")
st := raftpb.HardState{
Term: m.Term,
Vote: m.Vote,
Commit: m.Commit,
Lead: m.Lead,
Term: m.Term,
Vote: m.Vote,
Commit: m.Commit,
Lead: m.Lead,
LeadEpoch: m.LeadEpoch,
}
var snap raftpb.Snapshot
if m.Snapshot != nil {
Expand Down
7 changes: 6 additions & 1 deletion pkg/raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,14 @@ func MustSync(st, prevst pb.HardState, entsnum int) bool {
// (Updated on stable storage before responding to RPCs)
// currentTerm
// currentLead
// currentLeadEpoch
// votedFor
// log entries[]
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term || st.Lead != prevst.Lead
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term ||
// TODO(arul): The st.LeadEpoch != prevst.LeadEpoch condition is currently
// untested because we don't set r.leadEpoch yet. We'll do so when we
// introduce MsgFortifyResp. Test this then.
st.Lead != prevst.Lead || st.LeadEpoch != prevst.LeadEpoch
}

func needStorageAppendMsg(r *raft, rd Ready) bool {
Expand Down
4 changes: 3 additions & 1 deletion pkg/raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/raft/quorum"
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -610,7 +611,7 @@ func TestRawNodeRestart(t *testing.T) {
{Term: 1, Index: 1},
{Term: 1, Index: 2, Data: []byte("foo")},
}
st := pb.HardState{Term: 1, Commit: 1, Lead: 1}
st := pb.HardState{Term: 1, Commit: 1, Lead: 1, LeadEpoch: 1}

want := Ready{
HardState: emptyState, // no HardState is emitted because there was no change
Expand All @@ -633,6 +634,7 @@ func TestRawNodeRestart(t *testing.T) {
assert.Equal(t, uint64(1), rawNode.raft.raftLog.committed)
assert.Equal(t, pb.PeerID(1), rawNode.raft.lead)
assert.True(t, rawNode.raft.state == StateFollower)
assert.Equal(t, raftstoreliveness.StoreLivenessEpoch(1), rawNode.raft.leadEpoch)

// Ensure we campaign after the election timeout has elapsed.
for i := 0; i < rawNode.raft.randomizedElectionTimeout; i++ {
Expand Down

0 comments on commit 9288cbf

Please sign in to comment.