Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(v1.x.x): async pruning of orphan nodes #876

Merged
merged 16 commits into from
Feb 15, 2024
12 changes: 6 additions & 6 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ jobs:
name: golangci-lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
- name: Check out repository code
uses: actions/checkout@v4
- name: 🐿 Setup Golang
uses: actions/setup-go@v4
with:
go-version: '^1.20.0'
go-version: 1.21
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.51.2
run: make lint
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Improvements

- [#876](https://github.com/cosmos/iavl/pull/876) Make pruning of legacy orphan nodes asynchronous.

## v1.0.0 (October 30, 2023)

### Improvements
Expand Down
9 changes: 9 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package iavl

import (
"sync"

dbm "github.com/cosmos/cosmos-db"
)

Expand All @@ -11,6 +13,7 @@ type BatchWithFlusher struct {
db dbm.DB // This is only used to create new batch
batch dbm.Batch // Batched writing buffer.

mtx sync.Mutex
flushThreshold int // The threshold to flush the batch to disk.
}

Expand Down Expand Up @@ -46,6 +49,9 @@ func (b *BatchWithFlusher) estimateSizeAfterSetting(key []byte, value []byte) (i
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The addition entry is then added to the batch.
func (b *BatchWithFlusher) Set(key, value []byte) error {
b.mtx.Lock()
defer b.mtx.Unlock()

batchSizeAfter, err := b.estimateSizeAfterSetting(key, value)
if err != nil {
return err
Expand All @@ -67,6 +73,9 @@ func (b *BatchWithFlusher) Set(key, value []byte) error {
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The deletion entry is then added to the batch.
func (b *BatchWithFlusher) Delete(key []byte) error {
b.mtx.Lock()
defer b.mtx.Unlock()

batchSizeAfter, err := b.estimateSizeAfterSetting(key, []byte{})
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions fastnode/fast_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func NewNode(key []byte, value []byte, version int64) *Node {
}

// DeserializeNode constructs an *FastNode from an encoded byte slice.
// It assumes we do not mutate this input []byte.
func DeserializeNode(key []byte, buf []byte) (*Node, error) {
ver, n, err := encoding.DecodeVarint(buf)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/emicklei/dot v1.4.2
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.8.4
google.golang.org/protobuf v1.30.0
golang.org/x/crypto v0.12.0
google.golang.org/protobuf v1.30.0
)

require (
Expand Down Expand Up @@ -49,8 +49,8 @@ require (
)

retract (
v0.18.0
// This version is not used by the Cosmos SDK and adds a maintenance burden.
// Use v1.x.x instead.
[v0.21.0, v0.21.2]
v0.18.0
)
22 changes: 19 additions & 3 deletions internal/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var uvarintPool = &sync.Pool{

// decodeBytes decodes a varint length-prefixed byte slice, returning it along with the number
// of input bytes read.
// Assumes bz will not be mutated.
func DecodeBytes(bz []byte) ([]byte, int, error) {
s, n, err := DecodeUvarint(bz)
if err != nil {
Expand All @@ -51,9 +52,7 @@ func DecodeBytes(bz []byte) ([]byte, int, error) {
if len(bz) < end {
return nil, n, fmt.Errorf("insufficient bytes decoding []byte of length %v", size)
}
bz2 := make([]byte, size)
copy(bz2, bz[n:end])
return bz2, end, nil
return bz[n:end], end, nil
}

// decodeUvarint decodes a varint-encoded unsigned integer from a byte slice, returning it and the
Expand Down Expand Up @@ -97,6 +96,23 @@ func EncodeBytes(w io.Writer, bz []byte) error {
return err
}

var hashLenBz []byte

func init() {
hashLenBz = make([]byte, 1)
binary.PutUvarint(hashLenBz, 32)
}

// Encode 32 byte long hash
func Encode32BytesHash(w io.Writer, bz []byte) error {
_, err := w.Write(hashLenBz)
if err != nil {
return err
}
_, err = w.Write(bz)
return err
}

// encodeBytesSlice length-prefixes the byte slice and returns it.
func EncodeBytesSlice(bz []byte) ([]byte, error) {
buf := bufPool.Get().(*bytes.Buffer)
Expand Down
42 changes: 42 additions & 0 deletions keyformat/prefix_formatter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package keyformat

import "encoding/binary"

// This file builds some dedicated key formatters for what appears in benchmarks.

// Prefixes a single byte before a 32 byte hash.
type FastPrefixFormatter struct {
prefix byte
length int
prefixSlice []byte
}

func NewFastPrefixFormatter(prefix byte, length int) *FastPrefixFormatter {
return &FastPrefixFormatter{prefix: prefix, length: length, prefixSlice: []byte{prefix}}
}

func (f *FastPrefixFormatter) Key(bz []byte) []byte {
key := make([]byte, 1+f.length)
key[0] = f.prefix
copy(key[1:], bz)
return key
}

func (f *FastPrefixFormatter) Scan(key []byte, a interface{}) {
scan(a, key[1:])
}

func (f *FastPrefixFormatter) KeyInt64(bz int64) []byte {
key := make([]byte, 1+f.length)
key[0] = f.prefix
binary.BigEndian.PutUint64(key[1:], uint64(bz))
return key
}

func (f *FastPrefixFormatter) Prefix() []byte {
return f.prefixSlice
}

func (f *FastPrefixFormatter) Length() int {
return 1 + f.length
}
106 changes: 55 additions & 51 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,64 +266,68 @@ func (tree *MutableTree) set(key []byte, value []byte) (updated bool, err error)
func (tree *MutableTree) recursiveSet(node *Node, key []byte, value []byte) (
newSelf *Node, updated bool, err error,
) {
version := tree.version + 1

if node.isLeaf() {
if !tree.skipFastStorageUpgrade {
tree.addUnsavedAddition(key, fastnode.NewNode(key, value, version))
}
switch bytes.Compare(key, node.key) {
case -1: // setKey < leafKey
return &Node{
key: node.key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: NewNode(key, value),
rightNode: node,
}, false, nil
case 1: // setKey > leafKey
return &Node{
key: key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: node,
rightNode: NewNode(key, value),
}, false, nil
default:
return NewNode(key, value), true, nil
return tree.recursiveSetLeaf(node, key, value)
}
node, err = node.clone(tree)
if err != nil {
return nil, false, err
}

if bytes.Compare(key, node.key) < 0 {
node.leftNode, updated, err = tree.recursiveSet(node.leftNode, key, value)
if err != nil {
return nil, updated, err
}
} else {
node, err = node.clone(tree)
node.rightNode, updated, err = tree.recursiveSet(node.rightNode, key, value)
if err != nil {
return nil, false, err
return nil, updated, err
}
}

if bytes.Compare(key, node.key) < 0 {
node.leftNode, updated, err = tree.recursiveSet(node.leftNode, key, value)
if err != nil {
return nil, updated, err
}
} else {
node.rightNode, updated, err = tree.recursiveSet(node.rightNode, key, value)
if err != nil {
return nil, updated, err
}
}
if updated {
return node, updated, nil
}
err = node.calcHeightAndSize(tree.ImmutableTree)
if err != nil {
return nil, false, err
}
newNode, err := tree.balance(node)
if err != nil {
return nil, false, err
}
return newNode, updated, err
}

if updated {
return node, updated, nil
}
err = node.calcHeightAndSize(tree.ImmutableTree)
if err != nil {
return nil, false, err
}
newNode, err := tree.balance(node)
if err != nil {
return nil, false, err
}
return newNode, updated, err
func (tree *MutableTree) recursiveSetLeaf(node *Node, key []byte, value []byte) (
newSelf *Node, updated bool, err error,
) {
version := tree.version + 1
if !tree.skipFastStorageUpgrade {
tree.addUnsavedAddition(key, fastnode.NewNode(key, value, version))
}
switch bytes.Compare(key, node.key) {
case -1: // setKey < leafKey
return &Node{
key: node.key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: NewNode(key, value),
rightNode: node,
}, false, nil
case 1: // setKey > leafKey
return &Node{
key: key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: node,
rightNode: NewNode(key, value),
}, false, nil
default:
return NewNode(key, value), true, nil
}
}

Expand Down
32 changes: 23 additions & 9 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ func GetRootKey(version int64) []byte {

// Node represents a node in a Tree.
type Node struct {
key []byte
value []byte
hash []byte
nodeKey *NodeKey
leftNodeKey []byte
key []byte
value []byte
hash []byte
nodeKey *NodeKey
// Legacy: LeftNodeHash
// v1: Left node ptr via Version/key
leftNodeKey []byte
// Legacy: RightNodeHash
// v1: Right node ptr via Version/key
rightNodeKey []byte
size int64
leftNode *Node
Expand Down Expand Up @@ -517,19 +521,29 @@ func (node *Node) writeHashBytes(w io.Writer, version int64) error {
// (e.g. ProofLeafNode.ValueHash)
valueHash := sha256.Sum256(node.value)

err = encoding.EncodeBytes(w, valueHash[:])
err = encoding.Encode32BytesHash(w, valueHash[:])
if err != nil {
return fmt.Errorf("writing value, %w", err)
}
} else {
if node.leftNode == nil || node.rightNode == nil {
if (node.leftNode == nil && len(node.leftNodeKey) != 32) || (node.rightNode == nil && len(node.rightNodeKey) != 32) {
return ErrEmptyChild
}
err = encoding.EncodeBytes(w, node.leftNode.hash)
// If left/rightNodeKey is 32 bytes, it is a legacy node whose value is just the hash.
// We may have skipped fetching leftNode/rightNode.
if len(node.leftNodeKey) == 32 {
err = encoding.Encode32BytesHash(w, node.leftNodeKey)
} else {
err = encoding.Encode32BytesHash(w, node.leftNode.hash)
}
if err != nil {
return fmt.Errorf("writing left hash, %w", err)
}
err = encoding.EncodeBytes(w, node.rightNode.hash)
if len(node.rightNodeKey) == 32 {
err = encoding.Encode32BytesHash(w, node.rightNodeKey)
} else {
err = encoding.Encode32BytesHash(w, node.rightNode.hash)
}
if err != nil {
return fmt.Errorf("writing right hash, %w", err)
}
Expand Down
Loading
Loading