Refactor/eliminating traffic on root partition #6167

Open · wants to merge 4 commits into base: master
40 changes: 26 additions & 14 deletions client/matching/loadbalancer.go
@@ -60,9 +60,10 @@ type (
 	}

 	defaultLoadBalancer struct {
-		nReadPartitions  dynamicconfig.IntPropertyFnWithTaskListInfoFilters
-		nWritePartitions dynamicconfig.IntPropertyFnWithTaskListInfoFilters
-		domainIDToName   func(string) (string, error)
+		nReadPartitions    dynamicconfig.IntPropertyFnWithTaskListInfoFilters
+		nWritePartitions   dynamicconfig.IntPropertyFnWithTaskListInfoFilters
+		domainIDToName     func(string) (string, error)
+		maxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskListInfoFilters
 	}
 )

@@ -73,9 +74,10 @@ func NewLoadBalancer(
 	dc *dynamicconfig.Collection,
 ) LoadBalancer {
 	return &defaultLoadBalancer{
-		domainIDToName:   domainIDToName,
-		nReadPartitions:  dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingNumTasklistReadPartitions),
-		nWritePartitions: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingNumTasklistWritePartitions),
+		domainIDToName:     domainIDToName,
+		nReadPartitions:    dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingNumTasklistReadPartitions),
+		nWritePartitions:   dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingNumTasklistWritePartitions),
+		maxChildrenPerNode: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingForwarderMaxChildrenPerNode),
 	}
 }

@@ -90,13 +92,13 @@ func (lb *defaultLoadBalancer) PickWritePartition(
 		return taskList.GetName()
 	}
 	nPartitions := lb.nWritePartitions(domainName, taskList.GetName(), taskListType)
+	maxChildrenPerNode := lb.maxChildrenPerNode(domainName, taskList.GetName(), taskListType)

 	// checks to make sure number of writes never exceeds number of reads
 	if nRead := lb.nReadPartitions(domainName, taskList.GetName(), taskListType); nPartitions > nRead {
 		nPartitions = nRead
 	}
-	return lb.pickPartition(taskList, forwardedFrom, nPartitions)
+
+	return lb.pickPartition(taskList, forwardedFrom, nPartitions, maxChildrenPerNode)
 }

@@ -110,14 +112,16 @@ func (lb *defaultLoadBalancer) PickReadPartition(
 		return taskList.GetName()
 	}
 	n := lb.nReadPartitions(domainName, taskList.GetName(), taskListType)
-	return lb.pickPartition(taskList, forwardedFrom, n)
+	maxChildrenPerNode := lb.maxChildrenPerNode(domainName, taskList.GetName(), taskListType)
+
+	return lb.pickPartition(taskList, forwardedFrom, n, maxChildrenPerNode)
 }

 func (lb *defaultLoadBalancer) pickPartition(
 	taskList types.TaskList,
 	forwardedFrom string,
 	nPartitions int,
+	maxChildrenPerNode int,
 ) string {

 	if forwardedFrom != "" || taskList.GetKind() == types.TaskListKindSticky {
@@ -129,14 +133,22 @@ func (lb *defaultLoadBalancer) pickPartition(
 		return taskList.GetName()
 	}

-	if nPartitions <= 0 {
-		return taskList.GetName()
-	}
-
-	p := rand.Intn(nPartitions)
+	p := lb.generateRandomPartitionID(nPartitions, maxChildrenPerNode)
 	if p == 0 {
 		return taskList.GetName()
 	}

 	return fmt.Sprintf("%v%v/%v", common.ReservedTaskListPrefix, taskList.GetName(), p)
 }
+
+func (lb *defaultLoadBalancer) generateRandomPartitionID(nPartitions int, maxChildrenPerNode int) int {
+	if nPartitions <= 0 {
+		return 0
+	}
+	if nPartitions == 1 {
+		return 1
+	}
+	largestParent := int(nPartitions / maxChildrenPerNode)
+	p := rand.Intn(nPartitions - largestParent - 1)
+	return p + largestParent + 1
+}
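
Reviewer note: a minimal, standalone sketch of the new selection logic (the cutoff arithmetic is copied from the diff above; the main harness and its printed check are illustrative assumptions, not part of the PR):

	package main

	import (
		"fmt"
		"math/rand"
	)

	// Mirrors generateRandomPartitionID above: partitions [0, largestParent] act as
	// forwarding parents in the tasklist tree, so direct traffic only lands on the
	// leaf partitions [largestParent+1, nPartitions-1], and the root (partition 0)
	// sees forwarded tasks only.
	func generateRandomPartitionID(nPartitions, maxChildrenPerNode int) int {
		if nPartitions <= 0 {
			return 0 // degenerate config: fall back to the root partition
		}
		if nPartitions == 1 {
			return 1
		}
		largestParent := nPartitions / maxChildrenPerNode
		return rand.Intn(nPartitions-largestParent-1) + largestParent + 1
	}

	func main() {
		// With 30 partitions and 20 children per node, partitions 0 and 1 are
		// parents, so every pick falls in 2..29.
		seen := map[int]bool{}
		for i := 0; i < 10000; i++ {
			seen[generateRandomPartitionID(30, 20)] = true
		}
		fmt.Println(len(seen), seen[0], seen[1]) // expect: 28 false false
	}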
89 changes: 82 additions & 7 deletions client/matching/loadbalancer_test.go
@@ -32,6 +32,13 @@ import (
 	"github.com/uber/cadence/common/types"
 )

+func TestConstructor(t *testing.T) {
+	assert.NotPanics(t, func() {
+		lb := NewLoadBalancer(func(string) (string, error) { return "", nil }, dynamicconfig.NewNopCollection())
+		assert.NotNil(t, lb)
+	})
+}
+
 func Test_defaultLoadBalancer_PickReadPartition(t *testing.T) {
 	type fields struct {
 		nReadPartitions  dynamicconfig.IntPropertyFnWithTaskListInfoFilters
@@ -72,9 +79,10 @@ func Test_defaultLoadBalancer_PickReadPartition(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			lb := &defaultLoadBalancer{
-				nReadPartitions:  tt.fields.nReadPartitions,
-				nWritePartitions: tt.fields.nWritePartitions,
-				domainIDToName:   tt.fields.domainIDToName,
+				nReadPartitions:    tt.fields.nReadPartitions,
+				nWritePartitions:   tt.fields.nWritePartitions,
+				domainIDToName:     tt.fields.domainIDToName,
+				maxChildrenPerNode: func(domain string, taskList string, taskType int) int { return 20 },
 			}
 			for i := 0; i < 100; i++ {
 				got := lb.PickReadPartition(tt.args.domainID, tt.args.taskList, tt.args.taskListType, tt.args.forwardedFrom)
@@ -154,9 +162,10 @@ func Test_defaultLoadBalancer_PickWritePartition(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			lb := &defaultLoadBalancer{
-				nReadPartitions:  tt.fields.nReadPartitions,
-				nWritePartitions: tt.fields.nWritePartitions,
-				domainIDToName:   tt.fields.domainIDToName,
+				nReadPartitions:    tt.fields.nReadPartitions,
+				nWritePartitions:   tt.fields.nWritePartitions,
+				domainIDToName:     tt.fields.domainIDToName,
+				maxChildrenPerNode: func(domain string, taskList string, taskType int) int { return 20 },
 			}
 			for i := 0; i < 100; i++ {
 				got := lb.PickWritePartition(tt.args.domainID, tt.args.taskList, tt.args.taskListType, tt.args.forwardedFrom)
@@ -224,6 +233,18 @@ func Test_defaultLoadBalancer_pickPartition(t *testing.T) {
 			},
 			want: common.ReservedTaskListPrefix + "taskList3",
 		},
+		// This behaviour will be preserved, but the root partition is being removed
+		// as an expected value for partitioning: the partition picker *should never
+		// send traffic to the root partition directly*, so 0 is an invalid value here.
+		// All traffic should go to non-root partitions, and be forwarded to the root
+		// partition only when an immediate match cannot be found.
+		//
+		// The reason for this is that direct root traffic adds needless complexity to
+		// partitioned tasklists: it is hard to follow what is going on when 1/n of the
+		// traffic lands on the root directly while the rest is forwarded, so spotting
+		// misconfigurations due to overpartitioning is harder, and the extra load on
+		// the root partition risks making it a heavy contention point when there are
+		// too many partitions for the number of pollers (common with zonal isolation).
 		{
 			name:   "Test: nPartitions <= 0",
 			fields: fields{},
@@ -246,8 +267,62 @@
 				nWritePartitions: tt.fields.nWritePartitions,
 				domainIDToName:   tt.fields.domainIDToName,
 			}
-			got := lb.pickPartition(tt.args.taskList, tt.args.forwardedFrom, tt.args.nPartitions)
+			got := lb.pickPartition(tt.args.taskList, tt.args.forwardedFrom, tt.args.nPartitions, 20)
 			assert.Equal(t, tt.want, got)
 		})
 	}
 }

+func TestGeneratingAPartitionCount(t *testing.T) {
+
+	tests := map[string]struct {
+		numberOfPartitions       int
+		expectedResultLowerBound int
+		expectedResultUpperBound int
+	}{
+		"valid partition count = 5, expected values are between 1 and 4": {
+			numberOfPartitions:       5,
+			expectedResultLowerBound: 1,
+			expectedResultUpperBound: 4,
+		},
+		"min sane partition count = 2, expected values are between 1 and 2": {
+			numberOfPartitions:       2,
+			expectedResultLowerBound: 1,
+			expectedResultUpperBound: 2,
+		},
+		"large partition count": {
+			numberOfPartitions:       30,
+			expectedResultLowerBound: 1,
+			expectedResultUpperBound: 29,
+		},
+		"weird, probably a mistake, partition count 1, just ensure that everything is sent to root": {
+			numberOfPartitions:       1,
+			expectedResultLowerBound: 0,
+			expectedResultUpperBound: 1,
+		},
+		"weird, probably a mistake, partition count 0, just ensure that everything is sent to root": {
+			numberOfPartitions:       0,
+			expectedResultLowerBound: 0,
+			expectedResultUpperBound: 0,
+		},
+		"invalid partition count, just ensure that everything is sent to root": {
+			numberOfPartitions:       -1,
+			expectedResultLowerBound: 0,
+			expectedResultUpperBound: 0,
+		},
+	}
+
+	for name, td := range tests {
+		t.Run(name, func(t *testing.T) {
+			for i := 0; i < 100; i++ {
+				lb := defaultLoadBalancer{}
+
+				p := lb.generateRandomPartitionID(td.numberOfPartitions, 20)
+
+				assert.LessOrEqual(t, p, td.expectedResultUpperBound)
+				assert.GreaterOrEqual(t, p, td.expectedResultLowerBound)
+			}
+		})
+	}
+}
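
Reviewer note: for the "large partition count" case above, the bounds follow directly from the new helper: largestParent = 30/20 = 1, so generateRandomPartitionID returns rand.Intn(30-1-1) + 2, i.e. a value in [2, 29], which sits inside the asserted [1, 29] range. The table can be checked locally with the standard test runner, e.g. go test ./client/matching/ -run TestGeneratingAPartitionCount -count=1 (the package path is assumed from the diff headers).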
19 changes: 19 additions & 0 deletions config/dynamicconfig/development.yaml
@@ -15,6 +15,25 @@ history.enableStrongIdempotency:
     constraints: {}
 frontend.globalRatelimiterMode:
 - value: local
+
+matching.numTasklistWritePartitions:
+- value: 10
+matching.numTasklistReadPartitions:
+- value: 10
+
+system.allIsolationGroups:
+- value:
+  - "zone1"
+  - "zone2"
+  - "zone3"
+  - "zone4"
+  - "zone5"
+  - "zone6"
+  - "zone7"
+
+system.enableTasklistIsolation:
+- value: true
+
 frontend.validSearchAttributes:
 - value:
     BinaryChecksums: 1
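
Reviewer note: assuming matching.forwarderMaxChildrenPerNode is left at 20 (the value hard-coded in the tests above; not set in this file), these development values give largestParent = 10/20 = 0, so reads and writes spread across partitions 1..9 while the root partition 0 receives only forwarded traffic, and the seven isolation groups exercise the zonal-isolation path this PR is concerned with.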