diff --git a/export.go b/export.go index af7f52649..f2e49e465 100644 --- a/export.go +++ b/export.go @@ -1,6 +1,7 @@ package iavl import ( + "bytes" "context" "errors" "fmt" @@ -31,13 +32,24 @@ type ExportNode struct { // depth-first post-order (LRN), this order must be preserved when importing in order to recreate // the same tree structure. type Exporter struct { - tree *ImmutableTree - ch chan *ExportNode - cancel context.CancelFunc + tree *ImmutableTree + ch chan *ExportNode + cancel context.CancelFunc + optimistic bool // export raw key value pairs for optimistic import } // NewExporter creates a new Exporter. Callers must call Close() when done. func newExporter(tree *ImmutableTree) (*Exporter, error) { + return newExporterWithOptions(tree, false) +} + +// NewOptimisticExporter creates a new Exporter with raw Key Values. Callers must call Close() when done. +func newOptimisticExporter(tree *ImmutableTree) (*Exporter, error) { + return newExporterWithOptions(tree, true) +} + +// NewExporterWithOptions creates a new Exporter and configures optimistic mode +func newExporterWithOptions(tree *ImmutableTree, optimistic bool) (*Exporter, error) { if tree == nil { return nil, fmt.Errorf("tree is nil: %w", ErrNotInitalizedTree) } @@ -48,13 +60,18 @@ func newExporter(tree *ImmutableTree) (*Exporter, error) { ctx, cancel := context.WithCancel(context.Background()) exporter := &Exporter{ - tree: tree, - ch: make(chan *ExportNode, exportBufferSize), - cancel: cancel, + tree: tree, + ch: make(chan *ExportNode, exportBufferSize), + cancel: cancel, + optimistic: optimistic, } tree.ndb.incrVersionReaders(tree.version) - go exporter.export(ctx) + if exporter.optimistic { + go exporter.optimisticExport(ctx) + } else { + go exporter.export(ctx) + } return exporter, nil } @@ -79,6 +96,40 @@ func (e *Exporter) export(ctx context.Context) { close(e.ch) } +// optimisticExport exports raw key, value nodes +// Cosmos-SDK should set different snapshot format so nodes can select between either "untrusted statesync" or "trusted-peer optimistic" import +func (e *Exporter) optimisticExport(ctx context.Context) { + e.tree.root.traverse(e.tree, true, func(node *Node) bool { + // TODO: How to get the original db value bytes directly without writeBytes()? + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() + defer bufPool.Put(buf) + + if err := node.writeBytes(buf); err != nil { + fmt.Printf("WARN: failed writeBytes") + } + + bytesCopy := make([]byte, buf.Len()) + copy(bytesCopy, buf.Bytes()) + + // Use Export Node Format. + exportNode := &ExportNode{ + Key: node.GetKey(), // TODO: How to get prefixed key so that import does not need to prefix? + Value: bytesCopy, + Version: 0, // Version not used + Height: 0, // Height not used + } + + select { + case e.ch <- exportNode: + return false + case <-ctx.Done(): + return true + } + }) + close(e.ch) +} + // Next fetches the next exported node, or returns ExportDone when done. func (e *Exporter) Next() (*ExportNode, error) { if exportNode, ok := <-e.ch; ok { diff --git a/export_test.go b/export_test.go index 3abd602e9..2997a4b38 100644 --- a/export_test.go +++ b/export_test.go @@ -298,6 +298,62 @@ func TestExporter_Import(t *testing.T) { } } +func TestOptimisticExporter_Import(t *testing.T) { + testcases := map[string]*ImmutableTree{ + "empty tree": NewImmutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger()), + "basic tree": setupExportTreeBasic(t), + } + if !testing.Short() { + testcases["sized tree"] = setupExportTreeSized(t, 4096) + testcases["random tree"] = setupExportTreeRandom(t) + } + + for desc, tree := range testcases { + tree := tree + t.Run(desc, func(t *testing.T) { + t.Parallel() + + exporter, err := tree.OptimisticExport() + require.NoError(t, err) + defer exporter.Close() + + newTree := NewMutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger()) + importer, err := newTree.OptimisticImport(tree.Version()) + require.NoError(t, err) + defer importer.Close() + + for { + item, err := exporter.Next() + if err == ErrorExportDone { + err = importer.Commit() + require.NoError(t, err) + break + } + require.NoError(t, err) + err = importer.Add(item) + require.NoError(t, err) + } + + treeHash := tree.Hash() + newTreeHash := newTree.Hash() + + require.Equal(t, treeHash, newTreeHash, "Tree hash mismatch") + require.Equal(t, tree.Size(), newTree.Size(), "Tree size mismatch") + require.Equal(t, tree.Version(), newTree.Version(), "Tree version mismatch") + + tree.Iterate(func(key, value []byte) bool { //nolint:errcheck + index, _, err := tree.GetWithIndex(key) + require.NoError(t, err) + newIndex, newValue, err := newTree.GetWithIndex(key) + require.NoError(t, err) + require.Equal(t, index, newIndex, "Index mismatch for key %v", key) + require.Equal(t, value, newValue, "Value mismatch for key %v", key) + return false + }) + }) + } +} + func TestExporter_Close(t *testing.T) { tree := setupExportTreeSized(t, 4096) exporter, err := tree.Export() diff --git a/immutable_tree.go b/immutable_tree.go index 7bff7077b..d03b8ffb1 100644 --- a/immutable_tree.go +++ b/immutable_tree.go @@ -160,6 +160,12 @@ func (t *ImmutableTree) Export() (*Exporter, error) { return newExporter(t) } +// OptimisiticExport returns an iterator that exports tree nodes as ExportNodes. These nodes can be +// imported with MutableTree.Import() to recreate an identical tree. +func (t *ImmutableTree) OptimisticExport() (*Exporter, error) { + return newOptimisticExporter(t) +} + // GetWithIndex returns the index and value of the specified key if it exists, or nil and the next index // otherwise. The returned value must not be modified, since it may point to data stored within // IAVL. diff --git a/import.go b/import.go index 44802170f..2499a5627 100644 --- a/import.go +++ b/import.go @@ -31,6 +31,9 @@ type Importer struct { // inflightCommit tracks a batch commit, if any. inflightCommit <-chan error + + // Optimistic raw key value import + optimistic bool } // newImporter creates a new Importer for an empty MutableTree. @@ -38,6 +41,17 @@ type Importer struct { // version should correspond to the version that was initially exported. It must be greater than // or equal to the highest ExportNode version number given. func newImporter(tree *MutableTree, version int64) (*Importer, error) { + return newImporterWithOptions(tree, version, false) +} + +// newOptimisticImporter creates a new Importer for an empty MutableTree. +// +// expects optimistic raw key values for import +func newOptimisticImporter(tree *MutableTree, version int64) (*Importer, error) { + return newImporterWithOptions(tree, version, true) +} + +func newImporterWithOptions(tree *MutableTree, version int64, optimistic bool) (*Importer, error) { if version < 0 { return nil, errors.New("imported version cannot be negative") } @@ -49,11 +63,12 @@ func newImporter(tree *MutableTree, version int64) (*Importer, error) { } return &Importer{ - tree: tree, - version: version, - batch: tree.ndb.db.NewBatch(), - stack: make([]*Node, 0, 8), - nonces: make([]uint32, version+1), + tree: tree, + version: version, + batch: tree.ndb.db.NewBatch(), + stack: make([]*Node, 0, 8), + nonces: make([]uint32, version+1), + optimistic: optimistic, }, nil } @@ -117,10 +132,65 @@ func (i *Importer) Close() { i.tree = nil } +// sendBatchIfFull can be called during imports after each key add +// automatically batch.Write() when pending writes > maxBatchSize +func (i *Importer) sendBatchIfFull() error { + if i.batchSize >= maxBatchSize { + // Wait for previous batch. + var err error + if i.inflightCommit != nil { + err = <-i.inflightCommit + i.inflightCommit = nil + } + if err != nil { + return err + } + result := make(chan error) + i.inflightCommit = result + go func(batch db.Batch) { + defer batch.Close() + result <- batch.Write() + }(i.batch) + i.batch = i.tree.ndb.db.NewBatch() + i.batchSize = 0 + } + + return nil +} + +// OptimisticAdd adds a TRUSTED leveldb key value pair WITHOUT verification +func (i *Importer) OptimisticAdd(exportNode *ExportNode) error { + if i.tree == nil { + return ErrNoImport + } + if exportNode == nil { + return errors.New("node cannot be nil") + } + if exportNode.Key == nil { + return errors.New("node.Key cannot be nil") + } + if exportNode.Value == nil { + return errors.New("node.Value cannot be nil") + } + + if err := i.batch.Set(i.tree.ndb.nodeKey(exportNode.Key), exportNode.Value); err != nil { + return err + } + i.batchSize++ + + i.sendBatchIfFull() + + return nil +} + // Add adds an ExportNode to the import. ExportNodes must be added in the order returned by // Exporter, i.e. depth-first post-order (LRN). Nodes are periodically flushed to the database, // but the imported version is not visible until Commit() is called. func (i *Importer) Add(exportNode *ExportNode) error { + // Keep the same Add(node) API but run faster optimistic import when configured + if i.optimistic { + return i.OptimisticAdd(exportNode) + } if i.tree == nil { return ErrNoImport } @@ -193,24 +263,28 @@ func (i *Importer) Commit() error { return ErrNoImport } - switch len(i.stack) { - case 0: - if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil { - return err - } - case 1: - i.stack[0].nodeKey.nonce = 1 - if err := i.writeNode(i.stack[0]); err != nil { - return err - } - if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version - if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil { + if i.optimistic { + // All keys should be already imported + } else { + switch len(i.stack) { + case 0: + if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), []byte{}); err != nil { return err } + case 1: + i.stack[0].nodeKey.nonce = 1 + if err := i.writeNode(i.stack[0]); err != nil { + return err + } + if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version + if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil { + return err + } + } + default: + return fmt.Errorf("invalid node structure, found stack size %v when committing", + len(i.stack)) } - default: - return fmt.Errorf("invalid node structure, found stack size %v when committing", - len(i.stack)) } err := i.batch.WriteSync() diff --git a/import_test.go b/import_test.go index 13a925be7..a1ce06f0d 100644 --- a/import_test.go +++ b/import_test.go @@ -232,6 +232,43 @@ func TestImporter_Commit_Empty(t *testing.T) { assert.EqualValues(t, 3, tree.Version()) } +func TestImporter_OptimisticAdd(t *testing.T) { + k := []byte("rawStoreKey") + v := []byte("rawStoreValue") + + testcases := map[string]struct { + node *ExportNode + valid bool + }{ + "nil node": {nil, false}, + "trusted_valid": {&ExportNode{Key: k, Value: v, Version: 1, Height: 0}, true}, + "no key": {&ExportNode{Key: nil, Value: v, Version: 1, Height: 0}, false}, + "no value": {&ExportNode{Key: k, Value: nil, Version: 1, Height: 0}, false}, + // Only Key and Value used for Optimistic Add + // Version and Height is ignored + // further cases will be handled by Node.validate() + } + for desc, tc := range testcases { + tc := tc // appease scopelint + t.Run(desc, func(t *testing.T) { + tree := NewMutableTree(dbm.NewMemDB(), 0, false, log.NewNopLogger()) + importer, err := tree.Import(1) + require.NoError(t, err) + defer importer.Close() + + err = importer.OptimisticAdd(tc.node) + if tc.valid { + require.NoError(t, err) + } else { + if err == nil { + err = importer.Commit() + } + require.Error(t, err) + } + }) + } +} + func BenchmarkImport(b *testing.B) { benchmarkImport(b, 4096) } diff --git a/mutable_tree.go b/mutable_tree.go index a3ea3e148..aaf6463c7 100644 --- a/mutable_tree.go +++ b/mutable_tree.go @@ -200,6 +200,12 @@ func (tree *MutableTree) Import(version int64) (*Importer, error) { return newImporter(tree, version) } +// OptimisticImport returns an importer for tree nodes previously exported by ImmutableTree.OptimisticExport(), +// producing an identical IAVL tree. The caller must call Close() on the importer when done. +func (tree *MutableTree) OptimisticImport(version int64) (*Importer, error) { + return newOptimisticImporter(tree, version) +} + // Iterate iterates over all keys of the tree. The keys and values must not be modified, // since they may point to data stored within IAVL. Returns true if stopped by callnack, false otherwise func (tree *MutableTree) Iterate(fn func(key []byte, value []byte) bool) (stopped bool, err error) { diff --git a/nodedb.go b/nodedb.go index daafa5718..2ef1fb989 100644 --- a/nodedb.go +++ b/nodedb.go @@ -162,6 +162,52 @@ func (ndb *nodeDB) GetNode(nk []byte) (*Node, error) { return node, nil } +/* +// CV GetNodeKeyValueBytes gets a node's key and value bytes from memory or disk. +// It is used for both formats of nodes: legacy and new. +// `legacy`: nk is the hash of the node. `new`: . +// returns nodeKey []byte, nodeValue +func (ndb *nodeDB) GetNodeKeyValueBytes(nk []byte) ([]byte, []byte, error) { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + + if nk == nil { + return nil, nil, ErrNodeMissingNodeKey + } + + isLegcyNode := len(nk) == hashSize + var nodeKey []byte + if isLegcyNode { + nodeKey = ndb.legacyNodeKey(nk) + } else { + nodeKey = ndb.nodeKey(nk) + } + + valueBytes, err := ndb.db.Get(nodeKey) + if err != nil { + return nil, nil, fmt.Errorf("can't get node %v: %v", nk, err) + } + if valueBytes == nil { + return nil, nil, fmt.Errorf("Value missing for key %v corresponding to nodeKey %x", nk, nodeKey) + } + + return nodeKey, valueBytes, nil +} + +// CV SaveNodeKeyValueBytes saves a node to disk with raw byte arrays for key and value. +func (ndb *nodeDB) SaveNodeKeyValueBytes(keyBytes []byte, valueBytes []byte) error { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + + if err := ndb.batch.Set(keyBytes, valueBytes); err != nil { + return err + } + + ndb.logger.Debug("BATCH SAVE", "key", keyBytes, "value", valueBytes) + return nil +} +*/ + func (ndb *nodeDB) GetFastNode(key []byte) (*fastnode.Node, error) { if !ndb.hasUpgradedToFastStorage() { return nil, errors.New("storage version is not fast")