Skip to content

Commit

Permalink
server: Implement MaxBytes option for RangeRequest
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Lin <[email protected]>
  • Loading branch information
linxiulei committed Sep 15, 2023
1 parent e3c2f75 commit f5ce3c7
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 7 deletions.
12 changes: 12 additions & 0 deletions client/v3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Op struct {

// for range
limit int64
maxBytes int64
sort *SortOption
serializable bool
keysOnly bool
Expand Down Expand Up @@ -157,6 +158,7 @@ func (op Op) toRangeRequest() *pb.RangeRequest {
Key: op.key,
RangeEnd: op.end,
Limit: op.limit,
MaxBytes: op.maxBytes,
Revision: op.rev,
Serializable: op.serializable,
KeysOnly: op.keysOnly,
Expand Down Expand Up @@ -267,6 +269,8 @@ func OpDelete(key string, opts ...OpOption) Op {
panic("unexpected filter in delete")
case ret.createdNotify:
panic("unexpected createdNotify in delete")
case ret.maxBytes != 0:
panic("unexpected maxBytes in delete")
}
return ret
}
Expand Down Expand Up @@ -296,6 +300,8 @@ func OpPut(key, val string, opts ...OpOption) Op {
panic("unexpected filter in put")
case ret.createdNotify:
panic("unexpected createdNotify in put")
case ret.maxBytes != 0:
panic("unexpected maxBytes in delete")
}
return ret
}
Expand Down Expand Up @@ -323,6 +329,8 @@ func opWatch(key string, opts ...OpOption) Op {
panic("unexpected mod revision filter in watch")
case ret.minCreateRev != 0, ret.maxCreateRev != 0:
panic("unexpected create revision filter in watch")
case ret.maxBytes != 0:
panic("unexpected maxBytes in delete")
}
return ret
}
Expand All @@ -345,6 +353,10 @@ func WithLease(leaseID LeaseID) OpOption {
// If WithLimit is given a 0 limit, it is treated as no limit.
func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } }

// WithMaxBytes limits the size in bytes of results to return from 'Get' request.
// If WithMaxBytes is given a 0 limit, it is treated as no limit.
func WithMaxBytes(n int64) OpOption { return func(op *Op) { op.maxBytes = n } }

// WithRev specifies the store revision for 'Get' request.
// Or the start revision of 'Watch' request.
func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } }
Expand Down
21 changes: 17 additions & 4 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,20 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

limit := r.Limit
limit, maxBytes := r.Limit, r.MaxBytes
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
// fetch everything; sort and truncate afterwards
limit = 0
maxBytes = 0
}

ro := mvcc.RangeOptions{
Limit: limit,
Rev: r.Revision,
Count: r.CountOnly,
Limit: limit,
MaxBytes: maxBytes,
Rev: r.Revision,
Count: r.CountOnly,
}

rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
Expand Down Expand Up @@ -227,6 +229,17 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
if r.MaxBytes > 0 && maxBytes == 0 {
var totalBytes int64
for i, kv := range rr.KVs {
totalBytes += int64(kv.Size())
if totalBytes > r.MaxBytes {
resp.More = true
rr.KVs = rr.KVs[:i]
break
}
}
}

trace.Step("filter and sort the key-value pairs")
resp.Header.Revision = rr.Rev
Expand Down
7 changes: 4 additions & 3 deletions server/storage/mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import (
)

type RangeOptions struct {
Limit int64
Rev int64
Count bool
Limit int64
MaxBytes int64
Rev int64
Count bool
}

type RangeResult struct {
Expand Down
47 changes: 47 additions & 0 deletions server/storage/mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,53 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
}
}

func TestKVRangeMaxBytes(t *testing.T) { testKVRangeMaxBytes(t, normalRangeFunc) }
func TestKVTxnRangeMaxBytes(t *testing.T) { testKVRangeMaxBytes(t, txnRangeFunc) }

func testKVRangeMaxBytes(t *testing.T, f rangeFunc) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b)

kvs := put3TestKVs(s)

kvsSize := func(kvs []mvccpb.KeyValue) int64 {
var s int
for _, kv := range kvs {
s += kv.Size()
}
return int64(s)
}

wrev := int64(4)
tests := []struct {
maxBytes int64
wcounts int64
wkvs []mvccpb.KeyValue
wmore bool
}{
// no limit
{-1, 3, kvs, false},
// no limit
{0, 3, kvs, false},
{kvsSize(kvs[:1]), 3, kvs[:1], true},
{kvsSize(kvs[:2]), 3, kvs[:2], true},
{kvsSize(kvs), 3, kvs, false},
{1000, 3, kvs, false},
}
for i, tt := range tests {
tt := tt
t.Run(fmt.Sprint(i), func(t *testing.T) {
r, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{MaxBytes: tt.maxBytes})
require.Nil(t, err)
require.Equal(t, tt.wkvs, r.KVs)
require.Equal(t, wrev, r.Rev)
require.Equal(t, len(kvs), r.Count)
require.Equal(t, tt.wmore, r.More)
})
}
}

func TestKVPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, normalPutFunc) }
func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) }

Expand Down
6 changes: 6 additions & 0 deletions server/storage/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev

kvs := make([]mvccpb.KeyValue, limit)
revBytes := newRevBytes()
var totalBytes int64
for i, revpair := range revpairs[:len(kvs)] {
select {
case <-ctx.Done():
Expand All @@ -120,6 +121,11 @@ func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev
zap.Int("len-values", len(vs)),
)
}
totalBytes += int64(len(vs[0]))
if ro.MaxBytes > 0 && totalBytes > ro.MaxBytes {
kvs = kvs[:i]
break
}
if err := kvs[i].Unmarshal(vs[0]); err != nil {
tr.s.lg.Fatal(
"failed to unmarshal mvccpb.KeyValue",
Expand Down

0 comments on commit f5ce3c7

Please sign in to comment.