Skip to content

Commit

Permalink
AutoCompact & CompactWithDeprecateTable Support (#165)
Browse files Browse the repository at this point in the history
* fix the deadlock when closing db

* Add deprecatedtable and test, import google.uuid.

* Add vlog&bptree uuid support, and update some related tests.

* Add deprecatedtable and test, import google.uuid.

* Add vlog&bptree uuid support, and update some related tests.

* deprecatedtable: add lower and upper thresholds, which are used to notify autoCompact.
Also add default values for the lower and upper thresholds in options.

* Add autocompact framework; this commit is just for merging the TestCompact bug.

* This commit just for comparison, output:
=== RUN   TestDBCompact/test_compaction
shard:1 Function took existTime:395.82922ms,reader:535.043275ms,rewriteTime:790.151805ms,all:1.945597922s
shard:2 Function took existTime:409.16579ms,reader:547.093471ms,rewriteTime:810.14034ms,all:1.996946539s
shard:0 Function took existTime:414.753128ms,reader:547.752194ms,rewriteTime:826.628021ms,all:2.024067234s

=== RUN   TestDBCompactWitchDeprecateable/test_compaction
shard:1 Function took existTime:15.4204ms,reader:563.989168ms,rewriteTime:910.374564ms,all:1.67209614s
shard:0 Function took existTime:21.662887ms,reader:587.597162ms,rewriteTime:996.416705ms,all:1.794619638s
shard:2 Function took existTime:16.989118ms,reader:627.684099ms,rewriteTime:996.287388ms,all:1.833219848s

* add iostate support

* This commit can be fully tested by adding uuid and deprecateable table features.
Support printing performance data for two different types of compacts and TestautoCompact.

* this commit for pr.

* fix some formats

* fix format

* fix format for lint

* fix format

* golint disable gosec

* gofmt files

* bptree put return old uuid.

* bptree delete return old entry.

* format golint.

* fix test db path bug.

* fix test filename bug.

* support diskio monitor.

* format.

* IO monitoring sets its threshold by collecting IO bandwidth in flushmemtable.

* fmt

* IO monitor uses IoTime rate.

* use rate to check deprecatedtable, persist total number in vlog.

* fmt

* fix testdb name

* fix deadlock in autocompact

* reduce test size

* autocompact coroutine graceful exit

* try to fix err: segment file is closed when closing db

* fmt

* send compact msg with db.mu

* fmt

* without busyIO

* diskio only work in linux

* golint

* fmt

* fix deadlock in flushmemtable

* fmt

* Encapsulate meta load/store functions. Use struct{} as dptable value.

* support longer testkey.

* fix deadlock in close

* fmt

* change compactChan buffer

* remove test log

* change flushmemtable lock range

* compactChan non-blocking

* rebase for main

* fmt

* sync go.sum

* add compact optimization, using BatchCapacity to submit rewriting.

* go fmt and golint.
  • Loading branch information
KANIOYH authored Oct 14, 2024
1 parent fb79d20 commit d2bad43
Show file tree
Hide file tree
Showing 24 changed files with 1,376 additions and 171 deletions.
4 changes: 2 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ linters:
- godot # checks if comments end in a period
# - goimports # in addition to fixing imports, goimports also formats your code in the same style as gofmt
- gomnd # detects magic numbers
- gomoddirectives # manages the use of 'replace', 'retract', and 'excludes' directives in go.mod
# - gomoddirectives # manages the use of 'replace', 'retract', and 'excludes' directives in go.mod
- gomodguard # allow and block lists linter for direct Go module dependencies. This is different from depguard where there are different block types for example version constraints and module recommendations
- goprintffuncname # checks that printf-like functions are named with f at the end
- gosec # inspects source code for security problems
# - gosec # inspects source code for security problems
- intrange # finds places where for loops could make use of an integer range
- lll # reports long lines
- loggercheck # checks key value pairs for common logger libraries (kitlog,klog,logr,zap)
Expand Down
6 changes: 3 additions & 3 deletions benchmark/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func BenchmarkPut(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
err := db.Put(util.GetTestKey(i), util.RandomValue(1024))
err := db.Put(util.GetTestKey(int64(i)), util.RandomValue(1024))
//nolint:testifylint // benchmark
assert.Nil(b, err)
}
Expand All @@ -45,15 +45,15 @@ func BenchmarkGet(b *testing.B) {
destroy := openDB()
defer destroy()
for i := 0; i < 1000000; i++ {
err := db.Put(util.GetTestKey(i), util.RandomValue(128))
err := db.Put(util.GetTestKey(int64(i)), util.RandomValue(128))
//nolint:testifylint // benchmark
assert.Nil(b, err)
}
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
val, err := db.Get(util.GetTestKey(i))
val, err := db.Get(util.GetTestKey(int64(i)))
if err == nil {
assert.NotNil(b, val)
} else if errors.Is(err, lotusdb.ErrKeyNotFound) {
Expand Down
101 changes: 89 additions & 12 deletions bptree.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"

"github.com/google/uuid"
"github.com/rosedblabs/diskhash"
"github.com/rosedblabs/wal"
"go.etcd.io/bbolt"
Expand Down Expand Up @@ -81,7 +82,11 @@ func (bt *BPTree) Get(key []byte, _ ...diskhash.MatchKeyFunc) (*KeyPosition, err
if len(value) != 0 {
keyPos = new(KeyPosition)
keyPos.key, keyPos.partition = key, uint32(p)
keyPos.position = wal.DecodeChunkPosition(value)
err := keyPos.uid.UnmarshalBinary(value[:len(keyPos.uid)])
if err != nil {
return err
}
keyPos.position = wal.DecodeChunkPosition(value[len(keyPos.uid):])
}
return nil
}); err != nil {
Expand All @@ -92,9 +97,11 @@ func (bt *BPTree) Get(key []byte, _ ...diskhash.MatchKeyFunc) (*KeyPosition, err
}

// PutBatch puts the specified key positions into the index.
func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc) error {
//
//nolint:gocognit
func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc) ([]*KeyPosition, error) {
if len(positions) == 0 {
return nil
return nil, nil
}

// group positions by partition
Expand All @@ -104,7 +111,10 @@ func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc)
partitionRecords[p] = append(partitionRecords[p], pos)
}

// create chan to collect deprecated entry
deprecatedChan := make(chan []*KeyPosition, len(partitionRecords))
g, ctx := errgroup.WithContext(context.Background())

for i := range partitionRecords {
partition := i
if len(partitionRecords[partition]) == 0 {
Expand All @@ -113,34 +123,68 @@ func (bt *BPTree) PutBatch(positions []*KeyPosition, _ ...diskhash.MatchKeyFunc)
g.Go(func() error {
// get the bolt db instance for this partition
tree := bt.trees[partition]
return tree.Update(func(tx *bbolt.Tx) error {
partitionDeprecatedKeyPosition := make([]*KeyPosition, 0)
err := tree.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(indexBucketName)
// put each record into the bucket
for _, record := range partitionRecords[partition] {
select {
case <-ctx.Done():
return ctx.Err()
default:
uidBytes, _ := record.uid.MarshalBinary()
encPos := record.position.Encode()
if err := bucket.Put(record.key, encPos); err != nil {
//nolint:gocritic // Need to combine uidbytes with encPos and place them in bptree
valueBytes := append(uidBytes, encPos...)
if err, oldValue := bucket.Put(record.key, valueBytes); err != nil {
if errors.Is(err, bbolt.ErrKeyRequired) {
return ErrKeyIsEmpty
}
return err
} else if oldValue != nil {
keyPos := new(KeyPosition)
keyPos.key, keyPos.partition = record.key, record.partition
err = keyPos.uid.UnmarshalBinary(oldValue[:len(keyPos.uid)])
if err != nil {
return err
}
keyPos.position = wal.DecodeChunkPosition(oldValue[len(keyPos.uid):])
partitionDeprecatedKeyPosition = append(partitionDeprecatedKeyPosition, keyPos)
}
}
}
return nil
})
// send deprecateduuid uuid slice to chan
deprecatedChan <- partitionDeprecatedKeyPosition
return err
})
}
return g.Wait()
// Close the channel after all goroutines are done
go func() {
_ = g.Wait()
close(deprecatedChan)
}()

var deprecatedKeyPosition []*KeyPosition
for partitionDeprecatedKeyPosition := range deprecatedChan {
deprecatedKeyPosition = append(deprecatedKeyPosition, partitionDeprecatedKeyPosition...)
}

// Wait for all goroutines to finish
if err := g.Wait(); err != nil {
return nil, err
}

return deprecatedKeyPosition, nil
}

// DeleteBatch deletes the specified keys from the index.
func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error {
//
//nolint:gocognit
func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) ([]*KeyPosition, error) {
if len(keys) == 0 {
return nil
return nil, nil
}

// group keys by partition
Expand All @@ -150,6 +194,9 @@ func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error {
partitionKeys[p] = append(partitionKeys[p], key)
}

// create chan to collect deprecated entry
deprecatedChan := make(chan []*KeyPosition, len(partitionKeys))

// delete keys from each partition
g, ctx := errgroup.WithContext(context.Background())
for i := range partitionKeys {
Expand All @@ -159,7 +206,8 @@ func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error {
}
g.Go(func() error {
tree := bt.trees[partition]
return tree.Update(func(tx *bbolt.Tx) error {
partitionDeprecatedKeyPosition := make([]*KeyPosition, 0)
err := tree.Update(func(tx *bbolt.Tx) error {
// get the bolt db instance for this partition
bucket := tx.Bucket(indexBucketName)
// delete each key from the bucket
Expand All @@ -171,16 +219,44 @@ func (bt *BPTree) DeleteBatch(keys [][]byte, _ ...diskhash.MatchKeyFunc) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
if err := bucket.Delete(key); err != nil {
if err, oldValue := bucket.Delete(key); err != nil {
return err
} else if oldValue != nil {
keyPos := new(KeyPosition)
keyPos.key, keyPos.partition = key, uint32(partition)
err = keyPos.uid.UnmarshalBinary(oldValue[:len(keyPos.uid)])
if err != nil {
return err
}
keyPos.position = wal.DecodeChunkPosition(oldValue[len(keyPos.uid):])
partitionDeprecatedKeyPosition = append(partitionDeprecatedKeyPosition, keyPos)
}
}
}
return nil
})
// send deprecateduuid uuid slice to chan
deprecatedChan <- partitionDeprecatedKeyPosition
return err
})
}
return g.Wait()
// Close the channel after all goroutines are done
go func() {
_ = g.Wait()
close(deprecatedChan)
}()

var deprecatedKeyPosition []*KeyPosition
for partitionDeprecatedKeyPosition := range deprecatedChan {
deprecatedKeyPosition = append(deprecatedKeyPosition, partitionDeprecatedKeyPosition...)
}

// Wait for all goroutines to finish
if err := g.Wait(); err != nil {
return nil, err
}

return deprecatedKeyPosition, nil
}

// Close releases all boltdb database resources.
Expand Down Expand Up @@ -291,7 +367,8 @@ func (bi *bptreeIterator) Key() []byte {

// Value get the current value.
func (bi *bptreeIterator) Value() any {
return bi.value
var uid uuid.UUID
return bi.value[len(uid):]
}

// Valid returns whether the iterator is exhausted.
Expand Down
Loading

0 comments on commit d2bad43

Please sign in to comment.