Skip to content

Commit

Permalink
Fixes #3799: Apache Kafka procedures
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Aug 27, 2024
1 parent 48f048b commit 3354762
Show file tree
Hide file tree
Showing 112 changed files with 14,576 additions and 27 deletions.
27 changes: 25 additions & 2 deletions extended/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jar {
}

compileKotlin {
kotlinOptions.jvmTarget = "17"
kotlinOptions.jvmTarget = JavaVersion.VERSION_17
}

generateGrammarSource {
Expand Down Expand Up @@ -62,6 +62,10 @@ dependencies {
exclude group: 'org.abego.treelayout'
}

def kotlinVersion = "1.6.0"
def kafkaVersion = "2.4.0"
def jacksonVersion = "2.17.2"

def withoutServers = {
exclude group: 'org.eclipse.jetty'
exclude group: 'org.eclipse.jetty.aggregate'
Expand Down Expand Up @@ -105,7 +109,10 @@ dependencies {
}
compileOnly group: 'com.couchbase.client', name: 'java-client', version: '3.3.0', withoutJacksons
compileOnly group: 'io.lettuce', name: 'lettuce-core', version: '6.1.1.RELEASE'
compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: '2.14.0', withoutJacksons
compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion
testImplementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion
compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion
testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion
compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.270'
compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-comprehend', version: '1.12.353' , withoutJacksons
compileOnly group: 'com.sun.mail', name: 'javax.mail', version: '1.6.0'
Expand All @@ -118,6 +125,12 @@ dependencies {
compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'

compileOnly group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2'
compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
compileOnly group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version: 'ad59084711'
compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2'

testImplementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.16.1'
testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'

Expand Down Expand Up @@ -145,6 +158,16 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.4.0'
testImplementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers
testImplementation group: 'com.opencsv', name: 'opencsv', version: '5.7.1'
testImplementation group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2'
// testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test-junit', version: kotlinVersion
// testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test-junit5', version: kotlinVersion

testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test', version: '1.6.0'

testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2'
testImplementation group: 'org.testcontainers', name: 'kafka', version: testContainersVersion
testImplementation group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version: 'ad59084711'

configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-nop'
Expand Down
20 changes: 20 additions & 0 deletions extended/src/main/java/apoc/ExtendedApocConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ExtendedApocConfig extends LifecycleAdapter
public static final String APOC_ML_WATSON_URL = "apoc.ml.watson.url";
public static final String APOC_AWS_KEY_ID = "apoc.aws.key.id";
public static final String APOC_AWS_SECRET_KEY = "apoc.aws.secret.key";
public static final String APOC_KAFKA_ENABLED = "apoc.kafka.enabled";
public enum UuidFormatType { hex, base64 }

// These were earlier added via the Neo4j config using the ApocSettings.java class
Expand Down Expand Up @@ -73,6 +74,25 @@ public enum UuidFormatType { hex, base64 }

public static final String CONFIG_DIR = "config-dir=";

private static final String CONF_DIR_ARG = "config-dir=";
private static final String SOURCE_ENABLED = "apoc.kafka.source.enabled";
private static final boolean SOURCE_ENABLED_VALUE = true;
private static final String PROCEDURES_ENABLED = "apoc.kafka.procedures.enabled";
private static final boolean PROCEDURES_ENABLED_VALUE = true;
private static final String SINK_ENABLED = "apoc.kafka.sink.enabled";
private static final boolean SINK_ENABLED_VALUE = false;
private static final String CHECK_APOC_TIMEOUT = "apoc.kafka.check.apoc.timeout";
private static final String CHECK_APOC_INTERVAL = "apoc.kafka.check.apoc.interval";
private static final String CLUSTER_ONLY = "apoc.kafka.cluster.only";
private static final String CHECK_WRITEABLE_INSTANCE_INTERVAL = "apoc.kafka.check.writeable.instance.interval";
private static final String SYSTEM_DB_WAIT_TIMEOUT = "apoc.kafka.systemdb.wait.timeout";
private static final long SYSTEM_DB_WAIT_TIMEOUT_VALUE = 10000L;
private static final String POLL_INTERVAL = "apoc.kafka.sink.poll.interval";
private static final String INSTANCE_WAIT_TIMEOUT = "apoc.kafka.wait.timeout";
private static final long INSTANCE_WAIT_TIMEOUT_VALUE = 120000L;
private static final int DEFAULT_TRIGGER_PERIOD = 10000;
private static final String DEFAULT_PATH = ".";

public ExtendedApocConfig(LogService log, GlobalProcedures globalProceduresRegistry, String defaultConfigPath) {
this.log = log.getInternalLog(ApocConfig.class);
this.defaultConfigPath = defaultConfigPath;
Expand Down
66 changes: 48 additions & 18 deletions extended/src/main/java/apoc/ExtendedApocGlobalComponents.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@
import org.neo4j.kernel.availability.AvailabilityListener;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.logging.Log;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED;

@ServiceProvider
public class ExtendedApocGlobalComponents implements ApocGlobalComponents {

Expand All @@ -37,36 +43,60 @@ public Map<String, Lifecycle> getServices(GraphDatabaseAPI db, ApocExtensionFact
);
cypherProcedureHandlers.put(db, cypherProcedureHandler);

return Map.of(
Map<String, Lifecycle> serviceMap = new HashMap<>();
serviceMap.put("ttl", new TTLLifeCycle(dependencies.scheduler(),
db,
TTLConfig.ttlConfig(),
dependencies.log().getUserLog(TTLLifeCycle.class)));

"ttl", new TTLLifeCycle(dependencies.scheduler(),
db,
TTLConfig.ttlConfig(),
dependencies.log().getUserLog(TTLLifeCycle.class)),
serviceMap.put("uuid", new UuidHandler(db,
dependencies.databaseManagementService(),
dependencies.log().getUserLog(Uuid.class),
dependencies.apocConfig(),
dependencies.scheduler(),
dependencies.pools()));

"uuid", new UuidHandler(db,
dependencies.databaseManagementService(),
dependencies.log().getUserLog(Uuid.class),
dependencies.apocConfig(),
dependencies.scheduler(),
dependencies.pools()),
serviceMap.put("directory", new LoadDirectoryHandler(db,
dependencies.log().getUserLog(LoadDirectory.class),
dependencies.pools()));

"directory", new LoadDirectoryHandler(db,
dependencies.log().getUserLog(LoadDirectory.class),
dependencies.pools()),
serviceMap.put("cypherProcedures", cypherProcedureHandler);

if (dependencies.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) {
try {
Class<?> kafkaHandlerClass = Class.forName("apoc.kafka.KafkaHandler");
Lifecycle kafkaHandler = (Lifecycle) kafkaHandlerClass
.getConstructor(GraphDatabaseAPI.class, Log.class)
.newInstance(db, dependencies.log().getUserLog(kafkaHandlerClass));

serviceMap.put("kafkaHandler", kafkaHandler);
} catch (Exception e) {
dependencies.log().getUserLog(ExtendedApocGlobalComponents.class)
.warn("""
Cannot find the Kafka extra jar.
Please put the apoc-kafka-dependencies-5.x.x-all.jar into plugin folder.
See the documentation: https://neo4j.com/labs/apoc/5/overview/apoc.kakfa""");
}
}

return serviceMap;

"cypherProcedures", cypherProcedureHandler
);
}

@Override
public Collection<Class> getContextClasses() {
return List.of(CypherProceduresHandler.class, UuidHandler.class, LoadDirectoryHandler.class);
List<Class> contextClasses = new ArrayList<>(
Arrays.asList(CypherProceduresHandler.class, UuidHandler.class, LoadDirectoryHandler.class)
);
try {
contextClasses.add(Class.forName("apoc.kafka.KafkaHandler"));
} catch (ClassNotFoundException ignored) {}
return contextClasses;
}

@Override
public Iterable<AvailabilityListener> getListeners(GraphDatabaseAPI db, ApocExtensionFactory.Dependencies dependencies) {
CypherProceduresHandler cypherProceduresHandler = cypherProcedureHandlers.get(db);
return cypherProceduresHandler==null ? Collections.emptyList() : Collections.singleton(cypherProceduresHandler);
}
}
}
2 changes: 1 addition & 1 deletion extended/src/main/java/apoc/generate/Generate.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void complete(@Name("noNodes") Long noNodes, @Name("label") String label,
@Procedure(name = "apoc.generate.simple",mode = Mode.WRITE)
@Description("apoc.generate.simple(degrees, label, type) - generates a simple random graph according to the given degree distribution")
public void simple(@Name("degrees") List<Long> degrees, @Name("label") String label, @Name("type") String relationshipType) throws IOException {
if (degrees == null) degrees = Arrays.asList(2L, 2L, 2L, 2L);
if (degrees == null) degrees = java.util.Arrays.asList(2L, 2L, 2L, 2L);

List<Integer> intDegrees = degrees.stream().map(Long::intValue).collect(Collectors.toList());

Expand Down
2 changes: 1 addition & 1 deletion extended/src/main/java/apoc/load/Jdbc.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private Stream<RowResult> executeQuery(String urlOrKey, String tableOrSelect, Ma
}
}

@Procedure(mode = Mode.DBMS)
@Procedure(mode = Mode.WRITE)
@Description("apoc.load.jdbcUpdate('key or url','statement',[params],config) YIELD row - update relational database, from a SQL statement with optional parameters")
public Stream<RowResult> jdbcUpdate(@Name("jdbc") String urlOrKey, @Name("query") String query, @Name(value = "params", defaultValue = "[]") List<Object> params, @Name(value = "config",defaultValue = "{}") Map<String, Object> config) {
log.info( String.format( "Executing SQL update: %s", query ) );
Expand Down
50 changes: 50 additions & 0 deletions extended/src/main/kotlin/apoc/kafka/KafkaHandler.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package apoc.kafka

import apoc.ApocConfig
import apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED
import apoc.kafka.config.StreamsConfig
import apoc.kafka.consumer.StreamsSinkConfigurationListener
import apoc.kafka.producer.StreamsRouterConfigurationListener
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.kernel.lifecycle.LifecycleAdapter
import org.neo4j.logging.Log

class KafkaHandler(): LifecycleAdapter() {

private lateinit var db: GraphDatabaseAPI
private lateinit var log: Log

constructor(db: GraphDatabaseAPI, log: Log) : this() {
this.db = db
this.log = log
}

override fun start() {
if(ApocConfig.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) {
// println("start db......")

try {
StreamsRouterConfigurationListener(db, log)
.start(StreamsConfig.getConfiguration())
} catch (e: Exception) {
log.error("Exception in StreamsRouterConfigurationListener {}", e.message)
}

try {
StreamsSinkConfigurationListener(db, log)
.start(StreamsConfig.getConfiguration())
} catch (e: Exception) {
log.error("Exception in StreamsSinkConfigurationListener {}", e.message)
}
}
}

override fun stop() {
if(ApocConfig.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) {
// println("stop db..........")

StreamsRouterConfigurationListener(db, log).shutdown()
StreamsSinkConfigurationListener(db, log).shutdown()
}
}
}
43 changes: 43 additions & 0 deletions extended/src/main/kotlin/apoc/kafka/Neo4jStreamsStrategyStorage.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//package apoc.kafka
//
//import apoc.kafka.consumer.StreamsSinkConfiguration
//import apoc.kafka.consumer.StreamsTopicService
//import apoc.kafka.extensions.isDefaultDb
//import apoc.kafka.service.StreamsStrategyStorage
//import apoc.kafka.service.TopicType
//import apoc.kafka.service.sink.strategy.*
//import org.neo4j.graphdb.GraphDatabaseService
//
//class Neo4jStreamsStrategyStorage(private val streamsTopicService: StreamsTopicService,
// private val streamsConfig: Map<String, String>,
// private val db: GraphDatabaseService): StreamsStrategyStorage() {
//
// override fun getTopicType(topic: String): TopicType? {
// return streamsTopicService.getTopicType(topic)
// }
//
// private fun <T> getTopicsByTopicType(topicType: TopicType): T = streamsTopicService.getByTopicType(topicType) as T
//
// override fun getStrategy(topic: String): IngestionStrategy = when (val topicType = getTopicType(topic)) {
// TopicType.CDC_SOURCE_ID -> {
// val strategyConfig = StreamsSinkConfiguration
// .createSourceIdIngestionStrategyConfig(streamsConfig, db.databaseName(), db.isDefaultDb())
// SourceIdIngestionStrategy(strategyConfig)
// }
// TopicType.CDC_SCHEMA -> SchemaIngestionStrategy()
// TopicType.CUD -> CUDIngestionStrategy()
// TopicType.PATTERN_NODE -> {
// val map = getTopicsByTopicType<Map<String, NodePatternConfiguration>>(topicType)
// NodePatternIngestionStrategy(map.getValue(topic))
// }
// TopicType.PATTERN_RELATIONSHIP -> {
// val map = getTopicsByTopicType<Map<String, RelationshipPatternConfiguration>>(topicType)
// RelationshipPatternIngestionStrategy(map.getValue(topic))
// }
// TopicType.CYPHER -> {
// CypherTemplateStrategy(streamsTopicService.getCypherTemplate(topic)!!)
// }
// else -> throw RuntimeException("Topic Type not Found")
// }
//
//}
Loading

0 comments on commit 3354762

Please sign in to comment.