From 691a0972eb10e921d85d1440917a108901b478ef Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 18 May 2023 09:38:09 +0100 Subject: [PATCH] raft: add backpressure for CommittedEntries flow This commit introduces a mechanism for limiting the flow of CommittedEntries that raft pushes through the Ready handling loop. Signed-off-by: Pavel Kalinnikov --- node.go | 15 ++++++++++++++- rawnode.go | 18 +++++++++++++----- rawnode_test.go | 4 ++-- 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/node.go b/node.go index 9c53aed2..921f597a 100644 --- a/node.go +++ b/node.go @@ -114,6 +114,19 @@ type Ready struct { MustSync bool } +// ReadyOpts shapes and tunes the work returned via the Ready struct. +type ReadyOpts struct { + // DisableCommittedEntries instructs raft not to push more committed entries + // through Ready.CommittedEntries. This indicates that the Ready handler is + // not prepared to apply committed log entries at the moment. + // + // Normally, raft keeps up to Config.MaxCommittedSizePerReady worth of + // committed entries in flight to be applied. DisableCommittedEntries allows + // the caller to pause this flow. For instance, this is helpful when the + // handler temporarily lacks the resources to apply more entries promptly. + DisableCommittedEntries bool +} + func isHardStateEqual(a, b pb.HardState) bool { return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit } @@ -334,7 +347,7 @@ func (n *node) run() { // handled first, but it's generally good to emit larger Readys plus // it simplifies testing (by emitting less frequently and more // predictably). - rd = n.rn.readyWithoutAccept() + rd = n.rn.readyWithoutAccept(ReadyOpts{}) readyc = n.readyc } diff --git a/rawnode.go b/rawnode.go index 0f3807c6..bccc81f5 100644 --- a/rawnode.go +++ b/rawnode.go @@ -129,20 +129,28 @@ func (rn *RawNode) Step(m pb.Message) error { // and sending messages. 
The returned Ready() *must* be handled and subsequently // passed back via Advance(). func (rn *RawNode) Ready() Ready { - rd := rn.readyWithoutAccept() + return rn.ReadyWithOpts(ReadyOpts{}) +} + +// ReadyWithOpts is like Ready(), with options helping the caller to control +// aspects of the work returned. +func (rn *RawNode) ReadyWithOpts(opts ReadyOpts) Ready { + rd := rn.readyWithoutAccept(opts) rn.acceptReady(rd) return rd } // readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there // is no obligation that the Ready must be handled. -func (rn *RawNode) readyWithoutAccept() Ready { +func (rn *RawNode) readyWithoutAccept(opts ReadyOpts) Ready { r := rn.raft rd := Ready{ - Entries: r.raftLog.nextUnstableEnts(), - CommittedEntries: r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()), - Messages: r.msgs, + Entries: r.raftLog.nextUnstableEnts(), + Messages: r.msgs, + } + if !opts.DisableCommittedEntries { + rd.CommittedEntries = r.raftLog.nextCommittedEnts(rn.applyUnstableEntries()) } if softSt := r.softState(); !softSt.equal(rn.prevSoftSt) { // Allocate only when SoftState changes. diff --git a/rawnode_test.go b/rawnode_test.go index 553d74fd..ec67d7fd 100644 --- a/rawnode_test.go +++ b/rawnode_test.go @@ -473,7 +473,7 @@ func TestRawNodeJointAutoLeave(t *testing.T) { } // Move the RawNode along. It should not leave joint because it's follower. - rd := rawNode.readyWithoutAccept() + rd := rawNode.readyWithoutAccept(ReadyOpts{}) // Check that the right ConfChange comes out. if len(rd.Entries) != 0 { t.Fatalf("expected zero entry, got %+v", rd) @@ -1120,7 +1120,7 @@ func TestRawNodeConsumeReady(t *testing.T) { // Inject first message, make sure it's visible via readyWithoutAccept. rn.raft.msgs = append(rn.raft.msgs, m1) - rd := rn.readyWithoutAccept() + rd := rn.readyWithoutAccept(ReadyOpts{}) if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) { t.Fatalf("expected only m1 sent, got %+v", rd.Messages) }