Skip to content

Commit

Permalink
Fixes #3610: Add advanced SQL analytics to streaming cypher rows befo…
Browse files Browse the repository at this point in the history
…re returning results to the client
  • Loading branch information
vga91 committed Jan 24, 2025
1 parent d28c393 commit f0caa38
Show file tree
Hide file tree
Showing 11 changed files with 1,022 additions and 14 deletions.
166 changes: 165 additions & 1 deletion docs/asciidoc/modules/ROOT/pages/database-integration/load-jdbc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
= Load JDBC (RDBMS)
:description: This section describes procedures that can be used to import data from databases that have JDBC support.


If you want to use DuckDB you should download and import the driver from https://repo1.maven.org/maven2/org/duckdb/duckdb_jdbc/1.1.3/duckdb_jdbc-1.1.3.jar[Maven Repository]

Data Integration is an important topic.
Reading data from relational databases to create and augment data models is a very helpful exercise.
Expand Down Expand Up @@ -525,3 +525,167 @@ CALL apoc.load.jdbcUpdate('jdbc:derby:derbyDB','UPDATE PERSON SET NAME = ? WHERE
----
CALL apoc.load.jdbc('jdbc:derby:derbyDB', 'PERSON',[],{credentials:{user:'apoc',password:'Ap0c!#Db'}})
----

== Load JDBC - Analytics

You can use the `apoc.jdbc.analytics(<cypherQuery>, <jdbcUrl>, <sqlQueryOverTemporaryTable>, <paramsList>, $config)`
to create a temporary table starting from a Cypher query
and delegate complex analytics to the database defined JDBC URL.

Please note that the returning SQL column names have to be consistent with the one provided by the Cypher query.

In addition to the configurations of the `apoc.load.jdbc` procedure, the `apoc.jdbc.analytics` provides the following ones:

[cols="1m,2,1"]
|===
| tableName | the temporary table name | neo4j_tmp_table
| provider | the SQL provider, to handle data type based on it, possible values are "POSTGRES", "MYSQL" and "DEFAULT" | "DEFAULT"
|===


It is possible to specify a provider in the config parameters.
The default value is "DUCKDB".

You can reproduce the following queries using some nodes:

[source, cypher]
----
CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2000, population: 1005})
CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2010, population: 1065})
CREATE (:City {country: 'NL', name: 'Amsterdam', year: 2020, population: 1158})
CREATE (:City {country: 'US', name: 'Seattle', year: 2000, population: 564})
CREATE (:City {country: 'US', name: 'Seattle', year:2010, population: 608})
CREATE (:City {country: 'US', name: 'Seattle', year: 2020, population: 738})
CREATE (:City {country: 'US', name: 'New York City', year: 2000, population: 8015})
CREATE (:City {country: 'US', name: 'New York City', year: 2010, population: 8175})
CREATE (:City {country: 'US', name: 'New York City', year: 2020, population: 8772})
----

=== DuckDB

Example to get the rank of the current row with gaps.
Fields of the SQL query should be consistent with the Cypher query.
For detailed information go to https://duckdb.org/docs/sql/functions/window_functions.html#rank



[source,cypher]
----
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"SELECT
country,
name,
year,
population,
RANK() OVER (PARTITION BY country ORDER BY year DESC) AS rank
FROM 'neo4j_tmp_table'
ORDER BY rank, country, name;"
)
----

Another example to get a Pivot table using window functions

[source,cypher]
----
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"WITH ranked_data AS (
SELECT
country,
name,
year,
population,
ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) AS rank
FROM 'neo4j_tmp_table'
ORDER BY rank, country, name
)
SELECT *
FROM ranked_data
PIVOT (
sum(population)
FOR country IN ('NL', 'US')
GROUP BY year
)"
)
----

Or using directly a `PIVOT <table> ON <column>` clause:

[source,cypher]
----
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"PIVOT 'neo4j_tmp_table'
ON year
USING sum(population)
ORDER by name"
)
----


=== MySQL

Returns the rank of the current row within its partition, with gaps.
For further information go to
https://dev.mysql.com/doc/refman/8.4/en/window-function-descriptions.html#function_rank

[source,cypher]
----
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"SELECT
country,
name,
year,
population,
RANK() OVER (PARTITION BY country ORDER BY year DESC) AS 'rank'
FROM 'neo4j_tmp_table'
ORDER BY country, name;",
$params,
{ provider: "MYSQL" })
----

Here an example of ROW_NUMBER window function with MySQL:

[source,cypher]
----
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"SELECT
country,
name,
year,
population,
ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) AS 'rank'
FROM 'neo4j_tmp_table'
ORDER BY country, name;",
$params,
{ provider: "MYSQL" })
----

=== PostgreSQL

Here an example with Window functions.

[source,cypher]
----
CALL apoc.jdbc.analytics(
"MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population",
$url,
"SELECT
country,
name,
year,
population,
RANK() OVER (PARTITION BY country ORDER BY year DESC) rank
FROM 'neo4j_tmp_table'
ORDER BY rank, country, name;",
$params,
{ provider: "POSTGRES" })
----
77 changes: 76 additions & 1 deletion extended-it/src/test/java/apoc/load/MySQLJdbcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;

Expand All @@ -18,8 +20,12 @@
import java.time.ZonedDateTime;
import java.util.Map;

import static apoc.load.Analytics.PROVIDER_CONF_KEY;
import static apoc.util.MapUtil.map;
import static apoc.util.TestUtil.testCall;
import static apoc.util.TestUtil.testResult;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(Enclosed.class)
Expand All @@ -36,7 +42,12 @@ public static class MySQLJdbcLatestVersionTest {
@BeforeClass
public static void setUpContainer() {
mysql.start();
TestUtil.registerProcedure(db, Jdbc.class);
TestUtil.registerProcedure(db, Jdbc.class, Analytics.class);
String movies = Util.readResourceFile(ANALYTICS_CYPHER_FILE);
try (Transaction tx = db.beginTx()) {
tx.execute(movies);
tx.commit();
}
}

@AfterClass
Expand All @@ -54,6 +65,70 @@ public void testLoadJdbc() {
public void testIssue3496() {
MySQLJdbcTest.testIssue3496(db, mysql);
}

@Test
public void testLoadJdbcAnalytics() {
String cypher = "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population";

String sql = """
SELECT
country,
name,
year,
population,
RANK() OVER (PARTITION BY country ORDER BY year DESC) AS 'rank'
FROM %s
ORDER BY country, name;
""".formatted(Analytics.TABLE_NAME_DEFAULT_CONF_KEY);
testResult(db, "CALL apoc.jdbc.analytics($queryCypher, $url, $sql, [], $config)",
map(
"queryCypher", cypher,
"sql", sql,
"url", mysql.getJdbcUrl(),
"config", map(PROVIDER_CONF_KEY, Analytics.Provider.MYSQL.name())
),
r -> commonAnalyticsAssertions(r, "1", "3", "5"));
}

@Test
public void testLoadJdbcAnalyticsWindow() {
String cypher = "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population";

String sql = """
SELECT
country,
name,
year,
population,
ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) AS 'rank'
FROM %s
ORDER BY country, name;
""".formatted(Analytics.TABLE_NAME_DEFAULT_CONF_KEY);

testResult(db, "CALL apoc.jdbc.analytics($queryCypher, $url, $sql, [], $config)",
map(
"queryCypher", cypher,
"sql", sql,
"url", mysql.getJdbcUrl(),
"config", map(PROVIDER_CONF_KEY, Analytics.Provider.MYSQL.name())
),
r -> commonAnalyticsAssertions(r, "2", "4", "6"));
}

private static void commonAnalyticsAssertions(Result r,
String expected4thResult, String expected5thResult, String expected6thResult) {
assertRowRank(r.next(), "1");
assertRowRank(r.next(), "2");
assertRowRank(r.next(), "3");
assertRowRank(r.next(), expected4thResult);
assertRowRank(r.next(), expected5thResult);
assertRowRank(r.next(), expected6thResult);
assertRowRank(r.next(), "1");
assertRowRank(r.next(), "3");
assertRowRank(r.next(), "5");

assertFalse(r.hasNext());
}
}

public static class MySQLJdbcFiveVersionTest {
Expand Down
75 changes: 74 additions & 1 deletion extended-it/src/test/java/apoc/load/PostgresJdbcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.junit.Ignore;
import org.junit.Test;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;
import org.testcontainers.containers.JdbcDatabaseContainer;
Expand All @@ -19,6 +20,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static apoc.load.Analytics.PROVIDER_CONF_KEY;
import static apoc.util.MapUtil.map;
import static apoc.util.TestUtil.testCall;
import static apoc.util.TestUtil.testResult;
import static org.junit.Assert.assertArrayEquals;
Expand All @@ -41,8 +44,14 @@ public class PostgresJdbcTest extends AbstractJdbcTest {
public static void setUp() throws Exception {
postgress = new PostgreSQLContainer().withInitScript("init_postgres.sql");
postgress.start();
TestUtil.registerProcedure(db,Jdbc.class, Periodic.class, Strings.class);
TestUtil.registerProcedure(db,Jdbc.class, Periodic.class, Strings.class, Analytics.class);
db.executeTransactionally("CALL apoc.load.driver('org.postgresql.Driver')");

String movies = Util.readResourceFile(ANALYTICS_CYPHER_FILE);
try (Transaction tx = db.beginTx()) {
tx.execute(movies);
tx.commit();
}
}

@AfterClass
Expand Down Expand Up @@ -134,6 +143,70 @@ public void testIssue4141PeriodicIterateWithJdbc() throws Exception {
assertPgStatActivityHasOnlyActiveState();
}

@Test
public void testLoadJdbcAnalytics() {
String cypher = "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population";

String sql = """
SELECT
country,
name,
year,
population,
RANK() OVER (PARTITION BY country ORDER BY year DESC) rank
FROM %s
ORDER BY rank, country, name;
""".formatted(Analytics.TABLE_NAME_DEFAULT_CONF_KEY);

testResult(db, "CALL apoc.jdbc.analytics($queryCypher, $url, $sql, [], $config)",
map(
"queryCypher", cypher,
"sql", sql,
"url", getUrl(postgress),
"config", map(PROVIDER_CONF_KEY, Analytics.Provider.POSTGRES.name())
),
r -> commonAnalyticsAssertions(r, 1));
}

@Test
public void testLoadJdbcAnalyticsWindow() {
String cypher = "MATCH (n:City) RETURN n.country AS country, n.name AS name, n.year AS year, n.population AS population";

String sql = """
SELECT
country,
name,
year,
population,
ROW_NUMBER() OVER (PARTITION BY country ORDER BY year DESC) rank
FROM %s
ORDER BY rank, country, name;
""".formatted(Analytics.TABLE_NAME_DEFAULT_CONF_KEY);

testResult(db, "CALL apoc.jdbc.analytics($queryCypher, $url, $sql, [], $config)",
map(
"queryCypher", cypher,
"sql", sql,
"url", getUrl(postgress),
"config", map(PROVIDER_CONF_KEY, Analytics.Provider.MYSQL.name())
),
r -> commonAnalyticsAssertions(r, 2));
}

private static void commonAnalyticsAssertions(Result r, int expected3rdResult) {
assertRowRank(r.next(), 1);
assertRowRank(r.next(), 1);
assertRowRank(r.next(), expected3rdResult);
assertRowRank(r.next(), 2);
assertRowRank(r.next(), 3);
assertRowRank(r.next(), 3);
assertRowRank(r.next(), 4);
assertRowRank(r.next(), 5);
assertRowRank(r.next(), 6);

assertFalse(r.hasNext());
}

private static void assertPgStatActivityHasOnlyActiveState() {
assertEventually(() -> {
// connect to postgres and execute the query `select state from pg_stat_activity`
Expand Down
1 change: 1 addition & 0 deletions extended/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ dependencies {
testImplementation group: 'com.opencsv', name: 'opencsv', version: '5.7.1', {
exclude group: 'org.apache.commons', module: 'commons-collections4'
}
testImplementation group: 'org.duckdb', name: 'duckdb_jdbc', version: '1.1.3'

configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-nop'
Expand Down
Loading

0 comments on commit f0caa38

Please sign in to comment.