Skip to content

Commit

Permalink
Applying styler. Mentioning Spark JDBC drivers in documentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Admin_mschuemi authored and Admin_mschuemi committed Nov 18, 2021
1 parent c5e0c66 commit 3b7c5a6
Show file tree
Hide file tree
Showing 61 changed files with 2,356 additions and 1,763 deletions.
238 changes: 142 additions & 96 deletions R/Andromeda.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ lowLevelQuerySqlToAndromeda <- function(connection,
andromedaTableName,
datesAsString = FALSE,
integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric",
default = TRUE),
default = TRUE
),
integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric",
default = TRUE)) {
default = TRUE
)) {
UseMethod("lowLevelQuerySqlToAndromeda", connection)
}

Expand All @@ -65,39 +67,48 @@ lowLevelQuerySqlToAndromeda.default <- function(connection,
andromedaTableName,
datesAsString = FALSE,
integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric",
default = TRUE),
default = TRUE
),
integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric",
default = TRUE)) {
if (rJava::is.jnull(connection@jConnection))
default = TRUE
)) {
if (rJava::is.jnull(connection@jConnection)) {
stop("Connection is closed")
}

batchedQuery <- rJava::.jnew("org.ohdsi.databaseConnector.BatchedQuery",
connection@jConnection,
query,
connection@dbms)
batchedQuery <- rJava::.jnew(
"org.ohdsi.databaseConnector.BatchedQuery",
connection@jConnection,
query,
connection@dbms
)

on.exit(rJava::.jcall(batchedQuery, "V", "clear"))

columnTypes <- rJava::.jcall(batchedQuery, "[I", "getColumnTypes")
if (length(columnTypes) == 0)
if (length(columnTypes) == 0) {
stop("No columns found")
}
if (any(columnTypes == 5)) {
validateInt64Query()
}
first <- TRUE
while (!rJava::.jcall(batchedQuery, "Z", "isDone")) {
rJava::.jcall(batchedQuery, "V", "fetchBatch")
batch <- parseJdbcColumnData(batchedQuery,
columnTypes = columnTypes,
datesAsString = datesAsString,
integer64AsNumeric = integer64AsNumeric,
integerAsNumeric = integerAsNumeric)
columnTypes = columnTypes,
datesAsString = datesAsString,
integer64AsNumeric = integer64AsNumeric,
integerAsNumeric = integerAsNumeric
)

RSQLite::dbWriteTable(conn = andromeda,
name = andromedaTableName,
value = batch,
overwrite = first,
append = !first)
RSQLite::dbWriteTable(
conn = andromeda,
name = andromedaTableName,
value = batch,
overwrite = first,
append = !first
)
first <- FALSE
}
invisible(andromeda)
Expand All @@ -110,20 +121,24 @@ lowLevelQuerySqlToAndromeda.DatabaseConnectorDbiConnection <- function(connectio
andromedaTableName,
datesAsString = FALSE,
integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric",
default = TRUE),
default = TRUE
),
integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric",
default = TRUE)) {

default = TRUE
)) {
results <- lowLevelQuerySql(connection,
query,
integerAsNumeric = integerAsNumeric,
integer64AsNumeric = integer64AsNumeric)
query,
integerAsNumeric = integerAsNumeric,
integer64AsNumeric = integer64AsNumeric
)

RSQLite::dbWriteTable(conn = andromeda,
name = andromedaTableName,
value = results,
overwrite = TRUE,
append = FALSE)
RSQLite::dbWriteTable(
conn = andromeda,
name = andromedaTableName,
value = results,
overwrite = TRUE,
append = FALSE
)
invisible(andromeda)
}

Expand Down Expand Up @@ -164,16 +179,20 @@ lowLevelQuerySqlToAndromeda.DatabaseConnectorDbiConnection <- function(connectio
#' @examples
#' \dontrun{
#' andromeda <- Andromeda::andromeda()
#' connectionDetails <- createConnectionDetails(dbms = "postgresql",
#' server = "localhost",
#' user = "root",
#' password = "blah",
#' schema = "cdm_v4")
#' connectionDetails <- createConnectionDetails(
#' dbms = "postgresql",
#' server = "localhost",
#' user = "root",
#' password = "blah",
#' schema = "cdm_v4"
#' )
#' conn <- connect(connectionDetails)
#' querySqlToAndromeda(connection = conn,
#' sql = "SELECT * FROM person;",
#' andromeda = andromeda,
#' andromedaTableName = "foo")
#' querySqlToAndromeda(
#' connection = conn,
#' sql = "SELECT * FROM person;",
#' andromeda = andromeda,
#' andromedaTableName = "foo"
#' )
#' disconnect(conn)
#'
#' andromeda$foo
Expand All @@ -186,45 +205,61 @@ querySqlToAndromeda <- function(connection,
errorReportFile = file.path(getwd(), "errorReportSql.txt"),
snakeCaseToCamelCase = FALSE,
integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric",
default = TRUE),
default = TRUE
),
integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric",
default = TRUE)) {
if (inherits(connection,
"DatabaseConnectorJdbcConnection") && rJava::is.jnull(connection@jConnection))
default = TRUE
)) {
if (inherits(
connection,
"DatabaseConnectorJdbcConnection"
) && rJava::is.jnull(connection@jConnection)) {
stop("Connection is closed")
if (!inherits(andromeda, "SQLiteConnection"))
}
if (!inherits(andromeda, "SQLiteConnection")) {
stop("The andromeda argument must be an Andromeda object (or SQLiteConnection objecT).")
}

# Calling splitSql, because this will also strip trailing semicolons (which cause Oracle to crash).
sqlStatements <- SqlRender::splitSql(sql)
if (length(sqlStatements) > 1)
stop(paste("A query that returns a result can only consist of one SQL statement, but",
length(sqlStatements),
"statements were found"))
tryCatch({
lowLevelQuerySqlToAndromeda(connection = connection,
query = sqlStatements[1],
andromeda = andromeda,
andromedaTableName = andromedaTableName,
integerAsNumeric = integerAsNumeric,
integer64AsNumeric = integer64AsNumeric)
columnNames <- RSQLite::dbListFields(andromeda, andromedaTableName)
newColumnNames <- toupper(columnNames)
if (snakeCaseToCamelCase) {
newColumnNames <- SqlRender::snakeCaseToCamelCase(newColumnNames)
}
idx <- columnNames != newColumnNames
if (any(idx)) {
sql <- sprintf("ALTER TABLE %s RENAME COLUMN %s TO %s;",
andromedaTableName,
columnNames[idx],
newColumnNames[idx])
lapply(sql, function(x) RSQLite::dbExecute(andromeda, x))
if (length(sqlStatements) > 1) {
stop(paste(
"A query that returns a result can only consist of one SQL statement, but",
length(sqlStatements),
"statements were found"
))
}
tryCatch(
{
lowLevelQuerySqlToAndromeda(
connection = connection,
query = sqlStatements[1],
andromeda = andromeda,
andromedaTableName = andromedaTableName,
integerAsNumeric = integerAsNumeric,
integer64AsNumeric = integer64AsNumeric
)
columnNames <- RSQLite::dbListFields(andromeda, andromedaTableName)
newColumnNames <- toupper(columnNames)
if (snakeCaseToCamelCase) {
newColumnNames <- SqlRender::snakeCaseToCamelCase(newColumnNames)
}
idx <- columnNames != newColumnNames
if (any(idx)) {
sql <- sprintf(
"ALTER TABLE %s RENAME COLUMN %s TO %s;",
andromedaTableName,
columnNames[idx],
newColumnNames[idx]
)
lapply(sql, function(x) RSQLite::dbExecute(andromeda, x))
}
invisible(andromeda)
},
error = function(err) {
.createErrorReport(connection@dbms, err$message, sql, errorReportFile)
}
invisible(andromeda)
}, error = function(err) {
.createErrorReport(connection@dbms, err$message, sql, errorReportFile)
})
)
}

#' Render, translate, and query to local Andromeda
Expand Down Expand Up @@ -265,17 +300,20 @@ querySqlToAndromeda <- function(connection,
#'
#' @examples
#' \dontrun{
#' connectionDetails <- createConnectionDetails(dbms = "postgresql",
#' server = "localhost",
#' user = "root",
#' password = "blah",
#' schema = "cdm_v4")
#' connectionDetails <- createConnectionDetails(
#' dbms = "postgresql",
#' server = "localhost",
#' user = "root",
#' password = "blah",
#' schema = "cdm_v4"
#' )
#' conn <- connect(connectionDetails)
#' renderTranslatequerySqlToAndromeda(conn,
#' sql = "SELECT * FROM @@schema.person",
#' schema = "cdm_synpuf",
#' andromeda = andromeda,
#' andromedaTableName = "foo")
#' sql = "SELECT * FROM @@schema.person",
#' schema = "cdm_synpuf",
#' andromeda = andromeda,
#' andromedaTableName = "foo"
#' )
#' disconnect(conn)
#'
#' andromeda$foo
Expand All @@ -285,32 +323,40 @@ renderTranslateQuerySqlToAndromeda <- function(connection,
sql,
andromeda,
andromedaTableName,
errorReportFile = file.path(getwd(),
"errorReportSql.txt"),
errorReportFile = file.path(
getwd(),
"errorReportSql.txt"
),
snakeCaseToCamelCase = FALSE,
oracleTempSchema = NULL,
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"),
integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric",
default = TRUE),
default = TRUE
),
integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric",
default = TRUE),
default = TRUE
),
...) {
if (!is.null(oracleTempSchema) && oracleTempSchema != "") {
warn("The 'oracleTempSchema' argument is deprecated. Use 'tempEmulationSchema' instead.",
.frequency = "regularly",
.frequency_id = "oracleTempSchema")
.frequency = "regularly",
.frequency_id = "oracleTempSchema"
)
tempEmulationSchema <- oracleTempSchema
}
sql <- SqlRender::render(sql, ...)
sql <- SqlRender::translate(sql,
targetDialect = connection@dbms,
tempEmulationSchema = tempEmulationSchema)
return(querySqlToAndromeda(connection = connection,
sql = sql,
andromeda = andromeda,
andromedaTableName = andromedaTableName,
errorReportFile = errorReportFile,
snakeCaseToCamelCase = snakeCaseToCamelCase,
integerAsNumeric = integerAsNumeric,
integer64AsNumeric = integer64AsNumeric))
targetDialect = connection@dbms,
tempEmulationSchema = tempEmulationSchema
)
return(querySqlToAndromeda(
connection = connection,
sql = sql,
andromeda = andromeda,
andromedaTableName = andromedaTableName,
errorReportFile = errorReportFile,
snakeCaseToCamelCase = snakeCaseToCamelCase,
integerAsNumeric = integerAsNumeric,
integer64AsNumeric = integer64AsNumeric
))
}
Loading

0 comments on commit 3b7c5a6

Please # to comment.