diff --git a/db.go b/db.go index 94ae191..b19aa1b 100644 --- a/db.go +++ b/db.go @@ -143,7 +143,7 @@ func Open(options Options) (*DB, error) { flushChan: make(chan *memtable, options.MemtableNums-1), closeflushChan: make(chan struct{}), closeCompactChan: make(chan struct{}), - compactChan: make(chan deprecatedState, len(memtables)), // asynchronous chan for noblocking recv + compactChan: make(chan deprecatedState), diskIO: diskIO, options: options, batchPool: sync.Pool{New: makeBatch}, @@ -200,12 +200,14 @@ func (db *DB) Close() error { return err } + db.flushLock.Lock() // persist deprecated number and total entry number deprecatedMetaPath := filepath.Join(db.options.DirPath, deprecatedMetaName) err := storeDeprecatedEntryMeta(deprecatedMetaPath, db.vlog.deprecatedNumber, db.vlog.totalNumber) if err != nil { return err } + defer db.flushLock.Unlock() // close value log if err = db.vlog.close(); err != nil { @@ -521,21 +523,30 @@ func (db *DB) flushMemtable(table *memtable) { db.immuMems = db.immuMems[1:] } } + db.sendThresholdState() +} +func (db *DB) sendThresholdState() { if db.options.AutoCompactSupport { // check deprecatedtable size lowerThreshold := uint32((float32)(db.vlog.totalNumber) * db.options.AdvisedCompactionRate) upperThreshold := uint32((float32)(db.vlog.totalNumber) * db.options.ForceCompactionRate) - + thresholdState := deprecatedState{ + thresholdState: ThresholdState(UnarriveThreshold), + } if db.vlog.deprecatedNumber >= upperThreshold { - db.compactChan <- deprecatedState{ + thresholdState = deprecatedState{ thresholdState: ThresholdState(ArriveForceThreshold), } } else if db.vlog.deprecatedNumber > lowerThreshold { - db.compactChan <- deprecatedState{ + thresholdState = deprecatedState{ thresholdState: ThresholdState(ArriveAdvisedThreshold), } } + select { + case db.compactChan <- thresholdState: + default: // this compacting, just do nothing. + } } } diff --git a/db_test.go b/db_test.go index 43b7ee0..c174ad2 100644 --- a/db_test.go +++ b/db_test.go @@ -1069,22 +1069,15 @@ func TestDeprecatetableMetaPersist(t *testing.T) { require.NoError(t, err) t.Run("test same deprecated number", func(t *testing.T) { - for i := 0; i <= 10; i++ { - // write logs and flush - logs := produceAndWriteLogs(20000, int64(i)*20000, db) - // delete logs - for idx, log := range logs { - if idx%5 == 0 { - _ = db.DeleteWithOptions(log.key, WriteOptions{ - Sync: true, - DisableWal: false, - }) - } - } + produceAndWriteLogs(100000, 0, db) + // overwrite half. background busy flushing. + for i := 0; i < 3; i++ { + time.Sleep(500 * time.Microsecond) + produceAndWriteLogs(50000, 0, db) } + db.Close() deprecatedNumberFirst := db.vlog.deprecatedNumber totalNumberFirst := db.vlog.totalNumber - db.Close() db, err = Open(options) deprecatedNumberSecond := db.vlog.deprecatedNumber totalNumberSecond := db.vlog.totalNumber