Skip to content

Commit

Permalink
post order load snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope committed Nov 27, 2023
1 parent 00de27d commit bb9cdc6
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 30 deletions.
2 changes: 1 addition & 1 deletion multitree.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func ImportMultiTree(pool *NodePool, version int64, path string, treeOpts TreeOp
return nil, err
}
go func(p string) {
root, importErr := sql.ImportSnapshotFromTable(version, false)
root, importErr := sql.ImportSnapshotFromTable(version, PreOrder, false)

tree := NewTree(sql, pool, mt.treeOpts)
tree.root = root
Expand Down
99 changes: 81 additions & 18 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,18 @@ type SnapshotNode struct {
Height int8
}

func (sql *SqliteDb) ImportSnapshotFromTable(version int64, loadLeaves bool) (*Node, error) {
func (sql *SqliteDb) ImportSnapshotFromTable(version int64, traverseOrder TraverseOrderType, loadLeaves bool) (*Node, error) {
read, err := sql.getReadConn()
if err != nil {
return nil, err
}
q, err := read.Prepare(fmt.Sprintf("SELECT version, sequence, bytes FROM snapshot_%d ORDER BY ordinal", version))

var q *sqlite3.Stmt
if traverseOrder == PostOrder {
q, err = read.Prepare(fmt.Sprintf("SELECT version, sequence, bytes FROM snapshot_%d ORDER BY ordinal DESC", version))
} else if traverseOrder == PreOrder {
q, err = read.Prepare(fmt.Sprintf("SELECT version, sequence, bytes FROM snapshot_%d ORDER BY ordinal ASC", version))
}
if err != nil {
return nil, err
}
Expand All @@ -338,7 +344,12 @@ func (sql *SqliteDb) ImportSnapshotFromTable(version int64, loadLeaves bool) (*N
since: time.Now(),
log: log.With().Str("path", sql.opts.Path).Logger(),
}
root, err := imp.queryStep()
var root *Node
if traverseOrder == PostOrder {
root, err = imp.queryStepPostOrder()
} else if traverseOrder == PreOrder {
root, err = imp.queryStepPreOrder()
}
if err != nil {
return nil, err
}
Expand All @@ -356,7 +367,7 @@ func (sql *SqliteDb) ImportSnapshotFromTable(version int64, loadLeaves bool) (*N
return root, nil
}

func (sql *SqliteDb) ImportMostRecentSnapshot(targetVersion int64, loadLeaves bool) (*Node, int64, error) {
func (sql *SqliteDb) ImportMostRecentSnapshot(targetVersion int64, traverseOrder TraverseOrderType, loadLeaves bool) (*Node, int64, error) {
read, err := sql.getReadConn()
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -401,7 +412,7 @@ func (sql *SqliteDb) ImportMostRecentSnapshot(targetVersion int64, loadLeaves bo
}
}

root, err := sql.ImportSnapshotFromTable(version, loadLeaves)
root, err := sql.ImportSnapshotFromTable(version, traverseOrder, loadLeaves)
if err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -570,11 +581,13 @@ func (snap *sqliteSnapshot) restorePostOrderStep(nextFn func() (*SnapshotNode, e
break
}

ordinal := snap.ordinal

uniqueVersions[snapshotNode.Version] = struct{}{}
node := &Node{
key: snapshotNode.Key,
subtreeHeight: snapshotNode.Height,
nodeKey: NewNodeKey(snapshotNode.Version, uint32(snap.ordinal)),
nodeKey: NewNodeKey(snapshotNode.Version, uint32(ordinal)),
}

stackSize := len(stack)
Expand All @@ -587,7 +600,7 @@ func (snap *sqliteSnapshot) restorePostOrderStep(nextFn func() (*SnapshotNode, e
}

count++
if err := snap.writeSnapNode(node, snapshotNode.Version, count); err != nil {
if err := snap.writeSnapNode(node, snapshotNode.Version, count, ordinal, count); err != nil {
return nil, nil, err
}
} else if stackSize >= 2 && stack[stackSize-1].subtreeHeight < node.subtreeHeight && stack[stackSize-2].subtreeHeight < node.subtreeHeight {
Expand All @@ -603,7 +616,7 @@ func (snap *sqliteSnapshot) restorePostOrderStep(nextFn func() (*SnapshotNode, e
node.rightNode = nil

count++
if err := snap.writeSnapNode(node, snapshotNode.Version, count); err != nil {
if err := snap.writeSnapNode(node, snapshotNode.Version, count, ordinal, count); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -636,10 +649,13 @@ func (snap *sqliteSnapshot) restorePreOrderStep(nextFn func() (*SnapshotNode, er
return nil, err
}

ordinal := snap.ordinal
snap.ordinal++

node := &Node{
key: snapshotNode.Key,
subtreeHeight: snapshotNode.Height,
nodeKey: NewNodeKey(snapshotNode.Version, uint32(snap.ordinal)),
nodeKey: NewNodeKey(snapshotNode.Version, uint32(ordinal)),
}

if node.isLeaf() {
Expand Down Expand Up @@ -669,7 +685,7 @@ func (snap *sqliteSnapshot) restorePreOrderStep(nextFn func() (*SnapshotNode, er
}

count++
if err := snap.writeSnapNode(node, snapshotNode.Version, count); err != nil {
if err := snap.writeSnapNode(node, snapshotNode.Version, ordinal, ordinal, count); err != nil {
return nil, err
}
snap.ordinal++
Expand All @@ -682,22 +698,20 @@ func (snap *sqliteSnapshot) restorePreOrderStep(nextFn func() (*SnapshotNode, er
return node, uniqueVersions, err
}

func (snap *sqliteSnapshot) writeSnapNode(node *Node, version int64, count int) error {
ordinal := snap.ordinal

func (snap *sqliteSnapshot) writeSnapNode(node *Node, version int64, ordinal, sequence, count int) error {
nodeBz, err := node.Bytes()
if err != nil {
return err
}
if err = snap.snapshotInsert.Exec(ordinal, version, ordinal, nodeBz); err != nil {
if err = snap.snapshotInsert.Exec(ordinal, version, sequence, nodeBz); err != nil {
return err
}
if node.isLeaf() {
if err = snap.leafInsert.Exec(version, ordinal, nodeBz); err != nil {
return err
}
} else {
if err = snap.treeInsert.Exec(version, ordinal, nodeBz); err != nil {
if err = snap.treeInsert.Exec(version, sequence, nodeBz); err != nil {
return err
}
}
Expand Down Expand Up @@ -736,7 +750,7 @@ type sqliteImport struct {
log zerolog.Logger
}

func (sqlImport *sqliteImport) queryStep() (node *Node, err error) {
func (sqlImport *sqliteImport) queryStepPreOrder() (node *Node, err error) {
sqlImport.i++
if sqlImport.i%1_000_000 == 0 {
sqlImport.log.Debug().Msgf("import: nodes=%s, node/s=%s",
Expand Down Expand Up @@ -773,13 +787,62 @@ func (sqlImport *sqliteImport) queryStep() (node *Node, err error) {
return nil, nil
}

node.leftNode, err = sqlImport.queryStep()
node.leftNode, err = sqlImport.queryStepPreOrder()
if err != nil {
return nil, err
}
node.rightNode, err = sqlImport.queryStep()
node.rightNode, err = sqlImport.queryStepPreOrder()
if err != nil {
return nil, err
}
return node, nil
}

func (sqlImport *sqliteImport) queryStepPostOrder() (node *Node, err error) {
sqlImport.i++
if sqlImport.i%1_000_000 == 0 {
sqlImport.log.Debug().Msgf("import: nodes=%s, node/s=%s",
humanize.Comma(sqlImport.i),
humanize.Comma(int64(float64(1_000_000)/time.Since(sqlImport.since).Seconds())),
)
sqlImport.since = time.Now()
}

hasRow, err := sqlImport.query.Step()
if !hasRow {
return nil, nil
}
if err != nil {
return nil, err
}
var bz sqlite3.RawBytes
var version, seq int
err = sqlImport.query.Scan(&version, &seq, &bz)
if err != nil {
return nil, err
}
nodeKey := NewNodeKey(int64(version), uint32(seq))
node, err = MakeNode(sqlImport.pool, nodeKey, bz)
if err != nil {
return nil, err
}

if node.isLeaf() && sqlImport.i > 1 {
if sqlImport.loadLeaves {
return node, nil
}
sqlImport.pool.Put(node)
return nil, nil
}

node.rightNode, err = sqlImport.queryStepPostOrder()
if err != nil {
return nil, err
}
node.leftNode, err = sqlImport.queryStepPostOrder()
if err != nil {
return nil, err
}

return node, nil
}
4 changes: 2 additions & 2 deletions tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ func (tree *Tree) LoadVersion(version int64) error {
return nil
}

func (tree *Tree) LoadSnapshot(version int64) (err error) {
func (tree *Tree) LoadSnapshot(version int64, traverseOrder TraverseOrderType) (err error) {
var v int64
tree.root, v, err = tree.sql.ImportMostRecentSnapshot(version, true)
tree.root, v, err = tree.sql.ImportMostRecentSnapshot(version, traverseOrder, true)
if err != nil {
return err
}
Expand Down
31 changes: 22 additions & 9 deletions tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,26 +202,39 @@ func TestTree_Build_Load(t *testing.T) {
opts.UntilHash = "3a037f8dd67a5e1a9ef83a53b81c619c9ac0233abee6f34a400fb9b9dfbb4f8d"
testTreeBuild(t, mt, opts)

// export the tree at version 12,000 and import it into a sql db
ctx := context.Background()
// export the tree at version 12,000 and import it into a sql db in pre-order
traverseOrder := PreOrder
restoreMt := NewMultiTree(t.TempDir(), TreeOptions{CheckpointInterval: 4000})
restorePreOrderMt := NewMultiTree(t.TempDir(), TreeOptions{CheckpointInterval: 4000})
for sk, tree := range multiTree.Trees {
require.NoError(t, restoreMt.MountTree(sk))
require.NoError(t, restorePreOrderMt.MountTree(sk))
exporter := tree.Export(traverseOrder)

restoreTree := restoreMt.Trees[sk]
_, err := restoreTree.sql.WriteSnapshot(ctx, tree.Version(), exporter.Next, SnapshotOptions{SaveTree: true, TraverseOrder: traverseOrder})
restoreTree := restorePreOrderMt.Trees[sk]
_, err := restoreTree.sql.WriteSnapshot(context.Background(), tree.Version(), exporter.Next, SnapshotOptions{SaveTree: true, TraverseOrder: traverseOrder})
require.NoError(t, err)
require.NoError(t, restoreTree.LoadSnapshot(tree.Version()))
require.NoError(t, restoreTree.LoadSnapshot(tree.Version(), traverseOrder))
}

// export the tree at version 12,000 and import it into a sql db in post-order
traverseOrder = PostOrder
restorePostOrderMt := NewMultiTree(t.TempDir(), TreeOptions{CheckpointInterval: 4000})
for sk, tree := range multiTree.Trees {
require.NoError(t, restorePostOrderMt.MountTree(sk))
exporter := tree.Export(traverseOrder)

restoreTree := restorePostOrderMt.Trees[sk]
_, err := restoreTree.sql.WriteSnapshot(context.Background(), tree.Version(), exporter.Next, SnapshotOptions{SaveTree: true, TraverseOrder: traverseOrder})
require.NoError(t, err)
require.NoError(t, restoreTree.LoadSnapshot(tree.Version(), traverseOrder))
}
require.Equal(t, restorePostOrderMt.Hash(), restorePreOrderMt.Hash())

// play changes until version 20_000
require.NoError(t, opts.Iterator.Next())
require.Equal(t, int64(12_001), opts.Iterator.Version())
opts.Until = 20_000
opts.UntilHash = "25907b193c697903218d92fa70a87ef6cdd6fa5b9162d955a4d70a9d5d2c4824"
testTreeBuild(t, restoreMt, opts)
testTreeBuild(t, restorePostOrderMt, opts)
}

func TestOsmoLike_HotStart(t *testing.T) {
Expand Down Expand Up @@ -259,7 +272,7 @@ func TestTree_Import(t *testing.T) {
sql, err := NewSqliteDb(pool, SqliteDbOptions{Path: tmpDir})
require.NoError(t, err)

root, err := sql.ImportSnapshotFromTable(1, true)
root, err := sql.ImportSnapshotFromTable(1, PreOrder, true)
require.NoError(t, err)
require.NotNil(t, root)
}
Expand Down

0 comments on commit bb9cdc6

Please sign in to comment.