Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: add backpressure for CommittedEntries flow #60

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,19 @@ type Ready struct {
MustSync bool
}

// ReadyOpts shapes and tunes the work returned via the Ready struct.
type ReadyOpts struct {
// DisableCommittedEntries instructs raft not to push more committed entries
// through Ready.CommittedEntries. This indicates that the Ready handler is
// not prepared to apply committed log entries at the moment.
//
// Normally, raft keeps up to Config.MaxCommittedSizePerReady worth of
// committed entries in flight to be applied. DisableCommittedEntries allows
// the caller to pause this flow. For instance, this is helpful when the
// handler temporarily does not have resources to apply more entries timely.
DisableCommittedEntries bool
}

func isHardStateEqual(a, b pb.HardState) bool {
return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
}
Expand Down Expand Up @@ -334,7 +347,7 @@ func (n *node) run() {
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = n.rn.readyWithoutAccept()
rd = n.rn.readyWithoutAccept(ReadyOpts{})
readyc = n.readyc
}

Expand Down
18 changes: 13 additions & 5 deletions rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,28 @@ func (rn *RawNode) Step(m pb.Message) error {
// and sending messages. The returned Ready() *must* be handled and subsequently
// passed back via Advance().
func (rn *RawNode) Ready() Ready {
rd := rn.readyWithoutAccept()
return rn.ReadyWithOpts(ReadyOpts{})
}

// ReadyWithOpts is like Ready(), with options helping the caller to control
// aspects of the work returned.
func (rn *RawNode) ReadyWithOpts(opts ReadyOpts) Ready {
rd := rn.readyWithoutAccept(opts)
rn.acceptReady(rd)
return rd
}

// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
// is no obligation that the Ready must be handled.
func (rn *RawNode) readyWithoutAccept() Ready {
func (rn *RawNode) readyWithoutAccept(opts ReadyOpts) Ready {
r := rn.raft

rd := Ready{
Entries: r.raftLog.nextUnstableEnts(),
CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()),
Messages: r.msgs,
Entries: r.raftLog.nextUnstableEnts(),
Messages: r.msgs,
}
if !opts.DisableCommittedEntries {
rd.CommittedEntries = r.raftLog.nextCommittedEnts(rn.applyUnstableEntries())
}
if softSt := r.softState(); !softSt.equal(rn.prevSoftSt) {
// Allocate only when SoftState changes.
Expand Down
4 changes: 2 additions & 2 deletions rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func TestRawNodeJointAutoLeave(t *testing.T) {
}

// Move the RawNode along. It should not leave joint because it's follower.
rd := rawNode.readyWithoutAccept()
rd := rawNode.readyWithoutAccept(ReadyOpts{})
// Check that the right ConfChange comes out.
if len(rd.Entries) != 0 {
t.Fatalf("expected zero entry, got %+v", rd)
Expand Down Expand Up @@ -1120,7 +1120,7 @@ func TestRawNodeConsumeReady(t *testing.T) {

// Inject first message, make sure it's visible via readyWithoutAccept.
rn.raft.msgs = append(rn.raft.msgs, m1)
rd := rn.readyWithoutAccept()
rd := rn.readyWithoutAccept(ReadyOpts{})
if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
}
Expand Down