Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: add auto compaction interval option #18472

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type ServerConfig struct {

AutoCompactionRetention time.Duration
AutoCompactionMode string
AutoCompactionInterval time.Duration
CompactionBatchLimit int
CompactionSleepInterval time.Duration
QuotaBackendBytes int64
Expand Down
4 changes: 4 additions & 0 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ type Config struct {
// If no time unit is provided and compaction mode is 'periodic',
// the unit defaults to hour. For example, '5' translates into 5-hour.
AutoCompactionRetention string `json:"auto-compaction-retention"`
// AutoCompactionInterval is the delay between compaction runs.
// If no interval is specified 'periodic' defaults to retention, revision defaults to 5 minutes
AutoCompactionInterval string `json:"auto-compaction-interval"`

// GRPCKeepAliveMinTime is the minimum interval that a client should
// wait before pinging server. When client pings "too fast", server
Expand Down Expand Up @@ -747,6 +750,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {

fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", "0", "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.")
fs.StringVar(&cfg.AutoCompactionInterval, "auto-compaction-interval", "", "Auto compaction interval for mvcc key value store. Default is based on mode selected.")

// pprof profiler via HTTP
fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
Expand Down
29 changes: 29 additions & 0 deletions server/embed/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,35 @@ func TestAutoCompactionModeParse(t *testing.T) {
}
}

func TestAutoCompactionIntervalParse(t *testing.T) {
tests := []struct {
interval string
werr bool
wdur time.Duration
}{
{"", false, 0},
{"1", true, 0},
{"1h", false, time.Hour},
{"1s", false, time.Second},
{"a", true, 0},
{"-1", true, 0},
}

hasErr := func(err error) bool {
return err != nil
}

for i, tt := range tests {
dur, err := parseCompactionInterval(tt.interval)
if hasErr(err) != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
}
if dur != tt.wdur {
t.Errorf("#%d: duration = %s, want %s", i, dur, tt.wdur)
}
}
}

func TestPeerURLsMapAndTokenFromSRV(t *testing.T) {
defer func() { getCluster = srv.GetCluster }()

Expand Down
13 changes: 13 additions & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@
return e, err
}

autoCompactionInterval, err := parseCompactionInterval(cfg.AutoCompactionInterval)
if err != nil {
return e, err

Check warning on line 171 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L171

Added line #L171 was not covered by tests
}

backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)

srvcfg := config.ServerConfig{
Expand All @@ -190,6 +195,7 @@
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
AutoCompactionInterval: autoCompactionInterval,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
Expand Down Expand Up @@ -897,6 +903,13 @@
return l
}

func parseCompactionInterval(interval string) (ret time.Duration, err error) {
if interval == "" {
return ret, nil
}
return time.ParseDuration(interval)
}

func parseCompactionRetention(mode, retention string) (ret time.Duration, err error) {
h, err := strconv.Atoi(retention)
if err == nil && h >= 0 {
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ Clustering:
Auto compaction retention length. 0 means disable auto compaction.
--auto-compaction-mode 'periodic'
Interpret 'auto-compaction-retention' one of: periodic|revision. 'periodic' for duration based retention, defaulting to hours if no time unit is provided (e.g. '5m'). 'revision' for revision number based retention.
--auto-compaction-interval ''
Auto compaction interval. Empty means use default based on mode selected.
--v2-deprecation '` + string(cconfig.V2DeprDefault) + `'
Phase of v2store deprecation. Deprecated and scheduled for removal in v3.8. The default value is enforced, ignoring user input.
Supported values:
Expand Down
5 changes: 3 additions & 2 deletions server/etcdserver/api/v3compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
lg *zap.Logger,
mode string,
retention time.Duration,
interval time.Duration,
rg RevGetter,
c Compactable,
) (Compactor, error) {
Expand All @@ -64,9 +65,9 @@
}
switch mode {
case ModePeriodic:
return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
return newPeriodic(lg, clockwork.NewRealClock(), retention, interval, rg, c), nil
case ModeRevision:
return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
return newRevision(lg, clockwork.NewRealClock(), int64(retention), interval, rg, c), nil

Check warning on line 70 in server/etcdserver/api/v3compactor/compactor.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3compactor/compactor.go#L70

Added line #L70 was not covered by tests
default:
return nil, fmt.Errorf("unsupported compaction mode %s", mode)
}
Expand Down
24 changes: 15 additions & 9 deletions server/etcdserver/api/v3compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (
// Periodic compacts the log by purging revisions older than
// the configured retention time.
type Periodic struct {
lg *zap.Logger
clock clockwork.Clock
period time.Duration
lg *zap.Logger
clock clockwork.Clock
period time.Duration
interval time.Duration

rg RevGetter
c Compactable
Expand All @@ -48,13 +49,14 @@ type Periodic struct {

// newPeriodic creates a new instance of Periodic compactor that purges
// the log older than h Duration.
func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, interval time.Duration, rg RevGetter, c Compactable) *Periodic {
pc := &Periodic{
lg: lg,
clock: clock,
period: h,
rg: rg,
c: c,
lg: lg,
clock: clock,
period: h,
interval: interval,
rg: rg,
c: c,
}
// revs won't be longer than the retentions.
pc.revs = make([]int64, 0, pc.getRetentions())
Expand Down Expand Up @@ -162,11 +164,15 @@ func (pc *Periodic) Run() {
}()
}

// if static interval is provided, compact every x duration.
// if given compaction period x is <1-hour, compact every x duration.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
// if given compaction period x is >1-hour, compact every hour.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
func (pc *Periodic) getCompactInterval() time.Duration {
if pc.interval != 0 {
return pc.interval
}
itv := pc.period
if itv > time.Hour {
itv = time.Hour
Expand Down
63 changes: 59 additions & 4 deletions server/etcdserver/api/v3compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import (
func TestPeriodicHourly(t *testing.T) {
retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour
intervalDuration := time.Duration(0)

fc := clockwork.NewFakeClock()
// TODO: Do not depand or real time (Recorder.Wait) in unit tests.
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)

tb.Run()
defer tb.Stop()
Expand Down Expand Up @@ -82,11 +83,12 @@ func TestPeriodicHourly(t *testing.T) {
func TestPeriodicMinutes(t *testing.T) {
retentionMinutes := 5
retentionDuration := time.Duration(retentionMinutes) * time.Minute
intervalDuration := time.Duration(0)

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)

tb.Run()
defer tb.Stop()
Expand Down Expand Up @@ -129,12 +131,64 @@ func TestPeriodicMinutes(t *testing.T) {
}
}

func TestPeriodicMinutesWithInterval(t *testing.T) {
retentionMinutes := 10
retentionDuration := time.Duration(retentionMinutes) * time.Minute
intervalDuration := 2 * time.Minute

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)

tb.Run()
defer tb.Stop()

// compaction doesn't happen til 10 minutes elapse
for i := 0; i < retentionMinutes; i++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}

// very first compaction
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(1)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}

for i := 0; i < 10; i++ {
// advance 20 minutes, one revision for each minute
for j := 0; j < 20; j++ {
rg.Wait(1)
fc.Advance(1 * time.Minute)
}

// compact
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}

// the expected revision is the current revision minus the retention duration
// since we made a revision every minute
expectedRevision := rg.rev - int64(retentionDuration.Minutes())
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}
}

func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
retentionDuration := time.Hour
intervalDuration := time.Duration(0)
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)

tb.Run()
tb.Pause()
Expand Down Expand Up @@ -177,11 +231,12 @@ func TestPeriodicPause(t *testing.T) {
func TestPeriodicSkipRevNotChange(t *testing.T) {
retentionMinutes := 5
retentionDuration := time.Duration(retentionMinutes) * time.Minute
intervalDuration := time.Duration(0)

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(20 * time.Millisecond)}
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, intervalDuration, rg, compactable)

tb.Run()
defer tb.Stop()
Expand Down
14 changes: 9 additions & 5 deletions server/etcdserver/api/v3compactor/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

clock clockwork.Clock
retention int64
interval time.Duration

rg RevGetter
c Compactable
Expand All @@ -47,20 +48,23 @@

// newRevision creates a new instance of Revisonal compactor that purges
// the log older than retention revisions from the current revision.
func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, interval time.Duration, rg RevGetter, c Compactable) *Revision {
// default revision interval to 5 minutes
if interval == 0 {
interval = time.Minute * 5

Check warning on line 54 in server/etcdserver/api/v3compactor/revision.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3compactor/revision.go#L54

Added line #L54 was not covered by tests
}
rc := &Revision{
lg: lg,
clock: clock,
retention: retention,
interval: interval,
rg: rg,
c: c,
}
rc.ctx, rc.cancel = context.WithCancel(context.Background())
return rc
}

const revInterval = 5 * time.Minute

// Run runs revision-based compactor.
func (rc *Revision) Run() {
prev := int64(0)
Expand All @@ -69,7 +73,7 @@
select {
case <-rc.ctx.Done():
return
case <-rc.clock.After(revInterval):
case <-rc.clock.After(rc.interval):
rc.mu.Lock()
p := rc.paused
rc.mu.Unlock()
Expand Down Expand Up @@ -103,7 +107,7 @@
"failed auto revision compaction",
zap.Int64("revision", rev),
zap.Int64("revision-compaction-retention", rc.retention),
zap.Duration("retry-interval", revInterval),
zap.Duration("retry-interval", rc.interval),

Check warning on line 110 in server/etcdserver/api/v3compactor/revision.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3compactor/revision.go#L110

Added line #L110 was not covered by tests
zap.Error(err),
)
}
Expand Down
Loading
Loading