Skip to content

Commit

Permalink
Added group capacity overflow test case & fix metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
SviatoslavBoichuk committed Nov 20, 2024
1 parent dd01a13 commit cac3c9a
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 11 deletions.
15 changes: 12 additions & 3 deletions src/cgw_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,8 @@ pub enum CGWMetricsCounterType {

pub enum CGWMetricsCounterOpType {
Inc,
#[allow(dead_code)]
IncBy(i64),
Dec,
#[allow(dead_code)]
DecBy(i64),
Set(i64),
}
Expand Down Expand Up @@ -244,14 +242,25 @@ impl CGWMetrics {
CGWMetricsCounterOpType::Dec => {
counter.dec();
}
CGWMetricsCounterOpType::IncBy(inc_val) => {
counter.add(inc_val);
}
CGWMetricsCounterOpType::DecBy(dec_val) => {
counter.sub(dec_val);
}
_ => {}
}
} else if let Ok(counter) = IntGauge::new(
format!("cgw_group_{group_id}_infras_assigned_num"),
"Number of infras assigned to this particular group",
) {
if REGISTRY.register(Box::new(counter.clone())).is_ok() {
counter.set(1);
match op {
CGWMetricsCounterOpType::Inc => counter.set(1),
CGWMetricsCounterOpType::IncBy(set_val)
| CGWMetricsCounterOpType::Set(set_val) => counter.set(set_val),
_ => counter.set(0),
}
lock.insert(group_id, counter);
} else {
error!("Failed to register GroupInfrasAssignedNum metric for GID {group_id}!");
Expand Down
17 changes: 11 additions & 6 deletions src/cgw_remote_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ impl CGWRemoteDiscovery {

CGWMetrics::get_ref().change_counter(
CGWMetricsCounterType::GroupsThreshold,
CGWMetricsCounterOpType::Set(app_args.cgw_groups_capacity.into()),
CGWMetricsCounterOpType::Set(app_args.cgw_groups_threshold.into()),
);

let redis_req_data: Vec<String> = redisdb_shard_info.into();
Expand Down Expand Up @@ -601,7 +601,9 @@ impl CGWRemoteDiscovery {
gid: i32,
incremet_value: i32,
) -> Result<()> {
debug!("Incrementing assigned infras num group_id_{gid}");
debug!(
"Incrementing assigned infras num group_id_{gid}: increment value: {incremet_value}"
);

let mut con = self.redis_client.clone();
let res: RedisResult<()> = redis::cmd("HINCRBY")
Expand All @@ -620,11 +622,12 @@ impl CGWRemoteDiscovery {
));
}

debug!("Incrementing assigned infras num group_id_{gid}: increment value: {incremet_value} - metrics");
CGWMetrics::get_ref()
.change_group_counter(
gid,
CGWMetricsCounterType::GroupInfrasAssignedNum,
CGWMetricsCounterOpType::Inc,
CGWMetricsCounterOpType::IncBy(incremet_value as i64),
)
.await;

Expand All @@ -636,7 +639,9 @@ impl CGWRemoteDiscovery {
gid: i32,
decremet_value: i32,
) -> Result<()> {
debug!("Decrementing assigned infras num group_id_{gid}");
debug!(
"Decrementing assigned infras num group_id_{gid}: decrement_value: {decremet_value}"
);

let mut con = self.redis_client.clone();
let res: RedisResult<()> = redis::cmd("HINCRBY")
Expand All @@ -659,7 +664,7 @@ impl CGWRemoteDiscovery {
.change_group_counter(
gid,
CGWMetricsCounterType::GroupInfrasAssignedNum,
CGWMetricsCounterOpType::Dec,
CGWMetricsCounterOpType::DecBy(decremet_value as i64),
)
.await;

Expand Down Expand Up @@ -1005,7 +1010,7 @@ impl CGWRemoteDiscovery {
.increment_group_assigned_infras_num(gid, assigned_infras_num)
.await
{
error!("create_ifras_list: failed to decrement assigned infras num! Error: {e}");
error!("create_ifras_list: failed to increment assigned infras num! Error: {e}");
}

if !failed_infras.is_empty() {
Expand Down
28 changes: 28 additions & 0 deletions tests/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,31 @@ def cgw_metrics_get_group_infras_assigned_num(group_id: int) -> int:
print(f"Group {group_id} infras assigned num not found.")

return group_infras_assigned_num


def cgw_metrics_get_groups_capacity() -> int:
groups_capacity = 0
metrics = cgw_metric_get()

match = re.search(r"cgw_groups_capacity (\d+)", metrics)
if match:
groups_capacity = int(match.group(1))
print(f"Groups capacity: {groups_capacity}")
else:
print("Groups capacity.")

return groups_capacity


def cgw_metrics_get_groups_threshold() -> int:
groups_threshold = 0
metrics = cgw_metric_get()

match = re.search(r"cgw_groups_threshold (\d+)", metrics)
if match:
groups_threshold = int(match.group(1))
print(f"Groups assigned num: {groups_threshold}")
else:
print("Groups assigned num not found.")

return groups_threshold
169 changes: 167 additions & 2 deletions tests/test_cgw_infra_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import random

from metrics import cgw_metrics_get_active_shards_num, \
cgw_metrics_get_groups_assigned_num

cgw_metrics_get_groups_assigned_num, \
cgw_metrics_get_groups_capacity, \
cgw_metrics_get_groups_threshold

class TestCgwInfraGroup:
@pytest.mark.usefixtures("test_context",
Expand All @@ -18,6 +19,7 @@ def test_single_infra_group_add_del(self, test_context):
f'Cannot create default group: kafka consumer is not connected to Kafka'

assert cgw_metrics_get_active_shards_num() == 1
assert cgw_metrics_get_groups_assigned_num() == 0

uuid_val = uuid.uuid4()
group_id = 100
Expand Down Expand Up @@ -70,6 +72,7 @@ def test_multiple_infra_group_add_del(self, test_context):
f'Cannot create default group: kafka consumer is not connected to Kafka'

assert cgw_metrics_get_active_shards_num() == 1
assert cgw_metrics_get_groups_assigned_num() == 0

groups_num = random.randint(1, 10)

Expand All @@ -94,6 +97,9 @@ def test_multiple_infra_group_add_del(self, test_context):

assert cgw_metrics_get_groups_assigned_num() == (group + 1)

# Make sure assigned groups number from CGW side is expected
assert cgw_metrics_get_groups_assigned_num() == groups_num

for group in range(0, groups_num):
# Delete single group
uuid_val = uuid.uuid4()
Expand All @@ -114,6 +120,9 @@ def test_multiple_infra_group_add_del(self, test_context):
raise Exception('Infra group creation failed!')

assert cgw_metrics_get_groups_assigned_num() == (groups_num - (group + 1))

# Make sure after clean-up assigned group num is zero
assert cgw_metrics_get_groups_assigned_num() == 0


@pytest.mark.usefixtures("test_context",
Expand All @@ -127,6 +136,7 @@ def test_create_existing_infra_group(self, test_context):
f'Cannot create default group: kafka consumer is not connected to Kafka'

assert cgw_metrics_get_active_shards_num() == 1
assert cgw_metrics_get_groups_assigned_num() == 0

uuid_val = uuid.uuid4()
group_id = 100
Expand Down Expand Up @@ -196,6 +206,7 @@ def test_remove_not_existing_infra_group(self, test_context):
f'Cannot create default group: kafka consumer is not connected to Kafka'

assert cgw_metrics_get_active_shards_num() == 1
assert cgw_metrics_get_groups_assigned_num() == 0

group_id = 100
uuid_val = uuid.uuid4()
Expand Down Expand Up @@ -227,6 +238,7 @@ def test_single_infra_group_add_del_to_shard(self, test_context):
f'Cannot create default group: kafka consumer is not connected to Kafka'

assert cgw_metrics_get_active_shards_num() == 1
assert cgw_metrics_get_groups_assigned_num() == 0

uuid_val = uuid.uuid4()
group_id = 100
Expand Down Expand Up @@ -269,6 +281,71 @@ def test_single_infra_group_add_del_to_shard(self, test_context):
assert cgw_metrics_get_groups_assigned_num() == 0


@pytest.mark.usefixtures("test_context",
"cgw_probe",
"kafka_probe")
def test_multiple_infra_group_add_del_to_shard(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'

assert cgw_metrics_get_active_shards_num() == 1
assert cgw_metrics_get_groups_assigned_num() == 0

shard_id = 0
groups_num = random.randint(1, 10)

for group in range(0, groups_num):
uuid_val = uuid.uuid4()
group_id = (100 + group)

# Create single group
test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), shard_id, uuid_val.int)
ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int)
if not ret_msg:
print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply')
raise Exception('Failed to receive create group result when expected')

assert (ret_msg.value['type'] == 'infrastructure_group_create_response')
assert (int(ret_msg.value['infra_group_id']) == group_id)
assert ((uuid.UUID(ret_msg.value['uuid']).int) == (uuid_val.int))

if ret_msg.value['success'] is False:
print(ret_msg.value['error_message'])
raise Exception('Infra group creation failed!')

assert cgw_metrics_get_groups_assigned_num() == (group + 1)

# Make sure assigned groups number from CGW side is expected
assert cgw_metrics_get_groups_assigned_num() == groups_num

for group in range(0, groups_num):
# Delete single group
uuid_val = uuid.uuid4()
group_id = (100 + group)

test_context.kafka_producer.handle_single_group_delete(str(group_id), uuid_val.int)
ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int)
if not ret_msg:
print('Failed to receive delete group result, was expecting ' + str(uuid_val.int) + ' uuid reply')
raise Exception('Failed to receive delete group result when expected')

assert (ret_msg.value['type'] == 'infrastructure_group_delete_response')
assert (int(ret_msg.value['infra_group_id']) == group_id)
assert ((uuid.UUID(ret_msg.value['uuid']).int) == (uuid_val.int))

if ret_msg.value['success'] is False:
print(ret_msg.value['error_message'])
raise Exception('Infra group creation failed!')

assert cgw_metrics_get_groups_assigned_num() == (groups_num - (group + 1))

# Make sure after clean-up assigned group num is zero
assert cgw_metrics_get_groups_assigned_num() == 0


@pytest.mark.usefixtures("test_context",
"cgw_probe",
"kafka_probe")
Expand All @@ -280,6 +357,7 @@ def test_single_infra_group_add_to_not_existing_shard(self, test_context):
f'Cannot create default group: kafka consumer is not connected to Kafka'

assert cgw_metrics_get_active_shards_num() == 1
assert cgw_metrics_get_groups_assigned_num() == 0

uuid_val = uuid.uuid4()
group_id = 100
Expand All @@ -301,3 +379,90 @@ def test_single_infra_group_add_to_not_existing_shard(self, test_context):
raise Exception('Infra group creation completed, while expected to be failed!')

assert cgw_metrics_get_groups_assigned_num() == 0


@pytest.mark.usefixtures("test_context",
"cgw_probe",
"kafka_probe")
def test_infra_group_capacity_overflow(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'

assert cgw_metrics_get_active_shards_num() == 1
assert cgw_metrics_get_groups_assigned_num() == 0

groups_capacity = cgw_metrics_get_groups_capacity()
groups_threshold = cgw_metrics_get_groups_threshold()

groups_num = (groups_capacity + groups_threshold)
# Create maximum allowed groups number
for group in range(0, groups_num):
uuid_val = uuid.uuid4()
group_id = (100 + group)

# Create single group
test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int)
ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int)
if not ret_msg:
print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply')
raise Exception('Failed to receive create group result when expected')

assert (ret_msg.value['type'] == 'infrastructure_group_create_response')
assert (int(ret_msg.value['infra_group_id']) == group_id)
assert ((uuid.UUID(ret_msg.value['uuid']).int) == (uuid_val.int))

if ret_msg.value['success'] is False:
print(ret_msg.value['error_message'])
raise Exception('Infra group creation failed!')

assert cgw_metrics_get_groups_assigned_num() == (group + 1)

# Make sure we reach MAX groups number assigned to CGW
assert cgw_metrics_get_groups_assigned_num() == groups_num

# Try to create additional group to simulate group capacity overflow
group_to_fail_id = 2024
uuid_val = uuid.uuid4()
test_context.kafka_producer.handle_single_group_create(str(group_to_fail_id), uuid_val.int)
ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int)
if not ret_msg:
print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply')
raise Exception('Failed to receive create group result when expected')

assert (ret_msg.value['type'] == 'infrastructure_group_create_response')
assert (int(ret_msg.value['infra_group_id']) == group_to_fail_id)
assert ((uuid.UUID(ret_msg.value['uuid']).int) == (uuid_val.int))

if ret_msg.value['success'] is True:
print(ret_msg.value['error_message'])
raise Exception('Infra group creation completed, while expected to be failed due to capacity overflow!')

# Double check groups number assigned to CGW
assert cgw_metrics_get_groups_assigned_num() == groups_num

# Cleanup all the rest groups
for group in range(0, groups_num):
# Delete single group
uuid_val = uuid.uuid4()
group_id = (100 + group)

test_context.kafka_producer.handle_single_group_delete(str(group_id), uuid_val.int)
ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int)
if not ret_msg:
print('Failed to receive delete group result, was expecting ' + str(uuid_val.int) + ' uuid reply')
raise Exception('Failed to receive delete group result when expected')

assert (ret_msg.value['type'] == 'infrastructure_group_delete_response')
assert (int(ret_msg.value['infra_group_id']) == group_id)
assert ((uuid.UUID(ret_msg.value['uuid']).int) == (uuid_val.int))

if ret_msg.value['success'] is False:
print(ret_msg.value['error_message'])
raise Exception('Infra group creation failed!')

assert cgw_metrics_get_groups_assigned_num() == (groups_num - (group + 1))

assert cgw_metrics_get_groups_assigned_num() == 0

0 comments on commit cac3c9a

Please sign in to comment.