-
Notifications
You must be signed in to change notification settings - Fork 273
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(master): async pruning of orphan nodes #877
Conversation
Co-authored-by: Julien Robert <[email protected]>
Co-authored-by: mmsqe <[email protected]> Co-authored-by: Julien Robert <[email protected]>
Co-authored-by: Julien Robert <[email protected]>
Co-authored-by: khanh-notional <[email protected]>
Co-authored-by: Elias Naur <[email protected]>
…806) (#821) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Marko Baricevic <[email protected]>
#822) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…#805) (#825) Co-authored-by: Matt, Park <[email protected]>
Co-authored-by: Marko Baricevic <[email protected]>
Co-authored-by: cool-developer <[email protected]>
…n error (backport #843) (#844) Co-authored-by: Emmanuel T Odeke <[email protected]>
Co-authored-by: cool-developer <[email protected]>
…er (backport #846) (#847) Co-authored-by: Emmanuel T Odeke <[email protected]>
This reverts commit d8c630d.
WalkthroughThe recent updates focus on enhancing performance, ensuring thread safety, and improving code clarity. Key changes include upgrading the Go version in the CI workflow, introducing thread safety measures in the Changes
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
nodedb.go
Outdated
var prevVersion, curVersion int64 | ||
var rootKeys [][]byte | ||
for ; itr.Valid(); itr.Next() { | ||
legacyRootKeyFormat.Scan(itr.Key(), &curVersion) | ||
rootKeys = append(rootKeys, itr.Key()) | ||
if prevVersion > 0 { | ||
if err := ndb.traverseOrphans(prevVersion, curVersion, func(orphan *Node) error { | ||
go func() { | ||
defer func() { | ||
isDeletingLegacyVersionsMutex.Lock() | ||
isDeletingLegacyVersions = false | ||
isDeletingLegacyVersionsMutex.Unlock() | ||
}() | ||
|
||
// Check if we have a legacy version | ||
itr, err := dbm.IteratePrefix(ndb.db, legacyRootKeyFormat.Key()) | ||
if err != nil { | ||
ndb.logger.Error(err.Error()) | ||
return | ||
} | ||
defer itr.Close() | ||
|
||
// Delete orphans for all legacy versions | ||
var prevVersion, curVersion int64 | ||
var rootKeys [][]byte | ||
counter := 0 | ||
for ; itr.Valid(); itr.Next() { | ||
legacyRootKeyFormat.Scan(itr.Key(), &curVersion) | ||
rootKeys = append(rootKeys, itr.Key()) | ||
if prevVersion > 0 { | ||
if err := ndb.traverseOrphans(prevVersion, curVersion, func(orphan *Node) error { | ||
counter++ | ||
if counter == 1000 { | ||
counter = 0 | ||
time.Sleep(1000 * time.Millisecond) | ||
fmt.Println("IAVL sleep happening") | ||
} | ||
return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) | ||
}); err != nil { | ||
ndb.logger.Error(err.Error()) | ||
return | ||
} | ||
} | ||
prevVersion = curVersion | ||
} | ||
// Delete the last version for the legacyLastVersion | ||
if curVersion > 0 { | ||
legacyLatestVersion, err := ndb.getLegacyLatestVersion() | ||
if err != nil { | ||
ndb.logger.Error(err.Error()) | ||
return | ||
} | ||
if curVersion != legacyLatestVersion { | ||
ndb.logger.Error("expected legacyLatestVersion to be %d, got %d", legacyLatestVersion, curVersion) | ||
return | ||
} | ||
if err := ndb.traverseOrphans(curVersion, curVersion+1, func(orphan *Node) error { | ||
return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) | ||
}); err != nil { | ||
return err | ||
ndb.logger.Error("failed to clean legacy orphans between versions", "err", err) | ||
return | ||
} | ||
} | ||
prevVersion = curVersion | ||
} | ||
// Delete the last version for the legacyLastVersion | ||
if curVersion > 0 { | ||
legacyLatestVersion, err := ndb.getLegacyLatestVersion() | ||
if err != nil { | ||
return err | ||
} | ||
if curVersion != legacyLatestVersion { | ||
return fmt.Errorf("expected legacyLatestVersion to be %d, got %d", legacyLatestVersion, curVersion) | ||
|
||
// Delete all roots of the legacy versions | ||
for _, rootKey := range rootKeys { | ||
if err := ndb.batch.Delete(rootKey); err != nil { | ||
ndb.logger.Error("failed to clean legacy orphans root keys", "err", err) | ||
return | ||
} | ||
} | ||
if err := ndb.traverseOrphans(curVersion, curVersion+1, func(orphan *Node) error { | ||
return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) | ||
}); err != nil { | ||
return err | ||
|
||
// Initialize the legacy latest version to -1 to demonstrate that all legacy versions have been deleted | ||
ndb.legacyLatestVersion = -1 | ||
|
||
// Delete all orphan nodes of the legacy versions | ||
// TODO: Is this just deadcode????? | ||
if err := ndb.deleteOrphans(); err != nil { | ||
ndb.logger.Error("failed to clean legacy orphans", "err", err) | ||
return | ||
} | ||
}() | ||
|
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation of deleteLegacyVersions
uses a mutex to ensure that the deletion process is not initiated multiple times concurrently. This is a good practice for maintaining data integrity during potentially destructive operations. However, the presence of a sleep (time.Sleep(1000 * time.Millisecond)
) within a loop (lines 463-467) could indicate an attempt to mitigate a resource contention issue or rate limit the deletion process. This approach might not be optimal for performance and could benefit from a review to explore more efficient synchronization or batching strategies.
- time.Sleep(1000 * time.Millisecond)
+ // Consider implementing a more efficient synchronization or batching strategy.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
isDeletingLegacyVersionsMutex.Lock() | |
if isDeletingLegacyVersions { | |
isDeletingLegacyVersionsMutex.Unlock() | |
return nil | |
} | |
defer itr.Close() | |
isDeletingLegacyVersions = true | |
isDeletingLegacyVersionsMutex.Unlock() | |
// Delete orphans for all legacy versions | |
var prevVersion, curVersion int64 | |
var rootKeys [][]byte | |
for ; itr.Valid(); itr.Next() { | |
legacyRootKeyFormat.Scan(itr.Key(), &curVersion) | |
rootKeys = append(rootKeys, itr.Key()) | |
if prevVersion > 0 { | |
if err := ndb.traverseOrphans(prevVersion, curVersion, func(orphan *Node) error { | |
go func() { | |
defer func() { | |
isDeletingLegacyVersionsMutex.Lock() | |
isDeletingLegacyVersions = false | |
isDeletingLegacyVersionsMutex.Unlock() | |
}() | |
// Check if we have a legacy version | |
itr, err := dbm.IteratePrefix(ndb.db, legacyRootKeyFormat.Key()) | |
if err != nil { | |
ndb.logger.Error(err.Error()) | |
return | |
} | |
defer itr.Close() | |
// Delete orphans for all legacy versions | |
var prevVersion, curVersion int64 | |
var rootKeys [][]byte | |
counter := 0 | |
for ; itr.Valid(); itr.Next() { | |
legacyRootKeyFormat.Scan(itr.Key(), &curVersion) | |
rootKeys = append(rootKeys, itr.Key()) | |
if prevVersion > 0 { | |
if err := ndb.traverseOrphans(prevVersion, curVersion, func(orphan *Node) error { | |
counter++ | |
if counter == 1000 { | |
counter = 0 | |
time.Sleep(1000 * time.Millisecond) | |
fmt.Println("IAVL sleep happening") | |
} | |
return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) | |
}); err != nil { | |
ndb.logger.Error(err.Error()) | |
return | |
} | |
} | |
prevVersion = curVersion | |
} | |
// Delete the last version for the legacyLastVersion | |
if curVersion > 0 { | |
legacyLatestVersion, err := ndb.getLegacyLatestVersion() | |
if err != nil { | |
ndb.logger.Error(err.Error()) | |
return | |
} | |
if curVersion != legacyLatestVersion { | |
ndb.logger.Error("expected legacyLatestVersion to be %d, got %d", legacyLatestVersion, curVersion) | |
return | |
} | |
if err := ndb.traverseOrphans(curVersion, curVersion+1, func(orphan *Node) error { | |
return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) | |
}); err != nil { | |
return err | |
ndb.logger.Error("failed to clean legacy orphans between versions", "err", err) | |
return | |
} | |
} | |
prevVersion = curVersion | |
} | |
// Delete the last version for the legacyLastVersion | |
if curVersion > 0 { | |
legacyLatestVersion, err := ndb.getLegacyLatestVersion() | |
if err != nil { | |
return err | |
} | |
if curVersion != legacyLatestVersion { | |
return fmt.Errorf("expected legacyLatestVersion to be %d, got %d", legacyLatestVersion, curVersion) | |
// Delete all roots of the legacy versions | |
for _, rootKey := range rootKeys { | |
if err := ndb.batch.Delete(rootKey); err != nil { | |
ndb.logger.Error("failed to clean legacy orphans root keys", "err", err) | |
return | |
} | |
} | |
if err := ndb.traverseOrphans(curVersion, curVersion+1, func(orphan *Node) error { | |
return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) | |
}); err != nil { | |
return err | |
// Initialize the legacy latest version to -1 to demonstrate that all legacy versions have been deleted | |
ndb.legacyLatestVersion = -1 | |
// Delete all orphan nodes of the legacy versions | |
// TODO: Is this just deadcode????? | |
if err := ndb.deleteOrphans(); err != nil { | |
ndb.logger.Error("failed to clean legacy orphans", "err", err) | |
return | |
} | |
}() | |
return nil | |
} | |
isDeletingLegacyVersionsMutex.Lock() | |
if isDeletingLegacyVersions { | |
isDeletingLegacyVersionsMutex.Unlock() | |
return nil | |
} | |
isDeletingLegacyVersions = true | |
isDeletingLegacyVersionsMutex.Unlock() | |
go func() { | |
defer func() { | |
isDeletingLegacyVersionsMutex.Lock() | |
isDeletingLegacyVersions = false | |
isDeletingLegacyVersionsMutex.Unlock() | |
}() | |
// Check if we have a legacy version | |
itr, err := dbm.IteratePrefix(ndb.db, legacyRootKeyFormat.Key()) | |
if err != nil { | |
ndb.logger.Error(err.Error()) | |
return | |
} | |
defer itr.Close() | |
// Delete orphans for all legacy versions | |
var prevVersion, curVersion int64 | |
var rootKeys [][]byte | |
counter := 0 | |
for ; itr.Valid(); itr.Next() { | |
legacyRootKeyFormat.Scan(itr.Key(), &curVersion) | |
rootKeys = append(rootKeys, itr.Key()) | |
if prevVersion > 0 { | |
if err := ndb.traverseOrphans(prevVersion, curVersion, func(orphan *Node) error { | |
counter++ | |
if counter == 1000 { | |
counter = 0 | |
// Consider implementing a more efficient synchronization or batching strategy. | |
fmt.Println("IAVL sleep happening") | |
} | |
return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) | |
}); err != nil { | |
ndb.logger.Error(err.Error()) | |
return | |
} | |
} | |
prevVersion = curVersion | |
} | |
// Delete the last version for the legacyLastVersion | |
if curVersion > 0 { | |
legacyLatestVersion, err := ndb.getLegacyLatestVersion() | |
if err != nil { | |
ndb.logger.Error(err.Error()) | |
return | |
} | |
if curVersion != legacyLatestVersion { | |
ndb.logger.Error("expected legacyLatestVersion to be %d, got %d", legacyLatestVersion, curVersion) | |
return | |
} | |
if err := ndb.traverseOrphans(curVersion, curVersion+1, func(orphan *Node) error { | |
return ndb.batch.Delete(ndb.nodeKey(orphan.GetKey())) | |
}); err != nil { | |
ndb.logger.Error("failed to clean legacy orphans between versions", "err", err) | |
return | |
} | |
} | |
// Delete all roots of the legacy versions | |
for _, rootKey := range rootKeys { | |
if err := ndb.batch.Delete(rootKey); err != nil { | |
ndb.logger.Error("failed to clean legacy orphans root keys", "err", err) | |
return | |
} | |
} | |
// Initialize the legacy latest version to -1 to demonstrate that all legacy versions have been deleted | |
ndb.legacyLatestVersion = -1 | |
// Delete all orphan nodes of the legacy versions | |
// TODO: Is this just deadcode????? | |
if err := ndb.deleteOrphans(); err != nil { | |
ndb.logger.Error("failed to clean legacy orphans", "err", err) | |
return | |
} | |
}() | |
return nil | |
} |
Co-authored-by: cool-developer <[email protected]>
legacyRootKeyFormat.Scan(itr.Key(), &curVersion) | ||
rootKeys = append(rootKeys, itr.Key()) | ||
if prevVersion > 0 { | ||
if err := ndb.traverseOrphans(prevVersion, curVersion, func(orphan *Node) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cool-develope Isn't it true that v1 does not store orphans explicitly, and identifies them by traversing trees at version n and n+1? This being the case, why can deleteLegacyVersions
not simply iterate on the orphan key prefix in leveldb and delete all of them? Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it can simple iterate orphan values
@@ -46,6 +49,9 @@ func (b *BatchWithFlusher) estimateSizeAfterSetting(key []byte, value []byte) (i | |||
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold. | |||
// The addition entry is then added to the batch. | |||
func (b *BatchWithFlusher) Set(key, value []byte) error { | |||
b.mtx.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If each batch accumulates in memory before flushing to disk doesn't it make sense to lock at write instead of Set/Delete?
counter++ | ||
if counter == 1000 { | ||
counter = 0 | ||
time.Sleep(1000 * time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this sleep to create a write gap so that SaveVersion
doesn't block too frequently from the newly introduced mutex in BatchWithFlusher
while a long prune operation is ongoing?
If deleteLegacyVersions
is only called once on migration I guess it's OK, but this sleep is hard to understand and could result in unexpected wait times from the main I/o thread. I'm also curious about the overhead of mutex/set release on every Set/Delete
call.
@czarcas7ic could you please wire up this PR? only encoding and fastKeyFormatter features |
@cool-develope Just verifying, you only want the FastPrefixFormatter change as well as encoding.go change, and remove everything else? My apologies, I am between multiple tasks right now so am losing context on this. |
we can close it, -> #923 |
Thanks, my apologies for the extra overhead that this may have caused! |
This feature prevents the multiple hour long waiting period when chains upgrade from previous versions of IAVL to IAVL v1. The time comes from pruning orphan nodes. This synchronously prunes them. This branch has been NOT been tested against osmosis mainnet, however the v1.x.x version of this PR has.