fix: iavl live migration of orphan nodes #866

Closed · wants to merge 26 commits
12 changes: 6 additions & 6 deletions .github/workflows/lint.yml
@@ -12,11 +12,11 @@ jobs:
name: golangci-lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
- name: Check out repository code
uses: actions/checkout@v4
- name: 🐿 Setup Golang
uses: actions/setup-go@v4
with:
go-version: '^1.20.0'
go-version: 1.21
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.51.2
Comment on lines -15 to -22
Contributor Author
This is a drive-by change; the linter was not working on any open PR.

run: make lint
9 changes: 9 additions & 0 deletions batch.go
@@ -1,6 +1,8 @@
package iavl

import (
"sync"

dbm "github.com/cosmos/cosmos-db"
)

@@ -11,6 +13,7 @@ type BatchWithFlusher struct {
db dbm.DB // This is only used to create new batch
batch dbm.Batch // Batched writing buffer.

mtx sync.Mutex
flushThreshold int // The threshold to flush the batch to disk.
}

@@ -46,6 +49,9 @@ func (b *BatchWithFlusher) estimateSizeAfterSetting(key []byte, value []byte) (i
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The addition entry is then added to the batch.
func (b *BatchWithFlusher) Set(key, value []byte) error {
b.mtx.Lock()
defer b.mtx.Unlock()

batchSizeAfter, err := b.estimateSizeAfterSetting(key, value)
if err != nil {
return err
@@ -67,6 +73,9 @@ func (b *BatchWithFlusher) Set(key, value []byte) error {
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The deletion entry is then added to the batch.
func (b *BatchWithFlusher) Delete(key []byte) error {
b.mtx.Lock()
defer b.mtx.Unlock()

batchSizeAfter, err := b.estimateSizeAfterSetting(key, []byte{})
if err != nil {
return err
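The new mutex serializes Set and Delete so that the size estimate, any flush, and the write to the underlying batch happen as one critical section. A minimal sketch of that pattern, using a hypothetical simplified type rather than the actual BatchWithFlusher:

```go
package main

import (
	"fmt"
	"sync"
)

// flushingBatch is a hypothetical, simplified stand-in for a batch that
// flushes once its buffered size crosses a threshold.
type flushingBatch struct {
	mtx       sync.Mutex
	buf       map[string][]byte
	size      int
	threshold int
	flushes   int
}

// Set mirrors the locking added in this PR: the size check, the possible
// flush, and the write are guarded by a single lock.
func (b *flushingBatch) Set(key, value []byte) {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	if b.size+len(key)+len(value) >= b.threshold {
		b.buf = map[string][]byte{} // stand-in for "flush to disk and start a new batch"
		b.size = 0
		b.flushes++
	}
	b.buf[string(key)] = value
	b.size += len(key) + len(value)
}

func main() {
	b := &flushingBatch{buf: map[string][]byte{}, threshold: 64}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) { // without the mutex these writers race on size and buf
			defer wg.Done()
			b.Set([]byte(fmt.Sprintf("key-%d", i)), []byte("value"))
		}(i)
	}
	wg.Wait()
	fmt.Println("entries buffered:", len(b.buf), "flushes:", b.flushes)
}
```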
1 change: 1 addition & 0 deletions fastnode/fast_node.go
@@ -30,6 +30,7 @@ func NewNode(key []byte, value []byte, version int64) *Node {
}

// DeserializeNode constructs an *FastNode from an encoded byte slice.
// It assumes we do not mutate this input []byte.
func DeserializeNode(key []byte, buf []byte) (*Node, error) {
ver, n, err := encoding.DecodeVarint(buf)
if err != nil {
22 changes: 19 additions & 3 deletions internal/encoding/encoding.go
@@ -30,6 +30,7 @@ var uvarintPool = &sync.Pool{

// decodeBytes decodes a varint length-prefixed byte slice, returning it along with the number
// of input bytes read.
// Assumes bz will not be mutated.
func DecodeBytes(bz []byte) ([]byte, int, error) {
s, n, err := DecodeUvarint(bz)
if err != nil {
@@ -51,9 +52,7 @@ func DecodeBytes(bz []byte) ([]byte, int, error) {
if len(bz) < end {
return nil, n, fmt.Errorf("insufficient bytes decoding []byte of length %v", size)
}
bz2 := make([]byte, size)
copy(bz2, bz[n:end])
return bz2, end, nil
return bz[n:end], end, nil
}

// decodeUvarint decodes a varint-encoded unsigned integer from a byte slice, returning it and the
@@ -97,6 +96,23 @@ func EncodeBytes(w io.Writer, bz []byte) error {
return err
}

var hashLenBz []byte

func init() {
hashLenBz = make([]byte, 1)
binary.PutUvarint(hashLenBz, 32)
}

// Encode 32 byte long hash
func Encode32BytesHash(w io.Writer, bz []byte) error {
_, err := w.Write(hashLenBz)
if err != nil {
return err
}
_, err = w.Write(bz)
return err
}

// encodeBytesSlice length-prefixes the byte slice and returns it.
func EncodeBytesSlice(bz []byte) ([]byte, error) {
buf := bufPool.Get().(*bytes.Buffer)
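Two behavioral notes on the hunks above: DecodeBytes now returns a sub-slice of its input rather than a copy, so callers (including fastnode.DeserializeNode) must not mutate the input buffer while the result is in use, and Encode32BytesHash writes a precomputed one-byte uvarint length prefix (32) followed by the raw hash. A hedged sketch of both behaviors using stand-in helpers, not the package's actual functions:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// decodeBytesNoCopy mimics the new DecodeBytes behavior: it returns a
// sub-slice that aliases the input rather than a fresh copy.
func decodeBytesNoCopy(bz []byte) ([]byte, int, error) {
	size, n := binary.Uvarint(bz)
	if n <= 0 {
		return nil, 0, fmt.Errorf("invalid uvarint prefix")
	}
	end := n + int(size)
	if len(bz) < end {
		return nil, n, fmt.Errorf("insufficient bytes decoding []byte of length %v", size)
	}
	return bz[n:end], end, nil // aliases bz: mutating bz mutates the result
}

// encode32BytesHash mimics Encode32BytesHash: a fixed one-byte uvarint
// length prefix (32) followed by the raw hash bytes.
func encode32BytesHash(w *bytes.Buffer, hash []byte) {
	w.WriteByte(0x20) // uvarint(32) encodes as the single byte 0x20
	w.Write(hash)
}

func main() {
	// Length-prefix "hello" and decode it back without copying.
	input := append([]byte{5}, []byte("hello")...)
	out, read, _ := decodeBytesNoCopy(input)
	fmt.Printf("decoded %q, read %d bytes\n", out, read)

	input[1] = 'H' // mutating the input is visible through the alias
	fmt.Printf("after mutating input: %q\n", out)

	var buf bytes.Buffer
	encode32BytesHash(&buf, bytes.Repeat([]byte{0xab}, 32))
	fmt.Println("encoded hash length:", buf.Len()) // 1 prefix byte + 32 hash bytes = 33
}
```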
42 changes: 42 additions & 0 deletions keyformat/prefix_formatter.go
@@ -0,0 +1,42 @@
package keyformat

import "encoding/binary"

// This file builds some dedicated key formatters for what appears in benchmarks.

// Prefixes a single byte before a 32 byte hash.
type FastPrefixFormatter struct {
prefix byte
length int
prefixSlice []byte
}

func NewFastPrefixFormatter(prefix byte, length int) *FastPrefixFormatter {
return &FastPrefixFormatter{prefix: prefix, length: length, prefixSlice: []byte{prefix}}
}

func (f *FastPrefixFormatter) Key(bz []byte) []byte {
key := make([]byte, 1+f.length)
key[0] = f.prefix
copy(key[1:], bz)
return key
}

func (f *FastPrefixFormatter) Scan(key []byte, a interface{}) {
scan(a, key[1:])
}

func (f *FastPrefixFormatter) KeyInt64(bz int64) []byte {
key := make([]byte, 1+f.length)
key[0] = f.prefix
binary.BigEndian.PutUint64(key[1:], uint64(bz))
return key
}

func (f *FastPrefixFormatter) Prefix() []byte {
return f.prefixSlice
}

func (f *FastPrefixFormatter) Length() int {
return 1 + f.length
}
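Since prefix_formatter.go is new and shown in full above, a short usage sketch may help. The import path and the prefix byte used here are assumptions; the actual call sites are not part of this diff.

```go
package main

import (
	"fmt"

	"github.com/cosmos/iavl/keyformat"
)

func main() {
	// One formatter per key space: here, prefix byte 'n' in front of a 32-byte hash.
	nodeKey := keyformat.NewFastPrefixFormatter('n', 32)

	hash := make([]byte, 32)
	key := nodeKey.Key(hash)
	fmt.Println(len(key), key[0] == 'n') // 33 true

	// KeyInt64 writes a big-endian int64 right after the prefix byte,
	// so byte 8 holds the low-order byte of the integer.
	versioned := nodeKey.KeyInt64(7)
	fmt.Println(len(versioned), versioned[8]) // 33 7

	// Prefix() and Length() are handy for building iteration bounds.
	fmt.Println(nodeKey.Prefix(), nodeKey.Length())
}
```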
164 changes: 115 additions & 49 deletions mutable_tree.go
@@ -266,65 +266,131 @@ func (tree *MutableTree) set(key []byte, value []byte) (updated bool, err error)
func (tree *MutableTree) recursiveSet(node *Node, key []byte, value []byte) (
newSelf *Node, updated bool, err error,
) {
version := tree.version + 1

if node.isLeaf() {
if !tree.skipFastStorageUpgrade {
tree.addUnsavedAddition(key, fastnode.NewNode(key, value, version))
}
switch bytes.Compare(key, node.key) {
case -1: // setKey < leafKey
return &Node{
key: node.key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: NewNode(key, value),
rightNode: node,
}, false, nil
case 1: // setKey > leafKey
return &Node{
key: key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: node,
rightNode: NewNode(key, value),
}, false, nil
default:
return NewNode(key, value), true, nil
return tree.recursiveSetLeaf(node, key, value)
}
node, err = node.clone(tree)
if err != nil {
return nil, false, err
}

if bytes.Compare(key, node.key) < 0 {
if len(node.leftNodeKey) == 32 {
node.leftNode, updated, err = tree.recursiveSetLegacy(node.leftNode, key, value)
} else {
node.leftNode, updated, err = tree.recursiveSet(node.leftNode, key, value)
}
} else {
node, err = node.clone(tree)
if err != nil {
return nil, false, err
return nil, updated, err
}

if bytes.Compare(key, node.key) < 0 {
node.leftNode, updated, err = tree.recursiveSet(node.leftNode, key, value)
if err != nil {
return nil, updated, err
}
} else {
if len(node.rightNodeKey) == 32 {
node.rightNode, updated, err = tree.recursiveSetLegacy(node.rightNode, key, value)
} else {
node.rightNode, updated, err = tree.recursiveSet(node.rightNode, key, value)
if err != nil {
return nil, updated, err
}
}

if updated {
return node, updated, nil
}
err = node.calcHeightAndSize(tree.ImmutableTree)
if err != nil {
return nil, false, err
}
newNode, err := tree.balance(node)
if err != nil {
return nil, false, err
return nil, updated, err
}
return newNode, updated, err
}

if updated {
return node, updated, nil
}
err = node.calcHeightAndSize(tree.ImmutableTree)
if err != nil {
return nil, false, err
}
newNode, err := tree.balance(node)
if err != nil {
return nil, false, err
}
return newNode, updated, err
}

func (tree *MutableTree) recursiveSetLeaf(node *Node, key []byte, value []byte) (
newSelf *Node, updated bool, err error,
) {
version := tree.version + 1
if !tree.skipFastStorageUpgrade {
tree.addUnsavedAddition(key, fastnode.NewNode(key, value, version))
}
switch bytes.Compare(key, node.key) {
case -1: // setKey < leafKey
return &Node{
key: node.key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: NewNode(key, value),
rightNode: node,
}, false, nil
case 1: // setKey > leafKey
return &Node{
key: key,
subtreeHeight: 1,
size: 2,
nodeKey: nil,
leftNode: node,
rightNode: NewNode(key, value),
}, false, nil
default:
return NewNode(key, value), true, nil
}
}

// recursiveSetLegacy is the same as recursiveSet but takes an optimization where
// if you are updating an existing leaf, you do not need to get both children, you only need one child.
//
// This operates on the assumption that when recursiveSet enters a legacy node,
// all of the legacy node's children will be legacy nodes.
func (tree *MutableTree) recursiveSetLegacy(node *Node, key []byte, value []byte) (
newSelf *Node, updated bool, err error,
) {
if node.isLeaf() {
return tree.recursiveSetLeaf(node, key, value)
}
node, err = node.cloneNoChildFetch()
if err != nil {
return nil, false, err
}

recurseLeft := false
if bytes.Compare(key, node.key) < 0 {
recurseLeft = true
}
child, err := node.fetchOneChild(tree, recurseLeft)
if err != nil {
return nil, false, err
}

newChild, updated, err := tree.recursiveSetLegacy(child, key, value)
if err != nil {
return nil, updated, err
}
if recurseLeft {
node.leftNode = newChild
} else {
node.rightNode = newChild
}

if updated {
return node, updated, nil
}
_, err = node.fetchOneChild(tree, !recurseLeft)
if err != nil {
return nil, false, err
}

err = node.calcHeightAndSize(tree.ImmutableTree)
if err != nil {
return nil, false, err
}
newNode, err := tree.balance(node)
if err != nil {
return nil, false, err
}
return newNode, updated, err
}

// Remove removes a key from the working tree. The given key byte slice should not be modified
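The dispatch above keys off the length of the stored child reference: a legacy child is still referenced by its 32-byte hash, which is why len(node.leftNodeKey) == 32 routes into recursiveSetLegacy, and why that path can fetch only the child it actually descends into when the operation is a pure update. A small sketch of that routing decision; the 12-byte version-plus-nonce layout for new references is an assumption, not shown in this diff.

```go
package main

import "fmt"

// isLegacyChildRef mirrors the len(...) == 32 checks above: legacy
// (pre-migration) children are referenced by their 32-byte hash.
func isLegacyChildRef(nodeKey []byte) bool {
	return len(nodeKey) == 32
}

// dispatchSet sketches the routing in recursiveSet: once the search path
// enters a legacy child it stays on the legacy code path, which may load
// only the one child it descends into.
func dispatchSet(childRef []byte) string {
	if isLegacyChildRef(childRef) {
		return "recursiveSetLegacy (fetch only the child on the search path)"
	}
	return "recursiveSet (new node-key format)"
}

func main() {
	newRef := make([]byte, 12)    // assumed: version (8 bytes) + nonce (4 bytes)
	legacyRef := make([]byte, 32) // 32-byte hash of the legacy node

	fmt.Println(dispatchSet(newRef))
	fmt.Println(dispatchSet(legacyRef))
}
```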