Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev basic msg relay test #105

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,10 @@ impl CGWConnectionServer {
debug!("Received {num_of_msg_read} messages from NB API, processing...");

let partition_mapping = self.nb_api_client.get_partition_to_local_shard_mapping();
debug!("Kafka partitions idx:key mapping info: {:?}", partition_mapping);
debug!(
"Kafka partitions idx:key mapping info: {:?}",
partition_mapping
);
if !partition_mapping.is_empty() {
partition_array_idx += 1;
if partition_array_idx >= partition_mapping.len() {
Expand Down
15 changes: 15 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from client_simulator.src.simulation_runner import Device as DeviceSimulator
from kafka_producer.src.producer import Producer as KafkaProducer
from kafka_producer.src.consumer import Consumer as KafkaConsumer
from kafka_producer.src.admin import Admin as KafkaAdmin
from psql_client.psql_client import PostgreSQLClient as PSQLClient
from redis_client.redis_client import RedisClient as RedisClient
import requests
Expand All @@ -26,6 +27,10 @@ def default_kafka_group() -> str:
def default_shard_id() -> int:
return 0

@staticmethod
def default_producer_topic() -> str:
return 'CnC'

def __init__(self):
device = DeviceSimulator(
mac=self.default_dev_sim_mac(),
Expand Down Expand Up @@ -53,9 +58,11 @@ def __init__(self):

producer = KafkaProducer(db='localhost:9092', topic='CnC')
consumer = KafkaConsumer(db='localhost:9092', topic='CnC_Res', consumer_timeout=12000)
admin = KafkaAdmin(host='localhost', port=9092)

self.kafka_producer = producer
self.kafka_consumer = consumer
self.kafka_admin = admin

psql_client = PSQLClient(host="localhost", port=5432, database="cgw", user="cgw", password="123")
self.psql_client = psql_client
Expand Down Expand Up @@ -90,6 +97,7 @@ def test_context():

ctx.kafka_producer.disconnect()
ctx.kafka_consumer.disconnect()
ctx.kafka_admin.disconnect()

ctx.psql_client.disconnect()
ctx.redis_client.disconnect()
Expand Down Expand Up @@ -118,6 +126,13 @@ def kafka_probe(test_context):
# We have to clear any messages before we can work with kafka
test_context.kafka_consumer.flush()

@pytest.fixture(scope='function')
def kafka_admin_probe(test_context):
try:
test_context.kafka_admin.connect()
except:
raise Exception('Failed to connect to Kafka broker!')

@pytest.fixture(scope='function')
def device_sim_connect(test_context):
# Make sure we initiate connect;
Expand Down
2 changes: 2 additions & 0 deletions tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ ln -sf ../utils/client_simulator/sim_data sim_data
ln -sf ../utils/kafka_producer/kafka_data kafka_data
ln -sf ../utils/cert_generator/certs/client/ certs
ln -sf ../utils/cert_generator/certs/ca/ ca-certs
ln -sf ../utils/client_simulator/ client_simulator
ln -sf ../utils/kafka_producer/ kafka_producer
ln -sf ../utils/psql_client/ psql_client
ln -sf ../utils/redis_client/ redis_client

Expand Down
Loading
Loading