Skip to content

Commit

Permalink
compactChan noblocking
Browse files Browse the repository at this point in the history
  • Loading branch information
KANIOYH committed Sep 24, 2024
1 parent 5085612 commit 7d1ad08
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 17 deletions.
19 changes: 15 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func Open(options Options) (*DB, error) {
flushChan: make(chan *memtable, options.MemtableNums-1),
closeflushChan: make(chan struct{}),
closeCompactChan: make(chan struct{}),
compactChan: make(chan deprecatedState, len(memtables)), // asynchronous chan for noblocking recv
compactChan: make(chan deprecatedState),
diskIO: diskIO,
options: options,
batchPool: sync.Pool{New: makeBatch},
Expand Down Expand Up @@ -200,12 +200,14 @@ func (db *DB) Close() error {
return err
}

db.flushLock.Lock()
// persist deprecated number and total entry number
deprecatedMetaPath := filepath.Join(db.options.DirPath, deprecatedMetaName)
err := storeDeprecatedEntryMeta(deprecatedMetaPath, db.vlog.deprecatedNumber, db.vlog.totalNumber)
if err != nil {
return err
}
defer db.flushLock.Unlock()

// close value log
if err = db.vlog.close(); err != nil {
Expand Down Expand Up @@ -521,21 +523,30 @@ func (db *DB) flushMemtable(table *memtable) {
db.immuMems = db.immuMems[1:]
}
}
db.sendThresholdState()
}

func (db *DB) sendThresholdState() {
if db.options.AutoCompactSupport {
// check deprecatedtable size
lowerThreshold := uint32((float32)(db.vlog.totalNumber) * db.options.AdvisedCompactionRate)
upperThreshold := uint32((float32)(db.vlog.totalNumber) * db.options.ForceCompactionRate)

thresholdState := deprecatedState{
thresholdState: ThresholdState(UnarriveThreshold),
}
if db.vlog.deprecatedNumber >= upperThreshold {
db.compactChan <- deprecatedState{
thresholdState = deprecatedState{
thresholdState: ThresholdState(ArriveForceThreshold),
}
} else if db.vlog.deprecatedNumber > lowerThreshold {
db.compactChan <- deprecatedState{
thresholdState = deprecatedState{
thresholdState: ThresholdState(ArriveAdvisedThreshold),
}
}
select {
case db.compactChan <- thresholdState:
default: // this compacting, just do nothing.
}
}
}

Expand Down
19 changes: 6 additions & 13 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,22 +1069,15 @@ func TestDeprecatetableMetaPersist(t *testing.T) {
require.NoError(t, err)

t.Run("test same deprecated number", func(t *testing.T) {
for i := 0; i <= 10; i++ {
// write logs and flush
logs := produceAndWriteLogs(20000, int64(i)*20000, db)
// delete logs
for idx, log := range logs {
if idx%5 == 0 {
_ = db.DeleteWithOptions(log.key, WriteOptions{
Sync: true,
DisableWal: false,
})
}
}
produceAndWriteLogs(100000, 0, db)
// overwrite half. background busy flushing.
for i := 0; i < 3; i++ {
time.Sleep(500 * time.Microsecond)
produceAndWriteLogs(50000, 0, db)
}
db.Close()
deprecatedNumberFirst := db.vlog.deprecatedNumber
totalNumberFirst := db.vlog.totalNumber
db.Close()
db, err = Open(options)
deprecatedNumberSecond := db.vlog.deprecatedNumber
totalNumberSecond := db.vlog.totalNumber
Expand Down

0 comments on commit 7d1ad08

Please sign in to comment.