Skip to content

Commit

Permalink
Merge #127385
Browse files Browse the repository at this point in the history
127385: raft: persist LeadEpoch in HardState r=arulajmani a=arulajmani

Closes #124497

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
craig[bot] and arulajmani committed Jul 18, 2024
2 parents 4e6ad53 + 00a0052 commit 59608c1
Show file tree
Hide file tree
Showing 17 changed files with 211 additions and 186 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ go_test(
"//pkg/raft",
"//pkg/raft/confchange",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/raft/tracker",
"//pkg/roachpb",
"//pkg/rpc",
Expand Down
19 changes: 11 additions & 8 deletions pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftstoreliveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
Expand Down Expand Up @@ -51,21 +52,23 @@ func TestBelowRaftProtosDontChange(t *testing.T) {
},
func(r *rand.Rand) protoutil.Message {
type expectedHardState struct {
Term uint64
Vote raftpb.PeerID
Commit uint64
Lead raftpb.PeerID
Term uint64
Vote raftpb.PeerID
Commit uint64
Lead raftpb.PeerID
LeadEpoch raftstoreliveness.Epoch
}
// Conversion fails if new fields are added to `HardState`, in which case this method
// and the expected sums should be updated.
var _ = expectedHardState(raftpb.HardState{})

n := r.Uint64()
return &raftpb.HardState{
Term: n % 3,
Vote: raftpb.PeerID(n % 7),
Commit: n % 11,
Lead: raftpb.PeerID(n % 13),
Term: n % 3,
Vote: raftpb.PeerID(n % 7),
Commit: n % 11,
Lead: raftpb.PeerID(n % 13),
LeadEpoch: raftstoreliveness.Epoch(n % 17),
}
},
func(r *rand.Rand) protoutil.Message {
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,11 @@ func (s *LogStore) storeEntriesAndCommitBatch(
}

hs := raftpb.HardState{
Term: m.Term,
Vote: m.Vote,
Commit: m.Commit,
Lead: m.Lead,
Term: m.Term,
Vote: m.Vote,
Commit: m.Commit,
Lead: m.Lead,
LeadEpoch: m.LeadEpoch,
}
if !raft.IsEmptyHardState(hs) {
// NB: Note that without additional safeguards, it's incorrect to write
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/logstore/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (sl StateLoader) SynthesizeHardState(
if oldHS.Term == newHS.Term {
newHS.Vote = oldHS.Vote
newHS.Lead = oldHS.Lead
newHS.LeadEpoch = oldHS.LeadEpoch
}
err := sl.SetHardState(ctx, readWriter, newHS)
return errors.Wrapf(err, "writing HardState %+v", &newHS)
Expand Down
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,10 +967,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(

snap := *msgStorageAppend.Snapshot
hs := raftpb.HardState{
Term: msgStorageAppend.Term,
Vote: msgStorageAppend.Vote,
Commit: msgStorageAppend.Commit,
Lead: msgStorageAppend.Lead,
Term: msgStorageAppend.Term,
Vote: msgStorageAppend.Vote,
Commit: msgStorageAppend.Commit,
Lead: msgStorageAppend.Lead,
LeadEpoch: msgStorageAppend.LeadEpoch,
}
if len(msgStorageAppend.Entries) != 0 {
log.Fatalf(ctx, "found Entries in MsgStorageAppend with non-empty Snapshot")
Expand Down
10 changes: 4 additions & 6 deletions pkg/kv/kvserver/replica_store_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ func (r *replicaRLockedStoreLiveness) getStoreIdent(replicaID uint64) (slpb.Stor
}

// SupportFor implements the raftstoreliveness.StoreLiveness interface.
func (r *replicaRLockedStoreLiveness) SupportFor(
replicaID uint64,
) (raftstoreliveness.StoreLivenessEpoch, bool) {
func (r *replicaRLockedStoreLiveness) SupportFor(replicaID uint64) (raftstoreliveness.Epoch, bool) {
storeID, ok := r.getStoreIdent(replicaID)
if !ok {
return 0, false
Expand All @@ -45,13 +43,13 @@ func (r *replicaRLockedStoreLiveness) SupportFor(
if !ok {
return 0, false
}
return raftstoreliveness.StoreLivenessEpoch(epoch), true
return raftstoreliveness.Epoch(epoch), true
}

// SupportFrom implements the raftstoreliveness.StoreLiveness interface.
func (r *replicaRLockedStoreLiveness) SupportFrom(
replicaID uint64,
) (raftstoreliveness.StoreLivenessEpoch, hlc.Timestamp, bool) {
) (raftstoreliveness.Epoch, hlc.Timestamp, bool) {
storeID, ok := r.getStoreIdent(replicaID)
if !ok {
return 0, hlc.Timestamp{}, false
Expand All @@ -60,7 +58,7 @@ func (r *replicaRLockedStoreLiveness) SupportFrom(
if !ok {
return 0, hlc.Timestamp{}, false
}
return raftstoreliveness.StoreLivenessEpoch(epoch), exp, true
return raftstoreliveness.Epoch(epoch), exp, true
}

// SupportFromEnabled implements the raftstoreliveness.StoreLiveness interface.
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/stateloader/initial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSynthesizeHardState(t *testing.T) {
eng := storage.NewDefaultInMemForTesting()
stopper.AddCloser(eng)

tHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4, Lead: 5}
tHS := raftpb.HardState{Term: 2, Vote: 3, Commit: 4, Lead: 5, LeadEpoch: 6}

testCases := []struct {
TruncTerm kvpb.RaftTerm
Expand All @@ -45,9 +45,9 @@ func TestSynthesizeHardState(t *testing.T) {
// Can't wind back the committed index of the new HardState.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit - 1), Err: "can't decrease HardState.Commit"},
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit), NewHS: tHS},
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit + 1), NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1, Lead: 5}},
// Higher Term is picked up, but Vote and Lead aren't carried over when the
// term changes.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit + 1), NewHS: raftpb.HardState{Term: tHS.Term, Vote: 3, Commit: tHS.Commit + 1, Lead: 5, LeadEpoch: 6}},
// Higher Term is picked up, but Vote, Lead, and LeadEpoch aren't carried
// over when the term changes.
{OldHS: &tHS, RaftAppliedIndex: kvpb.RaftIndex(tHS.Commit), TruncTerm: 11, NewHS: raftpb.HardState{Term: 11, Vote: 0, Commit: tHS.Commit, Lead: 0}},
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
2027450491469602084
9497879862544146727
1 change: 1 addition & 0 deletions pkg/raft/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_test(
deps = [
"//pkg/raft/quorum",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/raft/rafttest",
"//pkg/raft/tracker",
"@com_github_cockroachdb_datadriven//:datadriven",
Expand Down
12 changes: 8 additions & 4 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ type raft struct {

// the leader id
lead pb.PeerID
// TODO(arul): This should be populated when responding to a MsgFortify.
leadEpoch raftstoreliveness.Epoch
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee pb.PeerID
Expand Down Expand Up @@ -462,10 +464,11 @@ func (r *raft) softState() SoftState { return SoftState{RaftState: r.state} }

func (r *raft) hardState() pb.HardState {
return pb.HardState{
Term: r.Term,
Vote: r.Vote,
Commit: r.raftLog.committed,
Lead: r.lead,
Term: r.Term,
Vote: r.Vote,
Commit: r.raftLog.committed,
Lead: r.lead,
LeadEpoch: r.leadEpoch,
}
}

Expand Down Expand Up @@ -2010,6 +2013,7 @@ func (r *raft) loadState(state pb.HardState) {
r.Term = state.Term
r.Vote = state.Vote
r.lead = state.Lead
r.leadEpoch = state.LeadEpoch
}

// pastElectionTimeout returns true if r.electionElapsed is greater
Expand Down
5 changes: 4 additions & 1 deletion pkg/raft/raftpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ go_proto_library(
importpath = "github.com/cockroachdb/cockroach/pkg/raft/raftpb",
proto = ":raftpb_proto",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto"],
deps = [
"//pkg/raft/raftstoreliveness", #keep
"@com_github_gogo_protobuf//gogoproto",
],
)

go_library(
Expand Down
Loading

0 comments on commit 59608c1

Please sign in to comment.