Skip to content

Commit

Permalink
Run Member tests in parallel
Browse files Browse the repository at this point in the history
Introduce port allocator and remove unused MemberNumber.
Add UniquePortAlloc config field so that default behavior will stay
unchanged.

On my local machine it brings down execution time from 5m to 32s.

Issue: etcd-io#18983
Change-Id: Ibd49dafecaaab85966934ed9d314d23912fbac32
Signed-off-by: Aleksander Mistewicz <[email protected]>
  • Loading branch information
AwesomePatrol committed Jan 3, 2025
1 parent fce823a commit 6d941b9
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 12 deletions.
9 changes: 9 additions & 0 deletions tests/common/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ func TestMemberList(t *testing.T) {

for _, tc := range clusterTestCases() {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tc.config.UniquePortAlloc = true
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(tc.config))
defer clus.Close()
cc := testutils.MustClient(clus.Client())
Expand Down Expand Up @@ -113,6 +116,8 @@ func TestMemberAdd(t *testing.T) {
for _, quorumTc := range quorumTcs {
for _, clusterTc := range clusterTestCases() {
t.Run(learnerTc.name+"/"+quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
t.Parallel()

ctxTimeout := 10 * time.Second
if quorumTc.waitForQuorum {
ctxTimeout += etcdserver.HealthInterval
Expand All @@ -121,6 +126,7 @@ func TestMemberAdd(t *testing.T) {
defer cancel()
c := clusterTc.config
c.StrictReconfigCheck = quorumTc.strictReconfigCheck
c.UniquePortAlloc = true
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c))
defer clus.Close()
cc := testutils.MustClient(clus.Client())
Expand Down Expand Up @@ -198,10 +204,13 @@ func TestMemberRemove(t *testing.T) {
continue
}
t.Run(quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second)
defer cancel()
c := clusterTc.config
c.StrictReconfigCheck = quorumTc.strictReconfigCheck
c.UniquePortAlloc = true
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c))
defer clus.Close()
// client connects to a specific member which won't be removed from cluster
Expand Down
5 changes: 5 additions & 0 deletions tests/framework/config/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type ClusterConfig struct {
StrictReconfigCheck bool
AuthToken string
SnapshotCount uint64
UniquePortAlloc bool

// ClusterContext is used by "e2e" or "integration" to extend the
// ClusterConfig. The common test cases shouldn't care about what
Expand Down Expand Up @@ -88,3 +89,7 @@ func WithSnapshotCount(count uint64) ClusterOption {
func WithStrictReconfigCheck(strict bool) ClusterOption {
return func(c *ClusterConfig) { c.StrictReconfigCheck = strict }
}

func WithUniquePortAlloc() ClusterOption {
return func(c *ClusterConfig) { c.UniquePortAlloc = true }
}
14 changes: 13 additions & 1 deletion tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
peer2Port := port + 3
clientHTTPPort := port + 4

var allocatedPorts []int
if cfg.BasePort == -1 {
clientPort = uniquePorts.Alloc()
peerPort = uniquePorts.Alloc()
metricsPort = uniquePorts.Alloc()
peer2Port = uniquePorts.Alloc()
clientHTTPPort = uniquePorts.Alloc()
allocatedPorts = []int{clientPort, peerPort, metricsPort, peer2Port, clientHTTPPort}
}

if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)}
Expand Down Expand Up @@ -639,7 +649,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
}
var gofailPort int
if cfg.GoFailEnabled {
gofailPort = (i+1)*10000 + 2381
gofailPort = uniquePorts.Alloc()
allocatedPorts = append(allocatedPorts, gofailPort)
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
}

Expand All @@ -662,6 +673,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
GoFailClientTimeout: cfg.GoFailClientTimeout,
Proxy: proxyCfg,
LazyFSEnabled: cfg.LazyFSEnabled,
AllocatedPorts: allocatedPorts,
}
}

Expand Down
3 changes: 3 additions & 0 deletions tests/framework/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, opts ...config.
default:
t.Fatalf("PeerTLS config %q not supported", cfg.PeerTLS)
}
if cfg.UniquePortAlloc {
e2eConfig.BasePort = -1
}
epc, err := NewEtcdProcessCluster(ctx, t, WithConfig(e2eConfig))
if err != nil {
t.Fatalf("could not start etcd integrationCluster: %s", err)
Expand Down
14 changes: 10 additions & 4 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ type EtcdServerProcessConfig struct {

Name string

PeerURL url.URL
ClientURL string
ClientHTTPURL string
MetricsURL string
PeerURL url.URL
ClientURL string
ClientHTTPURL string
MetricsURL string
AllocatedPorts []int

InitialToken string
InitialCluster string
Expand Down Expand Up @@ -248,6 +249,11 @@ func (ep *EtcdServerProcess) Close() error {
ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath))
return os.RemoveAll(ep.cfg.DataDirPath)
}

for _, port := range ep.cfg.AllocatedPorts {
uniquePorts.Free(port)
}
ep.cfg.AllocatedPorts = nil
return nil
}

Expand Down
58 changes: 58 additions & 0 deletions tests/framework/e2e/port_alloc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import "sync"

// uniquePorts is a global instance of testPorts.
var uniquePorts *testPorts

func init() {
uniquePorts = newTestPorts(11000, 19000)
}

// testPorts is used to allocate listen ports for etcd instance in tests
// in a safe way for concurrent use (i.e. running tests in parallel).
type testPorts struct {
mux sync.Mutex
unused map[int]bool
}

// newTestPorts keeps track of unused ports in the specified range.
func newTestPorts(start, end int) *testPorts {
m := make(map[int]bool, end-start)
for i := start; i < end; i++ {
m[i] = true
}
return &testPorts{unused: m}
}

// Alloc allocates a new port or panics if none is available.
func (pa *testPorts) Alloc() int {
pa.mux.Lock()
defer pa.mux.Unlock()
for port := range pa.unused {
delete(pa.unused, port)
return port
}
panic("all ports are used")
}

// Free makes port available for allocation through Alloc.
func (pa *testPorts) Free(port int) {
pa.mux.Lock()
defer pa.mux.Unlock()
pa.unused[port] = true
}
10 changes: 3 additions & 7 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,12 @@ func (c *Cluster) ProtoMembers() []*pb.Member {
}

func (c *Cluster) mustNewMember(t testutil.TB) *Member {
memberNumber := c.LastMemberNum
uniqueNumber := atomic.AddInt32(&UniqueNumber, 1)*10 + int32(c.LastMemberNum)
c.LastMemberNum++

m := MustNewMember(t,
MemberConfig{
Name: fmt.Sprintf("m%v", memberNumber),
MemberNumber: memberNumber,
Name: fmt.Sprintf("m%v", uniqueNumber),
AuthToken: c.Cfg.AuthToken,
PeerTLS: c.Cfg.PeerTLS,
ClientTLS: c.Cfg.ClientTLS,
Expand Down Expand Up @@ -549,7 +548,6 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
type Member struct {
config.ServerConfig
UniqNumber int
MemberNumber int
Port string
PeerListeners, ClientListeners []net.Listener
GRPCListener net.Listener
Expand Down Expand Up @@ -591,7 +589,6 @@ type Member struct {
type MemberConfig struct {
Name string
UniqNumber int64
MemberNumber int
PeerTLS *transport.TLSInfo
ClientTLS *transport.TLSInfo
AuthToken string
Expand Down Expand Up @@ -624,8 +621,7 @@ type MemberConfig struct {
func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
var err error
m := &Member{
MemberNumber: mcfg.MemberNumber,
UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
}

peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS)
Expand Down

0 comments on commit 6d941b9

Please sign in to comment.