Skip to content

Commit

Permalink
support diskio monitor.
Browse files Browse the repository at this point in the history
  • Loading branch information
KANIOYH committed Sep 4, 2024
1 parent 0f3f828 commit b5738fc
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 6 deletions.
29 changes: 23 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type DB struct {
flushChan chan *memtable // flushChan is used to notify the flush goroutine to flush memtable to disk.
flushLock sync.Mutex // flushLock is to prevent flush running while compaction doesn't occur
compactChan chan deprecatedState // compactChan is used to notify the shard need to compact
diskIO DiskIO // monitoring the IO status of disks and allowing autoCompact when appropriate
mu sync.RWMutex
closed bool
closeChan chan struct{}
Expand All @@ -62,6 +63,8 @@ type DB struct {
//
// It will first open the wal to rebuild the memtable, then open the index and value log.
// Return the DB object if succeeded, otherwise return the error.
//
//nolint:funlen // default
func Open(options Options) (*DB, error) {
// check whether all options are valid
if err := validateOptions(&options); err != nil {
Expand Down Expand Up @@ -145,6 +148,13 @@ func Open(options Options) (*DB, error) {
return nil, err
}

diskIO := DiskIO{
targetPath: options.DirPath,
samplingInterval: options.diskIOSamplingInterval,
readBusyThreshold: options.diskIOReadBusyThreshold,
writeBusyThreshold: options.diskIOWriteBusyThreshold,
}

db := &DB{
activeMem: memtables[len(memtables)-1],
immuMems: memtables[:len(memtables)-1],
Expand All @@ -154,6 +164,7 @@ func Open(options Options) (*DB, error) {
flushChan: make(chan *memtable, options.MemtableNums-1),
closeChan: make(chan struct{}),
compactChan: make(chan deprecatedState),
diskIO: diskIO,
options: options,
batchPool: sync.Pool{New: makeBatch},
}
Expand Down Expand Up @@ -611,12 +622,18 @@ func (db *DB) listenAutoCompact() {
} else if state.thresholdState == ThresholdState(ArriveLowerThreshold) {
// determine whether to do compact based on the current IO state
log.Println("ArriveLowerThreshold")
// TODO: since the IO state module has not been implemented yet, we just compare it here
if firstCompact {
firstCompact = false
err = db.Compact()
} else {
err = db.CompactWithDeprecatedtable()
var free bool
free, err = db.diskIO.IsFree()
if err != nil {
panic(err)
}
if free {
if firstCompact {
firstCompact = false
err = db.Compact()
} else {
err = db.CompactWithDeprecatedtable()
}
}
}
if err != nil {
Expand Down
59 changes: 59 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lotusdb

import (
"bytes"
"log"
"os"
"sync"
"testing"
Expand Down Expand Up @@ -1034,3 +1035,61 @@ func TestDeprecatetableMetaPersist(t *testing.T) {
assert.Equal(t, deprecatedNumberFirst, deprecatedNumberSecond)
})
}

func SimpleIO(targetPath string, interval int, count int) {
data := []byte("This is a test I/O operation.\n")
pl := make([]byte, 30)
for count > 0 {
count--
file, err := os.Create(targetPath)
if err != nil {
log.Println("Error creating file:", err)
return
}

_, err = file.Write(data)
if err != nil {
log.Println("Error writing to file:", err)
return
}

err = file.Sync()
if err != nil {
log.Println("Error syncing file:", err)
return
}

_, err = file.Read(pl)
if err != nil {
return
}
file.Close()
time.Sleep(time.Duration(interval) * time.Millisecond)
}
log.Println("quit io")
}

func TestDiskIO(t *testing.T) {
options := DefaultOptions
options.autoCompact = true
path, err := os.MkdirTemp("", "db-test-diskio")
require.NoError(t, err)
options.DirPath = path
options.CompactBatchCount = 2 << 5

db, err := Open(options)
require.NoError(t, err)
defer destroyDB(db)

t.Run("test diskio", func(t *testing.T) {
go SimpleIO(options.DirPath+"iofile", 5, 2000)
var free bool
free = true
tryCount := 200
for free && tryCount > 0 {
free, _ = db.diskIO.IsFree()
tryCount--
}
assert.False(t, free)
})
}
105 changes: 105 additions & 0 deletions diskio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package lotusdb

import (
"errors"
"log"
"path/filepath"
"strings"
"time"

"github.com/shirou/gopsutil/disk"
)

type DiskIO struct {
targetPath string
samplingInterval int // unit millisecond
readBusyThreshold uint64 // read bytes during samplingInterval > readBusyThreshold is busy
writeBusyThreshold uint64 // write bytes during samplingInterval > writeBusyThreshold is busy
}

func (io DiskIO) IsFree() (bool, error) {
var ioStart disk.IOCountersStat
var ioEnd disk.IOCountersStat
var err error
ioStart, err = GetDiskIOInfo((io.targetPath))
if err != nil {
return false, err
}
time.Sleep(time.Duration(io.samplingInterval) * time.Millisecond)
ioEnd, err = GetDiskIOInfo((io.targetPath))
if err != nil {
return false, err
}
readBytes := ioEnd.ReadBytes - ioStart.ReadBytes
writeBytes := ioEnd.WriteBytes - ioStart.WriteBytes
log.Println("RdThreshold:", io.readBusyThreshold, "readBytes:", readBytes,
"WtThreshold:", io.writeBusyThreshold, "writeBytes:", writeBytes)
if io.readBusyThreshold < readBytes || io.writeBusyThreshold < writeBytes {
return false, nil
}
return true, nil
}

func GetDiskIOInfo(targetPath string) (disk.IOCountersStat, error) {
var io disk.IOCountersStat
// Get all mounting points
partitions, err := disk.Partitions(false)
if err != nil {
log.Println("Error getting partitions:", err)
return io, err
}

var targetDevice string

// Find the mount point where the target path is located
for _, partition := range partitions {
if isPathOnDevice(targetPath, partition.Mountpoint) {
targetDevice = partition.Device
break
}
}

targetDevice = getDeviceName(targetDevice)

// Get the I/O status of the device
ioCounters, err := disk.IOCounters()
if err != nil {
return io, err
}

var exists bool
if io, exists = ioCounters[targetDevice]; !exists {
return io, errors.New("No I/O stats available for device" + targetDevice)
}
return io, nil
}

// Check if the path is on the specified mount point.
func getDeviceName(devicePath string) string {
parts := strings.Split(devicePath, "/")
if len(parts) > 0 {
return parts[len(parts)-1]
}
return devicePath
}

// Check if the path is on the specified mount point.
func isPathOnDevice(path, mountpoint string) bool {
absPath, err := filepath.Abs(path)
if err != nil {
log.Println("Error getting absolute path:", err)
return false
}

absMountpoint, err := filepath.Abs(mountpoint)
if err != nil {
log.Println("Error getting absolute mountpoint:", err)
return false
}

// Ensure paths are normalized for comparison
absPath = filepath.Clean(absPath)
absMountpoint = filepath.Clean(absMountpoint)

return strings.HasPrefix(absPath, absMountpoint)
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ require (
golang.org/x/sync v0.5.0
)

require github.com/shirou/gopsutil v3.21.11+incompatible // indirect

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ github.com/rosedblabs/diskhash v0.0.0-20230910084041-289755737e2a h1:BNp46nsknQi
github.com/rosedblabs/diskhash v0.0.0-20230910084041-289755737e2a/go.mod h1:3xvIg+7iOFUL/vMCE/6DwE6Yecb0okVYJBEfpdC/E+8=
github.com/rosedblabs/wal v1.3.6 h1:oxZYTPX/u4JuGDW98wQ1YamWqerlrlSUFKhgP6Gd/Ao=
github.com/rosedblabs/wal v1.3.6/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
13 changes: 13 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ type Options struct {
// deprecatedtable force compaction size
deprecatedtableUpperThreshold uint32

// sampling interval of diskIO, unit is millisecond
diskIOSamplingInterval int

// read bytes during samplingInterval > readBusyThreshold is busy
diskIOReadBusyThreshold uint64

// write bytes during samplingInterval > writeBusyThreshold is busy
diskIOWriteBusyThreshold uint64

// autoCompact support
autoCompact bool

Expand Down Expand Up @@ -140,6 +149,10 @@ var DefaultOptions = Options{
CompactBatchCount: 10000,
deprecatedtableLowerThreshold: 2 * 100 * KB, // 200K
deprecatedtableUpperThreshold: 4 * 100 * KB, // 400K
//nolint:gomnd // default
diskIOSamplingInterval: 10,
diskIOReadBusyThreshold: 1 * KB,
diskIOWriteBusyThreshold: 1 * KB,
autoCompact: true,
//nolint:gomnd // default
WaitMemSpaceTimeout: 100 * time.Millisecond,
Expand Down

0 comments on commit b5738fc

Please sign in to comment.