feat: Modify optimized compaction to cover edge cases (#25594)
* feat: Modify optimized compaction to cover edge cases

This PR changes the compaction algorithm to account for the following
cases that were not previously handled, where "group size" means the
total size of the TSM files in the shard directory (a sketch of the
group-size computation follows below):

- Many generations with a group size over 2 GB
- A single generation with many files and a group size under 2 GB
- Shards with a group size over 2 GB but many fragmented files (each
  under 2 GB and under the aggressive points-per-block count)

closes #25666
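As a concrete reading of "group size", here is a minimal standalone sketch
(the helper and the shard path are hypothetical, not code from this PR):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// groupSize sums the sizes of all TSM files in a shard directory. The
// planner actually derives this from its in-memory generations rather
// than re-reading the directory; this helper only illustrates the term.
func groupSize(shardDir string) (uint64, error) {
	matches, err := filepath.Glob(filepath.Join(shardDir, "*.tsm"))
	if err != nil {
		return 0, err
	}
	var total uint64
	for _, m := range matches {
		fi, err := os.Stat(m)
		if err != nil {
			return 0, err
		}
		total += uint64(fi.Size())
	}
	return total, nil
}

func main() {
	size, err := groupSize("/var/lib/influxdb/data/db/autogen/1") // hypothetical shard path
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("group size: %d bytes (2 GB threshold: %d)\n", size, uint64(2048*1024*1024))
}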
devanbenz authored Jan 14, 2025
1 parent e2d76ed commit f04105b
Showing 5 changed files with 664 additions and 208 deletions.
16 changes: 16 additions & 0 deletions tsdb/config.go
@@ -52,6 +52,10 @@ const (
// block in a TSM file
DefaultMaxPointsPerBlock = 1000

// AggressiveMaxPointsPerBlock is used when we want to further compact blocks;
// it is 100 times the default number of points per block.
AggressiveMaxPointsPerBlock = DefaultMaxPointsPerBlock * 100

// DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database.
// This limit only applies to the "inmem" index.
DefaultMaxSeriesPerDatabase = 1000000
@@ -77,8 +81,20 @@ const (
// partition snapshot compactions that can run at one time.
// A value of 0 results in runtime.GOMAXPROCS(0).
DefaultSeriesFileMaxConcurrentSnapshotCompactions = 0

// MaxTSMFileSize is the maximum size of TSM files.
MaxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
)

var SingleGenerationReasonText string = SingleGenerationReason()

// SingleGenerationReason returns the log message for our single-generation
// compaction case when a shard is checked for full compaction.
// 1048576000 is the magic number used as bytes per gigabyte in the message.
func SingleGenerationReason() string {
return fmt.Sprintf("not fully compacted and not idle because single generation with more than 2 files under %d GB and more than 1 file(s) under aggressive compaction points per block count (%d points)", int(MaxTSMFileSize/1048576000), AggressiveMaxPointsPerBlock)
}

// Config holds the configuration for the tsdb package.
type Config struct {
Dir string `toml:"dir"`
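A quick standalone check of the arithmetic in SingleGenerationReason's
format string (illustration only, not part of this diff): 2048*1024*1024 =
2147483648 bytes, and integer division by 1048576000 yields 2, so the
message reports a 2 GB threshold.

package main

import "fmt"

const maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2147483648 bytes

func main() {
	// Integer division: 2147483648 / 1048576000 == 2, the "%d GB" value
	// interpolated into the reason message.
	fmt.Println(int(maxTSMFileSize / 1048576000)) // prints 2
}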
82 changes: 53 additions & 29 deletions tsdb/engine/tsm1/compact.go
@@ -33,7 +33,6 @@ import (
"go.uber.org/zap"
)

const maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
const logEvery = 2 * DefaultSegmentSize

const (
@@ -101,7 +100,13 @@ type CompactionGroup []string
type CompactionPlanner interface {
Plan(lastWrite time.Time) ([]CompactionGroup, int64)
PlanLevel(level int) ([]CompactionGroup, int64)
PlanOptimize() ([]CompactionGroup, int64)
// PlanOptimize returns the groups for compaction, the number of compaction
// groups, and the number of generations within the compaction groups.
// generationCount is used to decide how many points per block to use during
// compaction. This value is mostly ignored in normal compaction code paths,
// but for the edge case where a single generation has many files under
// 2 GB it is an important indicator.
PlanOptimize() (compactGroup []CompactionGroup, compactionGroupLen int64, generationCount int64)
Release(group []CompactionGroup)
FullyCompacted() (bool, string)

@@ -230,6 +235,27 @@ func (c *DefaultPlanner) FullyCompacted() (bool, string) {
} else if gens.hasTombstones() {
return false, "not fully compacted and not idle because of tombstones"
} else {
// For planning we want to ensure that a single-generation shard with many
// files under 2 GB, where many files have not reached the aggressive
// compaction points-per-block count (100,000), is compacted further. It is
// okay to stop compaction if the files are under 2 GB but already at the
// aggressive points-per-block count.
if len(gens) == 1 && len(gens[0].files) > 1 {
aggressivePointsPerBlockCount := 0
filesUnderMaxTsmSizeCount := 0
for _, tsmFile := range gens[0].files {
if c.FileStore.BlockCount(tsmFile.Path, 1) >= tsdb.AggressiveMaxPointsPerBlock {
aggressivePointsPerBlockCount++
}
if tsmFile.Size < tsdb.MaxTSMFileSize {
filesUnderMaxTsmSizeCount++
}
}

if filesUnderMaxTsmSizeCount > 1 && aggressivePointsPerBlockCount < len(gens[0].files) {
return false, tsdb.SingleGenerationReasonText
}
}
return true, ""
}
}
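A condensed, standalone version of this single-generation check, with a
stubbed block count standing in for FileStore.BlockCount (a sketch, not
the planner's actual types):

package main

import "fmt"

const (
	maxTSMFileSize              = 2048 * 1024 * 1024 // mirrors tsdb.MaxTSMFileSize
	aggressiveMaxPointsPerBlock = 100000             // mirrors tsdb.AggressiveMaxPointsPerBlock
)

type tsmFile struct {
	size        uint64
	blockPoints int // stand-in for FileStore.BlockCount(path, 1)
}

// singleGenNeedsCompaction mirrors the edge case above: keep compacting a
// single-generation shard while more than one file is under 2 GB and not
// every file has reached the aggressive points-per-block count.
func singleGenNeedsCompaction(files []tsmFile) bool {
	if len(files) <= 1 {
		return false
	}
	aggressive, underMax := 0, 0
	for _, f := range files {
		if f.blockPoints >= aggressiveMaxPointsPerBlock {
			aggressive++
		}
		if f.size < maxTSMFileSize {
			underMax++
		}
	}
	return underMax > 1 && aggressive < len(files)
}

func main() {
	fragmented := []tsmFile{{512 << 20, 1000}, {700 << 20, 1000}, {300 << 20, 1000}}
	compacted := []tsmFile{{900 << 20, 100000}, {800 << 20, 100000}}
	fmt.Println(singleGenNeedsCompaction(fragmented)) // true: plan more work
	fmt.Println(singleGenNeedsCompaction(compacted))  // false: fully compacted
}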
@@ -340,25 +366,24 @@ func (c *DefaultPlanner) PlanLevel(level int) ([]CompactionGroup, int64) {
// PlanOptimize returns all TSM files if they are in different generations in order
// to optimize the index across TSM files. Each returned compaction group can be
// compacted concurrently.
func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
func (c *DefaultPlanner) PlanOptimize() (compactGroup []CompactionGroup, compactionGroupLen int64, generationCount int64) {
// If a full plan has been requested, don't plan any levels which will prevent
// the full plan from acquiring them.
c.mu.RLock()
if c.forceFull {
c.mu.RUnlock()
return nil, 0
return nil, 0, 0
}
c.mu.RUnlock()

// Determine the generations from all files on disk. We need to treat
// a generation conceptually as a single file even though it may be
// split across several files in sequence.
generations := c.findGenerations(true)
fullyCompacted, _ := c.FullyCompacted()

// If the shard is fully compacted, there's nothing to do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil, 0
if fullyCompacted {
return nil, 0, 0
}

// Group each generation by level such that two adjacent generations in the same
@@ -368,11 +393,6 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
for i := 0; i < len(generations); i++ {
cur := generations[i]

// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() {
continue
}

// See if this generation is orphaned, which would prevent it from being further
// compacted until a final full compaction runs.
if i < len(generations)-1 {
@@ -382,7 +402,7 @@
}
}

if len(currentGen) == 0 || currentGen.level() == cur.level() {
if len(currentGen) == 0 || currentGen.level() >= cur.level() {
currentGen = append(currentGen, cur)
continue
}
@@ -397,21 +417,21 @@
}

// Only optimize level 4 files since using lower levels will collide
// with the level planners
// with the level planners. If this is a single-generation optimization,
// do not skip any levels.
var levelGroups []tsmGenerations
for _, cur := range groups {
if cur.level() == 4 {
levelGroups = append(levelGroups, cur)
if len(generations) == 1 {
levelGroups = append(levelGroups, groups...)
} else {
for _, cur := range groups {
if cur.level() == 4 {
levelGroups = append(levelGroups, cur)
}
}
}

var cGroups []CompactionGroup
for _, group := range levelGroups {
// Skip the group if it's not worthwhile to optimize it
if len(group) < 4 && !group.hasTombstones() {
continue
}

var cGroup CompactionGroup
for _, gen := range group {
for _, file := range gen.files {
@@ -423,10 +443,10 @@
}

if !c.acquire(cGroups) {
return nil, int64(len(cGroups))
return nil, int64(len(cGroups)), int64(len(generations))
}

return cGroups, int64(len(cGroups))
return cGroups, int64(len(cGroups)), int64(len(generations))
}
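Callers can use the new generationCount return value to pick a
points-per-block setting. A hypothetical caller-side sketch (the helper
and its wiring are illustrative; this PR's engine code may differ):

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

// pickPointsPerBlock shows how the third return value of PlanOptimize can
// drive block sizing: a single generation signals the fragmented-shard
// edge case, so compaction switches to the aggressive count.
func pickPointsPerBlock(generationCount int64) int {
	if generationCount == 1 {
		return tsdb.AggressiveMaxPointsPerBlock // 100000
	}
	return tsdb.DefaultMaxPointsPerBlock // 1000
}

func main() {
	fmt.Println(pickPointsPerBlock(1)) // 100000
	fmt.Println(pickPointsPerBlock(7)) // 1000
}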

// Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns
Expand Down Expand Up @@ -454,7 +474,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) {
var skip bool

// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
if len(generations) > 2 && group.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !group.hasTombstones() {
if len(generations) > 2 && group.size() > uint64(tsdb.MaxTSMFileSize) && c.FileStore.BlockCount(group.files[0].Path, 1) >= tsdb.DefaultMaxPointsPerBlock && !group.hasTombstones() {
skip = true
}

@@ -530,7 +550,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) {
// Skip the file if it's over the max size and contains a full block or the generation is split
// over multiple files. In the latter case, that would mean the data in the file spilled over
// the 2GB limit.
if g.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock {
if g.size() > uint64(tsdb.MaxTSMFileSize) && c.FileStore.BlockCount(g.files[0].Path, 1) >= tsdb.DefaultMaxPointsPerBlock {
start = i + 1
}

@@ -574,7 +594,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) {
}

// Skip the file if it's over the max size and it contains a full block
if gen.size() >= uint64(maxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() {
if gen.size() >= uint64(tsdb.MaxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) >= tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() {
startIndex++
continue
}
@@ -910,6 +930,10 @@ func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, e

// compact writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) {
// Set the points-per-block size. The larger this value, the more points
// are packed into a single block. Under normal conditions this should
// always be 1000 (DefaultMaxPointsPerBlock), but there is an edge case
// where it is increased.
size := c.Size
if size <= 0 {
size = tsdb.DefaultMaxPointsPerBlock
@@ -1199,7 +1223,7 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *

// If we're over tsdb.MaxTSMFileSize, close out the file
// and return the error.
if w.Size() > maxTSMFileSize {
if w.Size() > tsdb.MaxTSMFileSize {
if err := w.WriteIndex(); err != nil {
return false, err
}
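The rollover rule itself is a simple size check. A toy sketch with a
hypothetical writer type (not tsm1's actual TSMWriter API):

package main

import "fmt"

const maxTSMFileSize = uint32(2048 * 1024 * 1024)

// toyWriter stands in for a TSM writer; only the size bookkeeping matters.
type toyWriter struct{ size uint32 }

// add grows the file by n bytes and reports whether the writer should
// finish its index and roll over to a new TSM file, mirroring the
// w.Size() > tsdb.MaxTSMFileSize check above.
func (w *toyWriter) add(n uint32) bool {
	w.size += n
	return w.size > maxTSMFileSize
}

func main() {
	w := &toyWriter{size: maxTSMFileSize - 10}
	fmt.Println(w.add(4))  // false: still under the 2 GB cap
	fmt.Println(w.add(20)) // true: roll over to a new file
}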