Pluggable Kafka methods #1123

Status: Open. Wants to merge 2 commits into master.
7 changes: 7 additions & 0 deletions pom.xml
@@ -172,6 +172,13 @@
            <version>${micrometer-version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
9 changes: 9 additions & 0 deletions src/main/java/com/autotune/operator/InitializeDeployment.java
@@ -53,6 +53,10 @@ public static void setup_deployment_info() throws Exception, K8sTypeNotSupported
        String monitoring_agent_endpoint = KruizeDeploymentInfo.monitoring_agent_endpoint;
        String monitoring_agent = KruizeDeploymentInfo.monitoring_agent;
        String monitoring_agent_service = KruizeDeploymentInfo.monitoring_service;
        String kafka_bootstrap_servers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
        String kafka_consumer_group_id = System.getenv("KAFKA_CONSUMER_GROUP_ID");
        String kafka_topic_inbound = System.getenv("INGRESS_KAFKA_TOPIC");
        String kafka_topic_outbound = System.getenv("EGRESS_KAFKA_TOPIC");
        // If no endpoint was specified in the configmap
        if (monitoring_agent_endpoint == null || monitoring_agent_endpoint.isEmpty()) {
            if (monitoring_agent == null || monitoring_agent_service == null) {
@@ -70,6 +74,11 @@ public static void setup_deployment_info() throws Exception, K8sTypeNotSupported

        KruizeDeploymentInfo.logDeploymentInfo();

        KruizeDeploymentInfo.setKafkaBootstrapServers(kafka_bootstrap_servers);
        KruizeDeploymentInfo.setKafkaGroupID(kafka_consumer_group_id);
        KruizeDeploymentInfo.setInboundKafkaTopic(kafka_topic_inbound);
        KruizeDeploymentInfo.setOutboundKafkaTopic(kafka_topic_outbound);
    }

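The four Kafka settings above are read straight from the environment and are stored only when non-null, so a caller cannot assume they are present. As a rough sketch (not part of this change; the class name KafkaConfigCheck is hypothetical), a component could verify the Kafka wiring before using it:

import com.autotune.operator.KruizeDeploymentInfo;

// Hypothetical helper, not included in this PR: reports whether setup_deployment_info()
// found all of the Kafka-related environment variables.
public class KafkaConfigCheck {
    public static boolean isKafkaConfigured() {
        // Each getter returns null when the corresponding environment variable was unset.
        return KruizeDeploymentInfo.getKafkaBootstrapServers() != null
                && KruizeDeploymentInfo.getKafkaGroupID() != null
                && KruizeDeploymentInfo.getInboundKafkaTopic() != null
                && KruizeDeploymentInfo.getOutboundKafkaTopic() != null;
    }
}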
33 changes: 33 additions & 0 deletions src/main/java/com/autotune/operator/KruizeDeploymentInfo.java
@@ -74,6 +74,11 @@ public class KruizeDeploymentInfo {
    //private static KubernetesClient kubernetesClient;
    private static KubeEventLogger kubeEventLogger;

    private static String kafkaBootstrapServers = null;
    private static String kafkaGroupID = null;
    private static String inboundKafkaTopic = null;
    private static String outboundKafkaTopic = null;

    private KruizeDeploymentInfo() {
    }
@@ -164,6 +169,34 @@ public static void setMonitoringAgentService(String monitoringAgentService) {
        KruizeDeploymentInfo.monitoring_service = monitoringAgentService.toLowerCase();
    }

    public static synchronized void setKafkaBootstrapServers(String kafkaBootstrapServers) {
        if (kafkaBootstrapServers != null)
            KruizeDeploymentInfo.kafkaBootstrapServers = kafkaBootstrapServers;
    }

    public static String getKafkaBootstrapServers() {
        return KruizeDeploymentInfo.kafkaBootstrapServers;
    }

    public static void setKafkaGroupID(String kafkaGroupID) {
        if (kafkaGroupID != null)
            KruizeDeploymentInfo.kafkaGroupID = kafkaGroupID;
    }

    public static String getKafkaGroupID() {
        return KruizeDeploymentInfo.kafkaGroupID;
    }

    public static void setInboundKafkaTopic(String inboundKafkaTopic) {
        if (inboundKafkaTopic != null)
            KruizeDeploymentInfo.inboundKafkaTopic = inboundKafkaTopic;
    }

    public static String getInboundKafkaTopic() {
        return KruizeDeploymentInfo.inboundKafkaTopic;
    }

    public static void setOutboundKafkaTopic(String outboundKafkaTopic) {
        if (outboundKafkaTopic != null)
            KruizeDeploymentInfo.outboundKafkaTopic = outboundKafkaTopic;
    }

    public static String getOutboundKafkaTopic() {
        return KruizeDeploymentInfo.outboundKafkaTopic;
    }

    public static void logDeploymentInfo() {
        LOGGER.info("Cluster Type: {}", KruizeDeploymentInfo.cluster_type);
        LOGGER.info("Kubernetes Type: {}", KruizeDeploymentInfo.k8s_type);
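The new setters above ignore null arguments, so a value stored earlier is not overwritten when an environment variable is missing. A minimal illustration of that guard (illustrative only; the group ID value is hypothetical):

import com.autotune.operator.KruizeDeploymentInfo;

public class KafkaSetterDemo {
    public static void main(String[] args) {
        KruizeDeploymentInfo.setKafkaGroupID("kruize-consumers");  // hypothetical value
        KruizeDeploymentInfo.setKafkaGroupID(null);                 // null is ignored by the guard
        System.out.println(KruizeDeploymentInfo.getKafkaGroupID()); // prints "kruize-consumers"
    }
}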
101 changes: 101 additions & 0 deletions src/main/java/com/autotune/utils/KafkaUtils.java
@@ -0,0 +1,101 @@
package com.autotune.utils;

import com.autotune.operator.KruizeDeploymentInfo;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Scanner;


public class KafkaUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaUtils.class);

    private static Consumer<String, String> consumer;
    private static Producer<String, String> producer;

    // Static initializer block to register shutdown hooks
    static {
        addConsumerShutdownHook();
        addProducerShutdownHook();
    }

    // Kafka Producer Method
    public static void produceMessage(String message) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KruizeDeploymentInfo.getKafkaBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try {
            producer = new KafkaProducer<>(props);
            ProducerRecord<String, String> record = new ProducerRecord<>(KruizeDeploymentInfo.getOutboundKafkaTopic(), message);
            producer.send(record);
            // Ensure the asynchronous send completes before this method returns
            producer.flush();
        } catch (Exception e) {
            LOGGER.error("Unable to produce message: ", e);
        }
    }

    // Kafka Consumer Method
    public static String consumeMessages() {
        // Continuous listener: polls the inbound topic until a termination signal is received
        Properties props = new Properties();
        LOGGER.debug("Kafka bootstrap servers from environment: {}", KruizeDeploymentInfo.getKafkaBootstrapServers());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KruizeDeploymentInfo.getKafkaBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, KruizeDeploymentInfo.getKafkaGroupID());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Flag to control the loop and terminate when needed
        boolean continueListening = true;

        try {
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(KruizeDeploymentInfo.getInboundKafkaTopic()));

            while (continueListening) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                for (ConsumerRecord<String, String> record : records) {
                    LOGGER.info("Received message: {}", record.value());
                }
                if (isTerminationSignalReceived()) {
                    continueListening = false;
                }
            }
        } catch (Exception e) {
            LOGGER.error("Unable to consume messages: ", e);
        }
        return null;
    }

    private static boolean isTerminationSignalReceived() {
        Scanner scanner = new Scanner(System.in);
        return scanner.hasNext(); // This will return true if any input arrives on stdin
    }

    // Shutdown hook for the consumer
    private static void addConsumerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (consumer != null) {
                consumer.close();
            }
        }));
    }

    // Shutdown hook for the producer
    private static void addProducerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (producer != null) {
                producer.close();
            }
        }));
    }
}
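A rough usage sketch of the new utility (assumed wiring, not shown in this PR; the caller class and the JSON payload are placeholders, while the two methods come from the diff above):

import com.autotune.utils.KafkaUtils;

// Hypothetical caller: publishes one message to the outbound (EGRESS_KAFKA_TOPIC) topic,
// then blocks consuming from the inbound (INGRESS_KAFKA_TOPIC) topic until a termination
// signal arrives on stdin.
public class KafkaUtilsExample {
    public static void main(String[] args) {
        KafkaUtils.produceMessage("{\"experiment\":\"demo\",\"status\":\"ready\"}");
        KafkaUtils.consumeMessages();
    }
}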