diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index c426064f68f..9b42b93c190 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math/rand" + "time" "golang.org/x/time/rate" @@ -108,7 +109,7 @@ func (t etcdTraffic) Name() string { return "Etcd" } -func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { +func (t etcdTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { lastOperationSucceeded := true var lastRev int64 var requestType etcdRequestType @@ -155,6 +156,35 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter } } +func (t etcdTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) { + var lastRev int64 = 2 + timer := time.NewTimer(period) + for { + select { + case <-ctx.Done(): + return + case <-finish: + return + case <-timer.C: + } + timer.Reset(period) + statusCtx, cancel := context.WithTimeout(ctx, RequestTimeout) + resp, err := c.Status(statusCtx, c.Endpoints()[0]) + cancel() + if err != nil { + continue + } + + // Range allows for both revision has been compacted and future revision errors + compactRev := random.RandRange(lastRev, resp.Header.Revision+5) + _, err = c.Compact(ctx, compactRev) + if err != nil { + continue + } + lastRev = compactRev + } +} + func filterOutNonUniqueEtcdWrites(choices []random.ChoiceWeight[etcdRequestType]) (resp []random.ChoiceWeight[etcdRequestType]) { for _, choice := range choices { if choice.Choice != Delete && choice.Choice != LeaseRevoke { diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 445f75324e6..ad1d7de4209 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -19,7 +19,9 @@ import ( "errors" "fmt" "math/rand" + "strconv" "sync" + "time" "golang.org/x/sync/errgroup" "golang.org/x/time/rate" @@ -56,7 +58,7 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool { return true } -func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { +func (t kubernetesTraffic) RunTrafficLoop(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { kc := kubernetes.Client{Client: &clientv3.Client{KV: c}} s := newStorage() keyPrefix := "/registry/" + t.resource + "/" @@ -205,6 +207,62 @@ func (t kubernetesTraffic) generateKey() string { return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5)) } +func (t kubernetesTraffic) RunCompactLoop(ctx context.Context, c *client.RecordingClient, interval time.Duration, finish <-chan struct{}) { + // Based on https://github.com/kubernetes/apiserver/blob/7dd4904f1896e11244ba3c5a59797697709de6b6/pkg/storage/etcd3/compact.go#L112-L127 + var compactTime int64 + var rev int64 + var err error + for { + select { + case <-time.After(interval): + case <-ctx.Done(): + return + case <-finish: + return + } + + compactTime, rev, err = compact(ctx, c, compactTime, rev) + if err != nil { + continue + } + } +} + +// Based on https://github.com/kubernetes/apiserver/blob/7dd4904f1896e11244ba3c5a59797697709de6b6/pkg/storage/etcd3/compact.go#L30 +const ( + compactRevKey = "compact_rev_key" +) + +func compact(ctx context.Context, client *client.RecordingClient, t, rev int64) (int64, int64, error) { + // Based on https://github.com/kubernetes/apiserver/blob/7dd4904f1896e11244ba3c5a59797697709de6b6/pkg/storage/etcd3/compact.go#L133-L162 + // TODO: Use Version and not ModRevision when model supports key versioning. + resp, err := client.Txn(ctx). + If(clientv3.Compare(clientv3.ModRevision(compactRevKey), "=", t)). + Then(clientv3.OpPut(compactRevKey, strconv.FormatInt(rev, 10))). + Else(clientv3.OpGet(compactRevKey)). + Commit() + if err != nil { + return t, rev, err + } + + curRev := resp.Header.Revision + + if !resp.Succeeded { + // TODO: Use Version and not ModRevision when model supports key versioning. + curTime := resp.Responses[0].GetResponseRange().Kvs[0].ModRevision + return curTime, curRev, nil + } + curTime := t + 1 + + if rev == 0 { + return curTime, curRev, nil + } + if _, err = client.Compact(ctx, rev); err != nil { + return curTime, curRev, err + } + return curTime, curRev, nil +} + type KubernetesRequestType string const ( diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 91f13070546..d86663dfffb 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -28,7 +28,6 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/model" - "go.etcd.io/etcd/tests/v3/robustness/random" "go.etcd.io/etcd/tests/v3/robustness/report" ) @@ -81,7 +80,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 defer wg.Done() defer c.Close() - traffic.Run(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish) + traffic.RunTrafficLoop(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish) mux.Lock() reports = append(reports, c.Report()) mux.Unlock() @@ -101,7 +100,8 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 if profile.CompactPeriod != time.Duration(0) { compactionPeriod = profile.CompactPeriod } - RunCompactLoop(ctx, c, compactionPeriod, finish) + + traffic.RunCompactLoop(ctx, c, compactionPeriod, finish) mux.Lock() reports = append(reports, c.Report()) mux.Unlock() @@ -195,35 +195,7 @@ func (p Profile) WithCompactionPeriod(cp time.Duration) Profile { } type Traffic interface { - Run(ctx context.Context, c *client.RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) + RunTrafficLoop(ctx context.Context, c *client.RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) + RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) ExpectUniqueRevision() bool } - -func RunCompactLoop(ctx context.Context, c *client.RecordingClient, period time.Duration, finish <-chan struct{}) { - var lastRev int64 = 2 - timer := time.NewTimer(period) - for { - timer.Reset(period) - select { - case <-ctx.Done(): - return - case <-finish: - return - case <-timer.C: - } - statusCtx, cancel := context.WithTimeout(ctx, RequestTimeout) - resp, err := c.Status(statusCtx, c.Endpoints()[0]) - cancel() - if err != nil { - continue - } - - // Range allows for both revision has been compacted and future revision errors - compactRev := random.RandRange(lastRev, resp.Header.Revision+5) - _, err = c.Compact(ctx, compactRev) - if err != nil { - continue - } - lastRev = compactRev - } -}