
Commit

Fixes 3610: Add advanced SQL analytics to streaming cypher rows before returning results to the client
vga91 committed Dec 10, 2024
1 parent c686650 commit a2eb1ac
Showing 5 changed files with 567 additions and 6 deletions.
1 change: 1 addition & 0 deletions extended/build.gradle
@@ -164,6 +164,7 @@ dependencies {
testImplementation group: 'com.opencsv', name: 'opencsv', version: '5.7.1', {
exclude group: 'org.apache.commons', module: 'commons-collections4'
}
testImplementation group: 'org.duckdb', name: 'duckdb_jdbc', version: '1.1.3'

configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-nop'
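Note: the new duckdb_jdbc test dependency lets the JDBC tests exercise an in-memory DuckDB database. A minimal sketch of such a test setup (the class, table name, and data here are hypothetical, not part of this commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DuckDbSmokeTest {
    public static void main(String[] args) throws Exception {
        // "jdbc:duckdb:" opens an in-memory DuckDB database; appending a
        // file path would open a persistent one.
        try (Connection conn = DriverManager.getConnection("jdbc:duckdb:");
             Statement st = conn.createStatement()) {
            st.execute("CREATE TABLE person (name VARCHAR, born DATE)");
            st.execute("INSERT INTO person VALUES ('Keanu Reeves', DATE '1964-09-02')");
            try (ResultSet rs = st.executeQuery("SELECT name, born FROM person")) {
                while (rs.next()) {
                    // DuckDB's driver hands back java.time.LocalDate for DATE
                    // columns, which motivates the convert() changes below.
                    System.out.println(rs.getString(1) + " " + rs.getObject(2));
                }
            }
        }
    }
}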
172 changes: 172 additions & 0 deletions extended/src/main/java/apoc/agg/Analytics.java
@@ -0,0 +1,172 @@
//package apoc.agg;
//
//import apoc.Extended;
//import apoc.result.RowResult;
//import apoc.util.ExtendedListUtils;
//import apoc.util.Util;
//import org.neo4j.graphdb.Entity;
//import org.neo4j.logging.Log;
//import org.neo4j.procedure.Context;
//import org.neo4j.procedure.Description;
//import org.neo4j.procedure.Name;
//import org.neo4j.procedure.Procedure;
//import org.neo4j.procedure.UserAggregationFunction;
//import org.neo4j.procedure.UserAggregationResult;
//import org.neo4j.procedure.UserAggregationUpdate;
//
//import java.util.ArrayList;
//import java.util.Collections;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//import java.util.stream.Stream;
//
//import static apoc.agg.AggregationUtil.updateAggregationValues;
//import static apoc.load.Jdbc.executeUpdate;
//
//
//// TODO - move this: not in agg, but in jdbc
//
//@Extended
//public class Analytics {
//
// @Procedure
// @Description("apoc.agg.rollup(<ANY>, [groupKeys], [aggKeys])" +
//            "\n Emulate an Oracle/MySQL rollup command: `ROLLUP groupKeys, SUM(aggKey1), AVG(aggKey1), COUNT(aggKey1), SUM(aggKey2), AVG(aggKey2), ... `")
// public Stream<RowResult> aggregate(
// @Name("neo4jQuery") String neo4jQuery,
// @Name("jdbc") String urlOrKey,
// @Name("sqlQuery") String sqlQuery,
// @Name(value = "params", defaultValue = "[]") List<Object> params,
// @Name(value = "config",defaultValue = "{}") Map<String, Object> config) {
//
//        // TODO - write on the PR: add handling: some SQL databases like Microsoft SQL Server create temp tables in a different way,
//        // e.g. CREATE TABLE #table_name (column_name datatype);
//        // document it
//
// // step 1: temp table creation from neo4jQuery
//
// // step 2:
// executeUpdate(urlOrKey, sqlQuery, config, log, params);
//
// // step 3: return result
// }
//
//
// /* TODO
// MATCH (p:Person)-[r:ACTED_IN]->(m:Movie)
// RETURN
// p.name AS actor,
// m.genre AS genre,
// r.roles AS roles,
// COUNT(m) AS movies_count
//
//    TODO - better not an aggregation, so it is more customizable: I can choose which results to get and how to get them;
//    otherwise, to do something like the above with movies_count, I would have to add an aggKeys parameter and do strange things
// */
//
//
// @Context
// public Log log;
//
// @UserAggregationFunction("apoc.agg.analytics")
//    // TODO - description
// @Description("apoc.agg.rollup(<ANY>, [groupKeys], [aggKeys])" +
//            "\n Emulate an Oracle/MySQL rollup command: `ROLLUP groupKeys, SUM(aggKey1), AVG(aggKey1), COUNT(aggKey1), SUM(aggKey2), AVG(aggKey2), ... `")
// public AnalyticsFunction rollup() {
// return new AnalyticsFunction();
// }
//
// public static class AnalyticsFunction {
//
// private final Map<List<Object>, Map<String, Number>> rolledUpData = new HashMap<>();
// private List<String> groupKeysRes = null;
//
// @UserAggregationUpdate
// public void aggregate(
// @Name("jdbc") String urlOrKey,
// @Name("query") String query,
// @Name(value = "params", defaultValue = "[]") List<Object> params,
// @Name(value = "config",defaultValue = "{}") Map<String, Object> config) {
//
// executeUpdate(urlOrKey, query, config, log, params);
// }
//
// private void rollupAggregationProperties(List<String> aggKeys, Entity entity, List<Object> partialKey) {
// Map<String, Number> partialResult = rolledUpData.get(partialKey);
// for(var aggKey: aggKeys) {
// if (!entity.hasProperty(aggKey)) {
// continue;
// }
//
// Object property = entity.getProperty(aggKey);
//
// String countKey = "COUNT(%s)".formatted(aggKey);
// String sumKey = "SUM(%s)".formatted(aggKey);
// String avgKey = "AVG(%s)".formatted(aggKey);
//
// updateAggregationValues(partialResult, property, countKey, sumKey, avgKey);
// }
// }
//
// /**
//     * Transform a Map.of(ListGroupKeys, MapOfAggResults) into a List of Map.of(AggResult + ListGroupKeyToMap)
// */
// @UserAggregationResult
// public Object result() {
// List<HashMap<String, Object>> list = rolledUpData.entrySet().stream()
// .map(e -> {
// HashMap<String, Object> map = new HashMap<>();
// for (int i = 0; i < groupKeysRes.size(); i++) {
// map.put(groupKeysRes.get(i), e.getKey().get(i));
// }
// map.putAll(e.getValue());
// return map;
// })
// .sorted((m1, m2) -> {
// for (String key : groupKeysRes) {
// Object value1 = m1.get(key);
// Object value2 = m2.get(key);
// int cmp = compareValues(value1, value2);
// if (cmp != 0) {
// return cmp;
// }
// }
// return 0;
// })
// .toList();
//
// return list;
// }
//
// /**
// * We use this instead of e.g. apoc.coll.sortMulti
// * since we have to handle the NULL_ROLLUP values as well
// */
// private static int compareValues(Object value1, Object value2) {
// if (value1 == null && value2 == null) {
// return 0;
// } else if (value1 == null) {
// return 1;
// } else if (value2 == null) {
// return -1;
// } else if (NULL_ROLLUP.equals(value1) && NULL_ROLLUP.equals(value2)) {
// return 0;
// } else if (NULL_ROLLUP.equals(value1)) {
// return 1;
// } else if (NULL_ROLLUP.equals(value2)) {
// return -1;
// } else if (value1 instanceof Comparable && value2 instanceof Comparable) {
// try {
// return ((Comparable<Object>) value1).compareTo(value2);
// } catch (Exception e) {
// // e.g. different data types, like int and strings
// return 0;
// }
//
// } else {
// return 0;
// }
// }
// }
//}
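For reference, the commented-out result() above flattens a Map of group-key lists to aggregate maps into one row map per group, sorted so that subtotal sentinels come last. A standalone sketch of that transformation (NULL_ROLLUP is assumed to be the rollup subtotal sentinel, which is not defined in this file; the sample data is invented):

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollupResultSketch {
    // Assumed sentinel marking rollup subtotal rows.
    static final String NULL_ROLLUP = "[NULL]";

    public static void main(String[] args) {
        List<String> groupKeys = List.of("genre");
        // Group-key values -> aggregated measures, as rolledUpData holds them.
        Map<List<Object>, Map<String, Number>> rolledUpData = new HashMap<>();
        rolledUpData.put(List.of("Drama"), Map.of("COUNT(year)", 2, "SUM(year)", 3998));
        rolledUpData.put(List.of(NULL_ROLLUP), Map.of("COUNT(year)", 5, "SUM(year)", 9990));
        rolledUpData.put(List.of("Comedy"), Map.of("COUNT(year)", 3, "SUM(year)", 5992));

        List<Map<String, Object>> rows = rolledUpData.entrySet().stream()
                .map(e -> {
                    Map<String, Object> row = new HashMap<>();
                    for (int i = 0; i < groupKeys.size(); i++) {
                        row.put(groupKeys.get(i), e.getKey().get(i));
                    }
                    row.putAll(e.getValue());
                    return row;
                })
                // Subtotal (NULL_ROLLUP) rows sort after concrete group values,
                // mirroring compareValues() above.
                .sorted(Comparator
                        .comparing((Map<String, Object> m) -> NULL_ROLLUP.equals(m.get("genre")) ? 1 : 0)
                        .thenComparing(m -> String.valueOf(m.get("genre"))))
                .toList();

        rows.forEach(System.out::println);
        // prints the Comedy row, then Drama, then the [NULL] subtotal row
    }
}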
15 changes: 11 additions & 4 deletions extended/src/main/java/apoc/load/Jdbc.java
@@ -107,10 +107,10 @@ private Stream<RowResult> executeQuery(String urlOrKey, String tableOrSelect, Ma
@Description("apoc.load.jdbcUpdate('key or url','statement',[params],config) YIELD row - update relational database, from a SQL statement with optional parameters")
public Stream<RowResult> jdbcUpdate(@Name("jdbc") String urlOrKey, @Name("query") String query, @Name(value = "params", defaultValue = "[]") List<Object> params, @Name(value = "config",defaultValue = "{}") Map<String, Object> config) {
log.info( String.format( "Executing SQL update: %s", query ) );
return executeUpdate(urlOrKey, query, config, params.toArray(new Object[params.size()]));
return executeUpdate(urlOrKey, query, config, log, params.toArray(new Object[params.size()]));
}

private Stream<RowResult> executeUpdate(String urlOrKey, String query, Map<String, Object> config, Object...params) {
public static Stream<RowResult> executeUpdate(String urlOrKey, String query, Map<String, Object> config, Log log, Object...params) {
String url = getUrlOrKey(urlOrKey);
LoadJdbcConfig jdbcConfig = new LoadJdbcConfig(config);
try {
@@ -224,7 +224,10 @@ private Object convert(Object value, int sqlType) {
return value;
}
if (Types.TIME == sqlType) {
return ((java.sql.Time)value).toLocalTime();
if (value instanceof java.sql.Time time) {
return time.toLocalTime();
}
return value;
}
if (Types.TIME_WITH_TIMEZONE == sqlType) {
return OffsetTime.parse(value.toString());
@@ -248,7 +251,11 @@ private Object convert(Object value, int sqlType) {
}
}
if (Types.DATE == sqlType) {
return ((java.sql.Date)value).toLocalDate();
// Cannot cast 'java.time.LocalDate' to 'java.sql.Date' -- with DuckDB the value is already a LocalDate
if (value instanceof java.sql.Date date) {
return date.toLocalDate();
}
return value;
}

if (Types.ARRAY == sqlType) {
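Both hunks above apply the same pattern: cast only when the driver actually returned the legacy java.sql type, and pass java.time values through untouched. A condensed, self-contained sketch of that guard for the DATE case (the class and helper names are illustrative):

import java.sql.Types;
import java.time.LocalDate;

public class GuardedSqlConversion {
    // Cast only when the driver returned the legacy java.sql.Date; DuckDB's
    // driver already returns java.time.LocalDate, so pass it through as-is.
    static Object convertDate(Object value, int sqlType) {
        if (Types.DATE == sqlType) {
            if (value instanceof java.sql.Date date) {
                return date.toLocalDate();
            }
            return value;
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(convertDate(java.sql.Date.valueOf("2024-12-10"), Types.DATE)); // 2024-12-10
        System.out.println(convertDate(LocalDate.of(2024, 12, 10), Types.DATE));          // 2024-12-10
    }
}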
10 changes: 8 additions & 2 deletions extended/src/main/java/apoc/load/util/JdbcUtil.java
@@ -28,8 +28,14 @@ public static Object getConnection(String jdbcUrl, LoadJdbcConfig config, Class<
if(config.hasCredentials()) {
return createConnection(jdbcUrl, config.getCredentials().getUser(), config.getCredentials().getPassword(), classType);
} else {
URI uri = new URI(jdbcUrl.substring("jdbc:".length()));
String userInfo = uri.getUserInfo();
String userInfo = null;
try {
URI uri = new URI(jdbcUrl.substring("jdbc:".length()));
userInfo = uri.getUserInfo();
} catch (Exception e) {
// with DuckDB we can pass a jdbc url like "jdbc:duckdb:";
// this would make new URI(..) fail with `java.net.URISyntaxException: Expected scheme-specific part at index 7: duckdb:`
}
if (userInfo != null) {
String cleanUrl = jdbcUrl.substring(0, jdbcUrl.indexOf("://") + 3) + jdbcUrl.substring(jdbcUrl.indexOf("@") + 1);
String[] user = userInfo.split(":");
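To illustrate the failure mode this try/catch guards against: stripping the jdbc: prefix from a conventional URL leaves a URI that java.net.URI can parse for credentials, but DuckDB's minimal URL leaves only "duckdb:", which has no scheme-specific part. A small demonstration (the class and method names are illustrative):

import java.net.URI;

public class JdbcUrlUserInfo {
    static String userInfoOrNull(String jdbcUrl) {
        try {
            return new URI(jdbcUrl.substring("jdbc:".length())).getUserInfo();
        } catch (Exception e) {
            // "jdbc:duckdb:" -> new URI("duckdb:") throws
            // URISyntaxException: Expected scheme-specific part at index 7
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(userInfoOrNull("jdbc:mysql://user:secret@localhost:3306/db")); // user:secret
        System.out.println(userInfoOrNull("jdbc:duckdb:"));                               // null
    }
}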