Skip to content

Commit

Permalink
refactored kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Er3shk1gal committed Nov 21, 2024
1 parent 27a256f commit a93c227
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 42 deletions.
11 changes: 8 additions & 3 deletions ApiGatewayService/Kafka/KafkaRequestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public async Task<bool> Produce(string topicName, Message<string, string> messag
bool IsTopicExists = IsTopicAvailable(topicName);
if (IsTopicExists && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand All @@ -200,7 +201,8 @@ public async Task<bool> Produce(string topicName, Message<string, string> messag
bool IsTopicCreated = _kafkaTopicManager.CreateTopic(topicName, Convert.ToInt32(Environment.GetEnvironmentVariable("PARTITIONS_STANDART")), Convert.ToInt16(Environment.GetEnvironmentVariable("REPLICATION_FACTOR_STANDART")));
if (IsTopicCreated && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand Down Expand Up @@ -235,7 +237,10 @@ private bool IsTopicPendingMessageBusExist(string responseTopic)
private async Task Consume(IConsumer<string,string> localConsumer,string topicName)
{
topicCount++;
localConsumer.Subscribe(topicName);
var partitions = new List<TopicPartition>();
partitions.Add(new TopicPartition(topicName, 0));

localConsumer.Assign(partitions);
while (true)
{
ConsumeResult<string, string> result = localConsumer.Consume();
Expand Down
11 changes: 8 additions & 3 deletions ApiGatewayService/Kafka/KafkaService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ protected void ConfigureConsumer(string topicName)
_consumer = new ConsumerBuilder<string, string>(config).Build();
if(IsTopicAvailable(topicName))
{
_consumer.Subscribe(topicName);
var partitions = new List<TopicPartition>();
partitions.Add(new TopicPartition(topicName, 0));

_consumer.Assign(partitions);
return;
}
throw new ConsumerTopicUnavailableException("Topic unavailable");
Expand Down Expand Up @@ -78,7 +81,8 @@ public async Task<bool> Produce( string topicName,Message<string, string> messag
bool IsTopicExists = IsTopicAvailable(topicName);
if (IsTopicExists)
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{

Expand All @@ -94,7 +98,8 @@ public async Task<bool> Produce( string topicName,Message<string, string> messag
bool IsTopicCreated = _kafkaTopicManager.CreateTopic(topicName, Convert.ToInt32(Environment.GetEnvironmentVariable("PARTITIONS_STANDART")), Convert.ToInt16(Environment.GetEnvironmentVariable("REPLICATION_FACTOR_STANDART")));
if (IsTopicCreated)
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand Down
11 changes: 8 additions & 3 deletions AuthService/Kafka/KafkaRequestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public async Task<bool> Produce(string topicName, Message<string, string> messag
bool IsTopicExists = IsTopicAvailable(topicName);
if (IsTopicExists && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand All @@ -200,7 +201,8 @@ public async Task<bool> Produce(string topicName, Message<string, string> messag
bool IsTopicCreated = _kafkaTopicManager.CreateTopic(topicName, Convert.ToInt32(Environment.GetEnvironmentVariable("PARTITIONS_STANDART")), Convert.ToInt16(Environment.GetEnvironmentVariable("REPLICATION_FACTOR_STANDART")));
if (IsTopicCreated && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand Down Expand Up @@ -235,7 +237,10 @@ private bool IsTopicPendingMessageBusExist(string responseTopic)
private async Task Consume(IConsumer<string,string> localConsumer,string topicName)
{
topicCount++;
localConsumer.Subscribe(topicName);
var partitions = new List<TopicPartition>();
partitions.Add(new TopicPartition(topicName, 0));

localConsumer.Assign(partitions);
while (true)
{
ConsumeResult<string, string> result = localConsumer.Consume();
Expand Down
11 changes: 8 additions & 3 deletions AuthService/Kafka/KafkaService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ protected void ConfigureConsumer(string topicName)
_consumer = new ConsumerBuilder<string, string>(config).Build();
if(IsTopicAvailable(topicName))
{
_consumer.Subscribe(topicName);
var partitions = new List<TopicPartition>();
partitions.Add(new TopicPartition(topicName, 0));

_consumer.Assign(partitions);
return;
}
throw new ConsumerTopicUnavailableException("Topic unavailable");
Expand Down Expand Up @@ -77,7 +80,8 @@ public async Task<bool> Produce( string topicName,Message<string, string> messag
bool IsTopicExists = IsTopicAvailable(topicName);
if (IsTopicExists)
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{

Expand All @@ -93,7 +97,8 @@ public async Task<bool> Produce( string topicName,Message<string, string> messag
bool IsTopicCreated = _kafkaTopicManager.CreateTopic(topicName, Convert.ToInt32(Environment.GetEnvironmentVariable("PARTITIONS_STANDART")), Convert.ToInt16(Environment.GetEnvironmentVariable("REPLICATION_FACTOR_STANDART")));
if (IsTopicCreated)
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand Down
10 changes: 7 additions & 3 deletions EntertaimentService/Kafka/KafkaRequestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public async Task<bool> Produce(string topicName, Message<string, string> messag
bool IsTopicExists = IsTopicAvailable(topicName);
if (IsTopicExists && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));;
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand All @@ -201,7 +202,8 @@ public async Task<bool> Produce(string topicName, Message<string, string> messag
bool IsTopicCreated = _kafkaTopicManager.CreateTopic(topicName, Convert.ToInt32(Environment.GetEnvironmentVariable("PARTITIONS_STANDART")), Convert.ToInt16(Environment.GetEnvironmentVariable("REPLICATION_FACTOR_STANDART")));
if (IsTopicCreated && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));;
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand Down Expand Up @@ -236,7 +238,9 @@ private bool IsTopicPendingMessageBusExist(string responseTopic)
private async Task Consume(IConsumer<string,string> localConsumer,string topicName)
{
topicCount++;
localConsumer.Subscribe(topicName);
var partitions = new List<TopicPartition>();
partitions.Add(new TopicPartition(topicName, 0));
localConsumer.Assign(partitions);
while (true)
{
ConsumeResult<string, string> result = localConsumer.Consume();
Expand Down
11 changes: 8 additions & 3 deletions EntertaimentService/Kafka/KafkaService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ protected void ConfigureConsumer(string topicName)
_consumer = new ConsumerBuilder<string, string>(config).Build();
if(IsTopicAvailable(topicName))
{
_consumer.Subscribe(topicName);
var partitions = new List<TopicPartition>();
partitions.Add(new TopicPartition(topicName, 0));

_consumer.Assign(partitions);
return;
}
throw new ConsumerTopicUnavailableException("Topic unavailable");
Expand Down Expand Up @@ -77,7 +80,8 @@ public async Task<bool> Produce( string topicName,Message<string, string> messag
bool IsTopicExists = IsTopicAvailable(topicName);
if (IsTopicExists)
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{

Expand All @@ -93,7 +97,8 @@ public async Task<bool> Produce( string topicName,Message<string, string> messag
bool IsTopicCreated = _kafkaTopicManager.CreateTopic(topicName, Convert.ToInt32(Environment.GetEnvironmentVariable("PARTITIONS_STANDART")), Convert.ToInt16(Environment.GetEnvironmentVariable("REPLICATION_FACTOR_STANDART")));
if (IsTopicCreated)
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand Down
11 changes: 8 additions & 3 deletions MailService/Kafka/KafkaRequestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public async Task<bool> Produce(string topicName, Message<string, string> messag
bool IsTopicExists = IsTopicAvailable(topicName);
if (IsTopicExists && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand All @@ -200,7 +201,8 @@ public async Task<bool> Produce(string topicName, Message<string, string> messag
bool IsTopicCreated = _kafkaTopicManager.CreateTopic(topicName, Convert.ToInt32(Environment.GetEnvironmentVariable("PARTITIONS_STANDART")), Convert.ToInt16(Environment.GetEnvironmentVariable("REPLICATION_FACTOR_STANDART")));
if (IsTopicCreated && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand Down Expand Up @@ -235,7 +237,10 @@ private bool IsTopicPendingMessageBusExist(string responseTopic)
private async Task Consume(IConsumer<string,string> localConsumer,string topicName)
{
topicCount++;
localConsumer.Subscribe(topicName);
var partitions = new List<TopicPartition>();
partitions.Add(new TopicPartition(topicName, 0));

localConsumer.Assign(partitions);
while (true)
{
ConsumeResult<string, string> result = localConsumer.Consume();
Expand Down
11 changes: 8 additions & 3 deletions MailService/Kafka/KafkaService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ protected void ConfigureConsumer(string topicName)
_consumer = new ConsumerBuilder<string, string>(config).Build();
if(IsTopicAvailable(topicName))
{
_consumer.Subscribe(topicName);
var partitions = new List<TopicPartition>();
partitions.Add(new TopicPartition(topicName, 0));

_consumer.Assign(partitions);
return;
}
throw new ConsumerTopicUnavailableException("Topic unavailable");
Expand Down Expand Up @@ -77,7 +80,8 @@ public async Task<bool> Produce( string topicName,Message<string, string> messag
bool IsTopicExists = IsTopicAvailable(topicName);
if (IsTopicExists)
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{

Expand All @@ -93,7 +97,8 @@ public async Task<bool> Produce( string topicName,Message<string, string> messag
bool IsTopicCreated = _kafkaTopicManager.CreateTopic(topicName, Convert.ToInt32(Environment.GetEnvironmentVariable("PARTITIONS_STANDART")), Convert.ToInt16(Environment.GetEnvironmentVariable("REPLICATION_FACTOR_STANDART")));
if (IsTopicCreated)
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand Down
11 changes: 8 additions & 3 deletions PromoService/Kafka/KafkaRequestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public async Task<bool> Produce(string topicName, Message<string, string> messag
bool IsTopicExists = IsTopicAvailable(topicName);
if (IsTopicExists && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand All @@ -200,7 +201,8 @@ public async Task<bool> Produce(string topicName, Message<string, string> messag
bool IsTopicCreated = _kafkaTopicManager.CreateTopic(topicName, Convert.ToInt32(Environment.GetEnvironmentVariable("PARTITIONS_STANDART")), Convert.ToInt16(Environment.GetEnvironmentVariable("REPLICATION_FACTOR_STANDART")));
if (IsTopicCreated && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
var deliveryResult = await _producer.ProduceAsync(
new TopicPartition(topicName, new Partition(0));
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
Expand Down Expand Up @@ -235,7 +237,10 @@ private bool IsTopicPendingMessageBusExist(string responseTopic)
private async Task Consume(IConsumer<string,string> localConsumer,string topicName)
{
topicCount++;
localConsumer.Subscribe(topicName);
var partitions = new List<TopicPartition>();
partitions.Add(new TopicPartition(topicName, 0));

localConsumer.Assign(partitions);
while (true)
{
ConsumeResult<string, string> result = localConsumer.Consume();
Expand Down
Loading

0 comments on commit a93c227

Please sign in to comment.