Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open api service #7

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions OpenApiService/Exceptions/RequestSender/BuildUriException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace OpenApiService.Exceptions.RequestSender;
[System.Serializable]
public class BuildUriException : RequestSenderException
{
public BuildUriException() { }
public BuildUriException(string message) : base(message) { }
public BuildUriException(string message, System.Exception inner) : base(message, inner) { }

}
14 changes: 14 additions & 0 deletions OpenApiService/Exceptions/RequestSender/RequestSenderException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace OpenApiService.Exceptions.RequestSender;

[System.Serializable]
public class RequestSenderException : System.Exception
{
public RequestSenderException() { }
public RequestSenderException(string message) : base(message) { }
public RequestSenderException(string message, System.Exception inner) : base(message, inner) { }
}
15 changes: 15 additions & 0 deletions OpenApiService/Exceptions/RequestSender/SendRequestException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace OpenApiService.Exceptions.RequestSender;

[System.Serializable]
public class SendRequestException : RequestSenderException
{
public SendRequestException() { }
public SendRequestException(string message) : base(message) { }
public SendRequestException(string message, System.Exception inner) : base(message, inner) { }

}
258 changes: 258 additions & 0 deletions OpenApiService/Kafka/KafkaRequestService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using OpenApiService.Kafka;
using OpenApiService.KafkaException;
using OpenApiService.KafkaException.ConsumerException;
using OpenApiService.Kafka.Utils;
using OpenApiService.KafkaException.ConfigurationException;

namespace OpenApiService.Kafka;

public class KafkaRequestService
{
private readonly IProducer<string, string> _producer;
private readonly ILogger<KafkaRequestService> _logger;
private readonly KafkaTopicManager _kafkaTopicManager;
private readonly HashSet<PendingMessagesBus> _pendingMessagesBus;
private readonly HashSet<RecievedMessagesBus> _recievedMessagesBus;
private readonly HashSet<IConsumer<string,string>> _consumerPool;
public KafkaRequestService(
IProducer<string, string> producer,
ILogger<KafkaRequestService> logger,
KafkaTopicManager kafkaTopicManager,
List<string> responseTopics,
List<string> requestsTopics)
{
_producer = producer;
_logger = logger;
_kafkaTopicManager = kafkaTopicManager;
_recievedMessagesBus = ConfigureRecievedMessages(responseTopics);
_pendingMessagesBus = ConfigurePendingMessages(requestsTopics);
_consumerPool = ConfigureConsumers(responseTopics.Count());

}
public void BeginRecieving(List<string> responseTopics)
{
int topicCount = 0;
foreach(var consumer in _consumerPool)
{

Thread thread = new Thread( x=>{
Consume(consumer,responseTopics[topicCount]);
});
thread.Start();
topicCount++;
}
}

private HashSet<IConsumer<string,string>> ConfigureConsumers(int amount)
{
try
{
if(amount<=0)
{
throw new ConfigureConsumersException(" Amount of consumers must be above 0!");
}
HashSet<IConsumer<string,string>> consumers = new HashSet<IConsumer<string, string>>();
for (int i = 0; i < amount; i++)
{
consumers.Add(
new ConsumerBuilder<string,string>(
new ConsumerConfig()
{
BootstrapServers = Environment.GetEnvironmentVariable("KAFKA_BROKERS"),
GroupId = "gatewayConsumer"+Guid.NewGuid().ToString(),
EnableAutoCommit = true,
AutoCommitIntervalMs = 10,
EnableAutoOffsetStore = true,
AutoOffsetReset = AutoOffsetReset.Earliest

}
).Build()
);
}
return consumers;
}
catch (Exception ex)
{
if (ex is MyKafkaException)
{
_logger.LogError(ex, "Error configuring consumers");
throw new ProducerException("Error configuring consumers",ex);
}
throw;
}

}
private HashSet<PendingMessagesBus> ConfigurePendingMessages(List<string> ResponseTopics)
{
if(ResponseTopics.Count == 0)
{
throw new ConfigureMessageBusException("At least one requests topic must e provided!");
}
var PendingMessages = new HashSet<PendingMessagesBus>();
foreach(var requestTopic in ResponseTopics)
{
PendingMessages.Add(new PendingMessagesBus(){ TopicName=requestTopic, MessageKeys = new HashSet<Utils.MethodKeyPair>()});
}
return PendingMessages;
}
private HashSet<RecievedMessagesBus> ConfigureRecievedMessages(List<string> ResponseTopics)
{
if(ResponseTopics.Count == 0)
{
throw new ConfigureMessageBusException("At least one response topic must e provided!");
}
HashSet<RecievedMessagesBus> Responses = new HashSet<RecievedMessagesBus>();
foreach(var RequestTopic in ResponseTopics)
{
Responses.Add(new RecievedMessagesBus() { TopicName = RequestTopic, Messages = new HashSet<Message<string, string>>()});
}
return Responses;
}

private bool IsTopicAvailable(string topicName)
{
try
{
_logger.LogError("Unable to subscribe to topic");
throw new ConsumerTopicUnavailableException("Topic unavailable");

}
catch (Exception e)
{
if (e is MyKafkaException)
{
_logger.LogError(e,"Error checking topic");
throw new ConsumerException("Error checking topic",e);
}
_logger.LogError(e,"Unhandled error");
throw;
}
}

public bool IsMessageRecieved(string MessageKey)
{
try
{
return _recievedMessagesBus.Any(x=>x.Messages.Any(x=>x.Key==MessageKey));
}
catch (Exception e)
{
throw new ConsumerException($"Recieved message bus error",e);
}
}
public async Task<bool> Produce(string topicName, Message<string, string> message, string responseTopic)
{
try
{
bool IsTopicExists = IsTopicAvailable(topicName);
if (IsTopicExists && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);

_pendingMessagesBus.FirstOrDefault(x=>x.TopicName == responseTopic)!.MessageKeys.Add(new MethodKeyPair(){
MessageKey = message.Key,
MessageMethod = Encoding.UTF8.GetString(message.Headers.FirstOrDefault(x => x.Key.Equals("method"))!.GetValueBytes())
});
return true;


}

_logger.LogError("Message delivery status: Not persisted {Result}", deliveryResult.Value);
throw new MessageProduceException("Message delivery status: Not persisted" + deliveryResult.Value);

}

bool IsTopicCreated = _kafkaTopicManager.CreateTopic(topicName, Convert.ToInt32(Environment.GetEnvironmentVariable("PARTITIONS_STANDART")), Convert.ToInt16(Environment.GetEnvironmentVariable("REPLICATION_FACTOR_STANDART")));
if (IsTopicCreated && IsTopicPendingMessageBusExist( responseTopic))
{
var deliveryResult = await _producer.ProduceAsync(topicName, message);
if (deliveryResult.Status == PersistenceStatus.Persisted)
{
_logger.LogInformation("Message delivery status: Persisted {Result}", deliveryResult.Value);
_pendingMessagesBus.FirstOrDefault(x=>x.TopicName == responseTopic)!.MessageKeys.Add(new MethodKeyPair(){
MessageKey = message.Key,
MessageMethod = Encoding.UTF8.GetString(message.Headers.FirstOrDefault(x => x.Key.Equals("method"))!.GetValueBytes())
});
return true;
}

_logger.LogError("Message delivery status: Not persisted {Result}", deliveryResult.Value);
throw new MessageProduceException("Message delivery status: Not persisted");

}
_logger.LogError("Topic unavailable");
throw new MessageProduceException("Topic unavailable");
}
catch (Exception e)
{
if (e is MyKafkaException)
{
_logger.LogError(e, "Error producing message");
throw new ProducerException("Error producing message",e);
}
throw;
}
}
private bool IsTopicPendingMessageBusExist(string responseTopic)
{
return _pendingMessagesBus.Any(x => x.TopicName == responseTopic);
}
private void Consume(IConsumer<string,string> localConsumer,string topicName)
{
localConsumer.Subscribe(topicName);
while (true)
{
ConsumeResult<string, string> result = localConsumer.Consume();

if (result != null)
{
try
{
if( _pendingMessagesBus.FirstOrDefault(x=>x.TopicName==topicName)!.MessageKeys.Any(x=>x.MessageKey==result.Message.Key))
{
if(result.Message.Headers.Any(x => x.Key.Equals("errors")))
{
var errors = Encoding.UTF8.GetString(result.Message.Headers.FirstOrDefault(x => x.Key.Equals("errors"))!.GetValueBytes());
_logger.LogError(errors);

throw new ConsumerException(errors);
}

MethodKeyPair pendingMessage = _pendingMessagesBus.FirstOrDefault(x=>x.TopicName==topicName)!.MessageKeys.FirstOrDefault(x=>x.MessageKey==result.Message.Key)!;
if(_pendingMessagesBus.FirstOrDefault(x=>x.TopicName==topicName)!.MessageKeys.Any(x=>x.MessageMethod== Encoding.UTF8.GetString(result.Message.Headers.FirstOrDefault(x => x.Key.Equals("method"))!.GetValueBytes())))
{

localConsumer.Commit(result);
_recievedMessagesBus.FirstOrDefault(x=>x.TopicName== topicName)!.Messages.Add(result.Message);
_pendingMessagesBus.FirstOrDefault(x=>x.TopicName==topicName)!.MessageKeys.Remove(pendingMessage);
}
_logger.LogError("Wrong message method");
throw new ConsumerException("Wrong message method");
}
}
catch (Exception e)
{
if (e is MyKafkaException)
{
_logger.LogError(e,"Consumer error");
throw new ConsumerException("Consumer error ",e);
}
_logger.LogError(e,"Unhandled error");
localConsumer.Commit(result);
throw;
}

}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System.ComponentModel;
using System.Text;
using Confluent.Kafka;
using TourService.KafkaException;
using TourService.KafkaException.ConsumerException;
using OpenApiService.KafkaException;
using OpenApiService.KafkaException.ConsumerException;
using Newtonsoft.Json;
namespace TourService.Kafka;
namespace OpenApiService.Kafka;

public abstract class KafkaService(ILogger<KafkaService> logger, IProducer<string, string> producer, KafkaTopicManager kafkaTopicManager)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using TourService.KafkaException;
using OpenApiService.KafkaException;

namespace TourService.Kafka;
namespace OpenApiService.Kafka;

public class KafkaTopicManager(IAdminClient adminClient)
{
Expand Down
12 changes: 12 additions & 0 deletions OpenApiService/Kafka/Utils/MethodKeyPair.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace OpenApiService.Kafka.Utils;

public class MethodKeyPair
{
public string MessageKey { get; set; } = null!;
public string MessageMethod {get;set;} = null!;
}
14 changes: 14 additions & 0 deletions OpenApiService/Kafka/Utils/PendingMessagesBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using OpenApiService.Kafka.Utils;

namespace OpenApiService.Kafka;

public class PendingMessagesBus
{
public string TopicName {get;set;} = null!;
public HashSet<MethodKeyPair> MessageKeys {get;set;} = null!;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
using System.Threading.Tasks;
using Confluent.Kafka;

namespace TourService.Kafka
{
namespace OpenApiService.Kafka;

public class RecievedMessagesBus
{
public string TopicName { get; set; } = "";
public HashSet<Message<string,string>> Messages { get; set;} = new HashSet<Message<string,string>>();
public string TopicName { get; set; } = null!;
public HashSet<Message<string,string>> Messages { get; set;} = null!;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using OpenApiService.KafkaException;

namespace OpenApiService.KafkaException.ConfigurationException;
public class ConfigureConsumersException : MyKafkaException
{
public ConfigureConsumersException() {}
public ConfigureConsumersException(string message) : base(message) {}
public ConfigureConsumersException(string message, System.Exception inner) : base(message, inner) {}
}
Loading
Loading