From 3b7c5a661405479d51bd230611cdedc1ba7d69b3 Mon Sep 17 00:00:00 2001 From: Admin_mschuemi Date: Thu, 18 Nov 2021 03:30:30 -0500 Subject: [PATCH] Applying styler. Mentioning Spark JDBC drivers in documentation. --- R/Andromeda.R | 238 ++++--- R/BulkLoad.R | 282 ++++---- R/Compression.R | 16 +- R/Connect.R | 403 ++++++----- R/CtasHack.R | 98 +-- R/DBI.R | 403 ++++++----- R/DatabaseConnector.R | 25 +- R/Drivers.R | 103 +-- R/InsertTable.R | 226 +++--- R/ListTables.R | 20 +- R/RStudio.R | 161 +++-- R/Sql.R | 667 ++++++++++-------- README.md | 4 +- docs/articles/UsingDatabaseConnector.html | 8 +- .../header-attrs-2.11/header-attrs.js | 12 + docs/index.html | 4 +- docs/pkgdown.yml | 2 +- docs/reference/DatabaseConnector-package.html | 12 +- docs/reference/connect.html | 32 +- docs/reference/createConnectionDetails.html | 13 +- ...onnect-DatabaseConnectorDriver-method.html | 10 +- docs/reference/disconnect.html | 10 +- docs/reference/downloadJdbcDrivers.html | 2 + docs/reference/executeSql.html | 16 +- docs/reference/insertTable.html | 50 +- docs/reference/jdbcDrivers.html | 6 +- docs/reference/querySql.html | 12 +- docs/reference/querySqlToAndromeda.html | 22 +- docs/reference/renderTranslateExecuteSql.html | 27 +- .../renderTranslateQueryApplyBatched.html | 47 +- docs/reference/renderTranslateQuerySql.html | 21 +- .../renderTranslateQuerySqlToAndromeda.html | 21 +- extras/DatabaseConnector.pdf | Bin 524102 -> 524573 bytes inst/COPYRIGHTS | 9 + inst/doc/UsingDatabaseConnector.pdf | Bin 200050 -> 200472 bytes man-roxygen/DbmsDetails.R | 3 +- man/connect.Rd | 32 +- man/createConnectionDetails.Rd | 13 +- ...bConnect-DatabaseConnectorDriver-method.Rd | 10 +- man/disconnect.Rd | 10 +- man/downloadJdbcDrivers.Rd | 2 + man/executeSql.Rd | 16 +- man/insertTable.Rd | 56 +- man/jdbcDrivers.Rd | 4 +- man/querySql.Rd | 12 +- man/querySqlToAndromeda.Rd | 22 +- man/renderTranslateExecuteSql.Rd | 27 +- man/renderTranslateQueryApplyBatched.Rd | 48 +- man/renderTranslateQuerySql.Rd | 
21 +- man/renderTranslateQuerySqlToAndromeda.Rd | 21 +- tests/testthat.R | 2 +- tests/testthat/setup.R | 15 +- tests/testthat/test-BatchProcess.R | 117 +-- tests/testthat/test-DBItest.R | 19 +- tests/testthat/test-compression.R | 10 +- tests/testthat/test-connection.R | 248 ++++--- tests/testthat/test-fetchResults.R | 124 ++-- tests/testthat/test-getTableNames.R | 48 +- tests/testthat/test-insertTable.R | 183 ++--- tests/testthat/test-sendUpdates.R | 72 +- vignettes/UsingDatabaseConnector.Rmd | 2 +- 61 files changed, 2356 insertions(+), 1763 deletions(-) create mode 100644 docs/articles/UsingDatabaseConnector_files/header-attrs-2.11/header-attrs.js diff --git a/R/Andromeda.R b/R/Andromeda.R index 7569b386..6fa944a9 100644 --- a/R/Andromeda.R +++ b/R/Andromeda.R @@ -52,9 +52,11 @@ lowLevelQuerySqlToAndromeda <- function(connection, andromedaTableName, datesAsString = FALSE, integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", - default = TRUE), + default = TRUE + ), integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", - default = TRUE)) { + default = TRUE + )) { UseMethod("lowLevelQuerySqlToAndromeda", connection) } @@ -65,22 +67,28 @@ lowLevelQuerySqlToAndromeda.default <- function(connection, andromedaTableName, datesAsString = FALSE, integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", - default = TRUE), + default = TRUE + ), integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", - default = TRUE)) { - if (rJava::is.jnull(connection@jConnection)) + default = TRUE + )) { + if (rJava::is.jnull(connection@jConnection)) { stop("Connection is closed") + } - batchedQuery <- rJava::.jnew("org.ohdsi.databaseConnector.BatchedQuery", - connection@jConnection, - query, - connection@dbms) + batchedQuery <- rJava::.jnew( + "org.ohdsi.databaseConnector.BatchedQuery", + connection@jConnection, + query, + connection@dbms + ) on.exit(rJava::.jcall(batchedQuery, "V", "clear")) columnTypes <- rJava::.jcall(batchedQuery, 
"[I", "getColumnTypes") - if (length(columnTypes) == 0) + if (length(columnTypes) == 0) { stop("No columns found") + } if (any(columnTypes == 5)) { validateInt64Query() } @@ -88,16 +96,19 @@ lowLevelQuerySqlToAndromeda.default <- function(connection, while (!rJava::.jcall(batchedQuery, "Z", "isDone")) { rJava::.jcall(batchedQuery, "V", "fetchBatch") batch <- parseJdbcColumnData(batchedQuery, - columnTypes = columnTypes, - datesAsString = datesAsString, - integer64AsNumeric = integer64AsNumeric, - integerAsNumeric = integerAsNumeric) + columnTypes = columnTypes, + datesAsString = datesAsString, + integer64AsNumeric = integer64AsNumeric, + integerAsNumeric = integerAsNumeric + ) - RSQLite::dbWriteTable(conn = andromeda, - name = andromedaTableName, - value = batch, - overwrite = first, - append = !first) + RSQLite::dbWriteTable( + conn = andromeda, + name = andromedaTableName, + value = batch, + overwrite = first, + append = !first + ) first <- FALSE } invisible(andromeda) @@ -110,20 +121,24 @@ lowLevelQuerySqlToAndromeda.DatabaseConnectorDbiConnection <- function(connectio andromedaTableName, datesAsString = FALSE, integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", - default = TRUE), + default = TRUE + ), integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", - default = TRUE)) { - + default = TRUE + )) { results <- lowLevelQuerySql(connection, - query, - integerAsNumeric = integerAsNumeric, - integer64AsNumeric = integer64AsNumeric) + query, + integerAsNumeric = integerAsNumeric, + integer64AsNumeric = integer64AsNumeric + ) - RSQLite::dbWriteTable(conn = andromeda, - name = andromedaTableName, - value = results, - overwrite = TRUE, - append = FALSE) + RSQLite::dbWriteTable( + conn = andromeda, + name = andromedaTableName, + value = results, + overwrite = TRUE, + append = FALSE + ) invisible(andromeda) } @@ -164,16 +179,20 @@ lowLevelQuerySqlToAndromeda.DatabaseConnectorDbiConnection <- function(connectio #' @examples #' \dontrun{ #' 
andromeda <- Andromeda::andromeda() -#' connectionDetails <- createConnectionDetails(dbms = "postgresql", -#' server = "localhost", -#' user = "root", -#' password = "blah", -#' schema = "cdm_v4") +#' connectionDetails <- createConnectionDetails( +#' dbms = "postgresql", +#' server = "localhost", +#' user = "root", +#' password = "blah", +#' schema = "cdm_v4" +#' ) #' conn <- connect(connectionDetails) -#' querySqlToAndromeda(connection = conn, -#' sql = "SELECT * FROM person;", -#' andromeda = andromeda, -#' andromedaTableName = "foo") +#' querySqlToAndromeda( +#' connection = conn, +#' sql = "SELECT * FROM person;", +#' andromeda = andromeda, +#' andromedaTableName = "foo" +#' ) #' disconnect(conn) #' #' andromeda$foo @@ -186,45 +205,61 @@ querySqlToAndromeda <- function(connection, errorReportFile = file.path(getwd(), "errorReportSql.txt"), snakeCaseToCamelCase = FALSE, integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", - default = TRUE), + default = TRUE + ), integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", - default = TRUE)) { - if (inherits(connection, - "DatabaseConnectorJdbcConnection") && rJava::is.jnull(connection@jConnection)) + default = TRUE + )) { + if (inherits( + connection, + "DatabaseConnectorJdbcConnection" + ) && rJava::is.jnull(connection@jConnection)) { stop("Connection is closed") - if (!inherits(andromeda, "SQLiteConnection")) + } + if (!inherits(andromeda, "SQLiteConnection")) { stop("The andromeda argument must be an Andromeda object (or SQLiteConnection objecT).") + } # Calling splitSql, because this will also strip trailing semicolons (which cause Oracle to crash). 
sqlStatements <- SqlRender::splitSql(sql) - if (length(sqlStatements) > 1) - stop(paste("A query that returns a result can only consist of one SQL statement, but", - length(sqlStatements), - "statements were found")) - tryCatch({ - lowLevelQuerySqlToAndromeda(connection = connection, - query = sqlStatements[1], - andromeda = andromeda, - andromedaTableName = andromedaTableName, - integerAsNumeric = integerAsNumeric, - integer64AsNumeric = integer64AsNumeric) - columnNames <- RSQLite::dbListFields(andromeda, andromedaTableName) - newColumnNames <- toupper(columnNames) - if (snakeCaseToCamelCase) { - newColumnNames <- SqlRender::snakeCaseToCamelCase(newColumnNames) - } - idx <- columnNames != newColumnNames - if (any(idx)) { - sql <- sprintf("ALTER TABLE %s RENAME COLUMN %s TO %s;", - andromedaTableName, - columnNames[idx], - newColumnNames[idx]) - lapply(sql, function(x) RSQLite::dbExecute(andromeda, x)) + if (length(sqlStatements) > 1) { + stop(paste( + "A query that returns a result can only consist of one SQL statement, but", + length(sqlStatements), + "statements were found" + )) + } + tryCatch( + { + lowLevelQuerySqlToAndromeda( + connection = connection, + query = sqlStatements[1], + andromeda = andromeda, + andromedaTableName = andromedaTableName, + integerAsNumeric = integerAsNumeric, + integer64AsNumeric = integer64AsNumeric + ) + columnNames <- RSQLite::dbListFields(andromeda, andromedaTableName) + newColumnNames <- toupper(columnNames) + if (snakeCaseToCamelCase) { + newColumnNames <- SqlRender::snakeCaseToCamelCase(newColumnNames) + } + idx <- columnNames != newColumnNames + if (any(idx)) { + sql <- sprintf( + "ALTER TABLE %s RENAME COLUMN %s TO %s;", + andromedaTableName, + columnNames[idx], + newColumnNames[idx] + ) + lapply(sql, function(x) RSQLite::dbExecute(andromeda, x)) + } + invisible(andromeda) + }, + error = function(err) { + .createErrorReport(connection@dbms, err$message, sql, errorReportFile) } - invisible(andromeda) - }, error = 
function(err) { - .createErrorReport(connection@dbms, err$message, sql, errorReportFile) - }) + ) } #' Render, translate, and query to local Andromeda @@ -265,17 +300,20 @@ querySqlToAndromeda <- function(connection, #' #' @examples #' \dontrun{ -#' connectionDetails <- createConnectionDetails(dbms = "postgresql", -#' server = "localhost", -#' user = "root", -#' password = "blah", -#' schema = "cdm_v4") +#' connectionDetails <- createConnectionDetails( +#' dbms = "postgresql", +#' server = "localhost", +#' user = "root", +#' password = "blah", +#' schema = "cdm_v4" +#' ) #' conn <- connect(connectionDetails) #' renderTranslatequerySqlToAndromeda(conn, -#' sql = "SELECT * FROM @@schema.person", -#' schema = "cdm_synpuf", -#' andromeda = andromeda, -#' andromedaTableName = "foo") +#' sql = "SELECT * FROM @@schema.person", +#' schema = "cdm_synpuf", +#' andromeda = andromeda, +#' andromedaTableName = "foo" +#' ) #' disconnect(conn) #' #' andromeda$foo @@ -285,32 +323,40 @@ renderTranslateQuerySqlToAndromeda <- function(connection, sql, andromeda, andromedaTableName, - errorReportFile = file.path(getwd(), - "errorReportSql.txt"), + errorReportFile = file.path( + getwd(), + "errorReportSql.txt" + ), snakeCaseToCamelCase = FALSE, oracleTempSchema = NULL, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", - default = TRUE), + default = TRUE + ), integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", - default = TRUE), + default = TRUE + ), ...) { if (!is.null(oracleTempSchema) && oracleTempSchema != "") { warn("The 'oracleTempSchema' argument is deprecated. Use 'tempEmulationSchema' instead.", - .frequency = "regularly", - .frequency_id = "oracleTempSchema") + .frequency = "regularly", + .frequency_id = "oracleTempSchema" + ) tempEmulationSchema <- oracleTempSchema } sql <- SqlRender::render(sql, ...) 
sql <- SqlRender::translate(sql, - targetDialect = connection@dbms, - tempEmulationSchema = tempEmulationSchema) - return(querySqlToAndromeda(connection = connection, - sql = sql, - andromeda = andromeda, - andromedaTableName = andromedaTableName, - errorReportFile = errorReportFile, - snakeCaseToCamelCase = snakeCaseToCamelCase, - integerAsNumeric = integerAsNumeric, - integer64AsNumeric = integer64AsNumeric)) + targetDialect = connection@dbms, + tempEmulationSchema = tempEmulationSchema + ) + return(querySqlToAndromeda( + connection = connection, + sql = sql, + andromeda = andromeda, + andromedaTableName = andromedaTableName, + errorReportFile = errorReportFile, + snakeCaseToCamelCase = snakeCaseToCamelCase, + integerAsNumeric = integerAsNumeric, + integer64AsNumeric = integer64AsNumeric + )) } diff --git a/R/BulkLoad.R b/R/BulkLoad.R index e3e5c123..7ec1a814 100644 --- a/R/BulkLoad.R +++ b/R/BulkLoad.R @@ -26,16 +26,16 @@ checkBulkLoadCredentials <- function(connection) { } else if (connection@dbms == "redshift") { envSet <- FALSE bucket <- FALSE - + if (Sys.getenv("AWS_ACCESS_KEY_ID") != "" && Sys.getenv("AWS_SECRET_ACCESS_KEY") != "" && Sys.getenv("AWS_BUCKET_NAME") != - "" && Sys.getenv("AWS_DEFAULT_REGION") != "") { + "" && Sys.getenv("AWS_DEFAULT_REGION") != "") { envSet <- TRUE } ensure_installed("aws.s3") if (aws.s3::bucket_exists(bucket = Sys.getenv("AWS_BUCKET_NAME"))) { bucket <- TRUE } - + if (Sys.getenv("AWS_SSE_TYPE") == "") { warn("Not using Server Side Encryption for AWS S3") } @@ -56,7 +56,7 @@ checkBulkLoadCredentials <- function(connection) { warn("Using ssh password authentication, it's recommended to use keyfile instead") } return(TRUE) - } else if (connection@dbms == "postgresql") { + } else if (connection@dbms == "postgresql") { if (Sys.getenv("POSTGRES_PATH") == "") { inform("Please set environment variable POSTGRES_PATH to Postgres binary path (e.g. 
'C:/Program Files/PostgreSQL/11/bin'.") return(FALSE) @@ -74,9 +74,11 @@ getHiveSshUser <- function() { countRows <- function(connection, sqlTableName) { sql <- "SELECT COUNT(*) FROM @table" - count <- renderTranslateQuerySql(connection = connection, - sql = sql, - table = sqlTableName) + count <- renderTranslateQuerySql( + connection = connection, + sql = sql, + table = sqlTableName + ) return(count[1, 1]) } @@ -92,59 +94,69 @@ bulkLoadPdw <- function(connection, sqlTableName, sqlDataTypes, data) { eol <- "\r\n" csvFileName <- tempfile("pdw_insert_", fileext = ".csv") gzFileName <- tempfile("pdw_insert_", fileext = ".gz") - write.table(x = data, - na = "", - file = csvFileName, - row.names = FALSE, - quote = FALSE, - col.names = TRUE, - sep = "~*~") + write.table( + x = data, + na = "", + file = csvFileName, + row.names = FALSE, + quote = FALSE, + col.names = TRUE, + sep = "~*~" + ) on.exit(unlink(csvFileName)) R.utils::gzip(filename = csvFileName, destname = gzFileName, remove = TRUE) on.exit(unlink(gzFileName), add = TRUE) - + if (is.null(attr(connection, "user")()) && is.null(attr(connection, "password")())) { auth <- "-W" } else { - auth <- sprintf("-U %1s -P %2s", attr(connection, "user")(), attr(connection, "password")()) + auth <- sprintf("-U %1s -P %2s", attr(connection, "user")(), attr(connection, "password")()) } - - databaseMetaData <- rJava::.jcall(connection@jConnection, - "Ljava/sql/DatabaseMetaData;", - "getMetaData") + + databaseMetaData <- rJava::.jcall( + connection@jConnection, + "Ljava/sql/DatabaseMetaData;", + "getMetaData" + ) url <- rJava::.jcall(databaseMetaData, "Ljava/lang/String;", "getURL") pdwServer <- urltools::url_parse(url)$domain - + if (pdwServer == "" | is.null(pdwServer)) { abort("PDW Server name cannot be parsed from JDBC URL string") } - - command <- sprintf("%1s -M append -e UTF8 -i %2s -T %3s -R dwloader.txt -fh 1 -t %4s -r %5s -D ymd -E -se -rv 1 -S %6s %7s", - shQuote(Sys.getenv("DWLOADER_PATH")), - shQuote(gzFileName), - 
sqlTableName, - shQuote("~*~"), - shQuote(eol), - pdwServer, - auth) + + command <- sprintf( + "%1s -M append -e UTF8 -i %2s -T %3s -R dwloader.txt -fh 1 -t %4s -r %5s -D ymd -E -se -rv 1 -S %6s %7s", + shQuote(Sys.getenv("DWLOADER_PATH")), + shQuote(gzFileName), + sqlTableName, + shQuote("~*~"), + shQuote(eol), + pdwServer, + auth + ) countBefore <- countRows(connection, sqlTableName) - tryCatch({ - system(command, - intern = FALSE, - ignore.stdout = FALSE, - ignore.stderr = FALSE, - wait = TRUE, - input = NULL, - show.output.on.console = FALSE, - minimized = FALSE, - invisible = TRUE) - delta <- Sys.time() - start - inform(paste("Bulk load to PDW took", signif(delta, 3), attr(delta, "units"))) - }, error = function(e) { - abort("Error in PDW bulk upload. Please check dwloader.txt and dwloader.txt.reason.") - }) + tryCatch( + { + system(command, + intern = FALSE, + ignore.stdout = FALSE, + ignore.stderr = FALSE, + wait = TRUE, + input = NULL, + show.output.on.console = FALSE, + minimized = FALSE, + invisible = TRUE + ) + delta <- Sys.time() - start + inform(paste("Bulk load to PDW took", signif(delta, 3), attr(delta, "units"))) + }, + error = function(e) { + abort("Error in PDW bulk upload. Please check dwloader.txt and dwloader.txt.reason.") + } + ) countAfter <- countRows(connection, sqlTableName) - + if (countAfter - countBefore != nrow(data)) { abort(paste("Something went wrong when bulk uploading. 
Data has", nrow(data), "rows, but table has", (countAfter - countBefore), "new records")) } @@ -154,7 +166,7 @@ bulkLoadRedshift <- function(connection, sqlTableName, data) { ensure_installed("R.utils") ensure_installed("aws.s3") start <- Sys.time() - + csvFileName <- tempfile("redshift_insert_", fileext = ".csv") gzFileName <- tempfile("redshift_insert_", fileext = ".gz") write.csv(x = data, na = "", file = csvFileName, row.names = FALSE, quote = TRUE) @@ -162,33 +174,43 @@ bulkLoadRedshift <- function(connection, sqlTableName, data) { R.utils::gzip(filename = csvFileName, destname = gzFileName, remove = TRUE) on.exit(unlink(gzFileName), add = TRUE) - s3Put <- aws.s3::put_object(file = gzFileName, - check_region = FALSE, - headers = list(`x-amz-server-side-encryption` = Sys.getenv("AWS_SSE_TYPE")), - object = paste(Sys.getenv("AWS_OBJECT_KEY"), basename(gzFileName), sep = "/"), - bucket = Sys.getenv("AWS_BUCKET_NAME")) + s3Put <- aws.s3::put_object( + file = gzFileName, + check_region = FALSE, + headers = list(`x-amz-server-side-encryption` = Sys.getenv("AWS_SSE_TYPE")), + object = paste(Sys.getenv("AWS_OBJECT_KEY"), basename(gzFileName), sep = "/"), + bucket = Sys.getenv("AWS_BUCKET_NAME") + ) if (!s3Put) { abort("Failed to upload data to AWS S3. 
Please check your credentials and access.") } - on.exit(aws.s3::delete_object(object = paste(Sys.getenv("AWS_OBJECT_KEY"), basename(gzFileName), sep = "/"), - bucket = Sys.getenv("AWS_BUCKET_NAME")), - add = TRUE) - - sql <- SqlRender::loadRenderTranslateSql(sqlFilename = "redshiftCopy.sql", - packageName = "DatabaseConnector", - dbms = "redshift", - sqlTableName = sqlTableName, - fileName = basename(gzFileName), - s3RepoName = Sys.getenv("AWS_BUCKET_NAME"), - pathToFiles = Sys.getenv("AWS_OBJECT_KEY"), - awsAccessKey = Sys.getenv("AWS_ACCESS_KEY_ID"), - awsSecretAccessKey = Sys.getenv("AWS_SECRET_ACCESS_KEY")) - - tryCatch({ - DatabaseConnector::executeSql(connection = connection, sql = sql, reportOverallTime = FALSE) - }, error = function(e) { - abort("Error in Redshift bulk upload. Please check stl_load_errors and Redshift/S3 access.") - }) + on.exit(aws.s3::delete_object( + object = paste(Sys.getenv("AWS_OBJECT_KEY"), basename(gzFileName), sep = "/"), + bucket = Sys.getenv("AWS_BUCKET_NAME") + ), + add = TRUE + ) + + sql <- SqlRender::loadRenderTranslateSql( + sqlFilename = "redshiftCopy.sql", + packageName = "DatabaseConnector", + dbms = "redshift", + sqlTableName = sqlTableName, + fileName = basename(gzFileName), + s3RepoName = Sys.getenv("AWS_BUCKET_NAME"), + pathToFiles = Sys.getenv("AWS_OBJECT_KEY"), + awsAccessKey = Sys.getenv("AWS_ACCESS_KEY_ID"), + awsSecretAccessKey = Sys.getenv("AWS_SECRET_ACCESS_KEY") + ) + + tryCatch( + { + DatabaseConnector::executeSql(connection = connection, sql = sql, reportOverallTime = FALSE) + }, + error = function(e) { + abort("Error in Redshift bulk upload. 
Please check stl_load_errors and Redshift/S3 access.") + } + ) delta <- Sys.time() - start inform(paste("Bulk load to Redshift took", signif(delta, 3), attr(delta, "units"))) } @@ -197,12 +219,12 @@ bulkLoadHive <- function(connection, sqlTableName, sqlFieldNames, data) { sqlFieldNames <- strsplit(sqlFieldNames, ",")[[1]] if (tolower(Sys.info()["sysname"]) == "windows") { ensure_installed("ssh") - } + } start <- Sys.time() csvFileName <- tempfile("hive_insert_", fileext = ".csv") write.csv(x = data, na = "", file = csvFileName, row.names = FALSE, quote = TRUE) on.exit(unlink(csvFileName)) - + hiveUser <- getHiveSshUser() hivePasswd <- Sys.getenv("HIVE_SSH_PASSWORD") hiveHost <- Sys.getenv("HIVE_NODE_HOST") @@ -210,53 +232,61 @@ bulkLoadHive <- function(connection, sqlTableName, sqlFieldNames, data) { nodePort <- (function(port) if (port == "") "8020" else port)(Sys.getenv("HIVE_NODE_PORT")) hiveKeyFile <- (function(keyfile) if (keyfile == "") NULL else keyfile)(Sys.getenv("HIVE_KEYFILE")) hadoopUser <- (function(hadoopUser) if (hadoopUser == "") "hive" else hadoopUser)(Sys.getenv("HADOOP_USER_NAME")) - - tryCatch({ - if (tolower(Sys.info()["sysname"]) == "windows") { - session <- ssh::ssh_connect(host = sprintf("%s@%s:%s", hiveUser, hiveHost, sshPort), passwd = hivePasswd, keyfile = hiveKeyFile) - remoteFile <- paste0("/tmp/", basename(csvFileName)) - ssh::scp_upload(session, csvFileName, to = remoteFile, verbose = FALSE) - hadoopDir <- sprintf("/user/%s/%s", hadoopUser, generateRandomString(30)) - hadoopFile <- paste0(hadoopDir, "/", basename(csvFileName)) - ssh::ssh_exec_wait(session, sprintf("HADOOP_USER_NAME=%s hadoop fs -mkdir %s", hadoopUser, hadoopDir)) - command <- sprintf("HADOOP_USER_NAME=%s hadoop fs -put %s %s", hadoopUser, remoteFile, hadoopFile) - ssh::ssh_exec_wait(session, command = command) - } else { - remoteFile <- paste0("/tmp/", basename(csvFileName)) - scp_command <- sprintf("sshpass -p \'%s\' scp -P %s %s %s:%s", hivePasswd, sshPort, 
csvFileName, hiveHost, remoteFile) - system(scp_command) - hadoopDir <- sprintf("/user/%s/%s", hadoopUser, generateRandomString(30)) - hadoopFile <- paste0(hadoopDir, "/", basename(csvFileName)) - hdp_mk_dir_command <- sprintf("sshpass -p \'%s\' ssh %s -p %s HADOOP_USER_NAME=%s hadoop fs -mkdir %s", hivePasswd, hiveHost, sshPort, hadoopUser, hadoopDir) - system(hdp_mk_dir_command) - hdp_put_command <- sprintf("sshpass -p \'%s\' ssh %s -p %s HADOOP_USER_NAME=%s hadoop fs -put %s %s", hivePasswd, hiveHost, sshPort, hadoopUser, remoteFile, hadoopFile) - system(hdp_put_command) - } - def <- function(name) { - return(paste(name, "STRING")) + + tryCatch( + { + if (tolower(Sys.info()["sysname"]) == "windows") { + session <- ssh::ssh_connect(host = sprintf("%s@%s:%s", hiveUser, hiveHost, sshPort), passwd = hivePasswd, keyfile = hiveKeyFile) + remoteFile <- paste0("/tmp/", basename(csvFileName)) + ssh::scp_upload(session, csvFileName, to = remoteFile, verbose = FALSE) + hadoopDir <- sprintf("/user/%s/%s", hadoopUser, generateRandomString(30)) + hadoopFile <- paste0(hadoopDir, "/", basename(csvFileName)) + ssh::ssh_exec_wait(session, sprintf("HADOOP_USER_NAME=%s hadoop fs -mkdir %s", hadoopUser, hadoopDir)) + command <- sprintf("HADOOP_USER_NAME=%s hadoop fs -put %s %s", hadoopUser, remoteFile, hadoopFile) + ssh::ssh_exec_wait(session, command = command) + } else { + remoteFile <- paste0("/tmp/", basename(csvFileName)) + scp_command <- sprintf("sshpass -p \'%s\' scp -P %s %s %s:%s", hivePasswd, sshPort, csvFileName, hiveHost, remoteFile) + system(scp_command) + hadoopDir <- sprintf("/user/%s/%s", hadoopUser, generateRandomString(30)) + hadoopFile <- paste0(hadoopDir, "/", basename(csvFileName)) + hdp_mk_dir_command <- sprintf("sshpass -p \'%s\' ssh %s -p %s HADOOP_USER_NAME=%s hadoop fs -mkdir %s", hivePasswd, hiveHost, sshPort, hadoopUser, hadoopDir) + system(hdp_mk_dir_command) + hdp_put_command <- sprintf("sshpass -p \'%s\' ssh %s -p %s HADOOP_USER_NAME=%s hadoop fs -put 
%s %s", hivePasswd, hiveHost, sshPort, hadoopUser, remoteFile, hadoopFile) + system(hdp_put_command) + } + def <- function(name) { + return(paste(name, "STRING")) + } + fdef <- paste(sapply(sqlFieldNames, def), collapse = ", ") + sql <- SqlRender::render("CREATE TABLE @table(@fdef) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION 'hdfs://@hiveHost:@nodePort@filename';", + filename = hadoopDir, table = sqlTableName, fdef = fdef, hiveHost = hiveHost, nodePort = nodePort + ) + sql <- SqlRender::translate(sql, targetDialect = "hive", tempEmulationSchema = NULL) + + tryCatch( + { + DatabaseConnector::executeSql(connection = connection, sql = sql, reportOverallTime = FALSE) + delta <- Sys.time() - start + inform(paste("Bulk load to Hive took", signif(delta, 3), attr(delta, "units"))) + }, + error = function(e) { + abort(paste("Error in Hive bulk upload: ", e$message)) + } + ) + }, + finally = { + if (tolower(Sys.info()["sysname"]) == "windows") { + ssh::ssh_disconnect(session) + } } - fdef <- paste(sapply(sqlFieldNames, def), collapse = ", ") - sql <- SqlRender::render("CREATE TABLE @table(@fdef) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION 'hdfs://@hiveHost:@nodePort@filename';", - filename = hadoopDir, table = sqlTableName, fdef = fdef, hiveHost = hiveHost, nodePort = nodePort) - sql <- SqlRender::translate(sql, targetDialect = "hive", tempEmulationSchema = NULL) - - tryCatch({ - DatabaseConnector::executeSql(connection = connection, sql = sql, reportOverallTime = FALSE) - delta <- Sys.time() - start - inform(paste("Bulk load to Hive took", signif(delta, 3), attr(delta, "units"))) - }, error = function(e) { - abort(paste("Error in Hive bulk upload: ", e$message)) - }) - }, finally = { - if (tolower(Sys.info()["sysname"]) == "windows") - ssh::ssh_disconnect(session) - }) + ) } bulkLoadPostgres <- function(connection, sqlTableName, sqlFieldNames, sqlDataTypes, data) { startTime <- Sys.time() - + for (i in 
1:ncol(data)) { if (sqlDataTypes[i] == "INT") { data[, i] <- format(data[, i], scientific = FALSE) @@ -284,27 +314,29 @@ bulkLoadPostgres <- function(connection, sqlTableName, sqlFieldNames, sqlDataTyp if (is.null(port)) { port <- 5432 } - + connInfo <- sprintf("host='%s' port='%s' dbname='%s' user='%s' password='%s'", hostServerDb[[1]], port, hostServerDb[[2]], user, password) - copyCommand <- paste(shQuote(command), - "-d \"", - connInfo, - "\" -c \"\\copy", sqlTableName, - headers, - "FROM", shQuote(csvFileName), - "NULL 'NA' DELIMITER ',' CSV HEADER;\"") - + copyCommand <- paste( + shQuote(command), + "-d \"", + connInfo, + "\" -c \"\\copy", sqlTableName, + headers, + "FROM", shQuote(csvFileName), + "NULL 'NA' DELIMITER ',' CSV HEADER;\"" + ) + countBefore <- countRows(connection, sqlTableName) result <- base::system(copyCommand) countAfter <- countRows(connection, sqlTableName) - + if (result != 0) { abort(paste("Error while bulk uploading data, psql returned a non zero status. Status = ", result)) } if (countAfter - countBefore != nrow(data)) { abort(paste("Something went wrong when bulk uploading. Data has", nrow(data), "rows, but table has", (countAfter - countBefore), "new records")) } - + delta <- Sys.time() - startTime inform(paste("Bulk load to PostgreSQL took", signif(delta, 3), attr(delta, "units"))) } diff --git a/R/Compression.R b/R/Compression.R index 3c141e07..34bc03e6 100644 --- a/R/Compression.R +++ b/R/Compression.R @@ -17,14 +17,14 @@ # limitations under the License. #' Compress files and/or folders into a single zip file -#' -#' @details +#' +#' @details #' Uses Java's compression library to create a zip file. It is similar to \code{utils::zip}, except #' that it does not require an external zip tool to be available on the system path. #' #' @param zipFile The path to the zip file to be created. #' @param files The files and/or folders to be included in the zip file. Folders will be included recursively. -#' @param rootFolder The root folder. 
All files will be stored with relative paths relative to this folder. +#' @param rootFolder The root folder. All files will be stored with relative paths relative to this folder. #' @param compressionLevel A number between 1 and 9. 9 compresses best, but it also takes the longest. #' #' @export @@ -33,8 +33,10 @@ createZipFile <- function(zipFile, files, rootFolder = getwd(), compressionLevel suppressWarnings(zipFile <- normalizePath(as.character(zipFile), mustWork = FALSE)) rootFolder <- normalizePath(as.character(rootFolder)) compressionLevel <- as.integer(compressionLevel) - rJava::J("org.ohdsi.databaseConnector.Compression")$createZipFile(files, - rootFolder, - zipFile, - compressionLevel) + rJava::J("org.ohdsi.databaseConnector.Compression")$createZipFile( + files, + rootFolder, + zipFile, + compressionLevel + ) } diff --git a/R/Connect.R b/R/Connect.R index d8df0354..f2794046 100644 --- a/R/Connect.R +++ b/R/Connect.R @@ -3,13 +3,13 @@ # Copyright 2021 Observational Health Data Sciences and Informatics # # This file is part of DatabaseConnector -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,20 +17,24 @@ # limitations under the License. 
checkIfDbmsIsSupported <- function(dbms) { - supportedDbmss <- c("oracle", - "postgresql", - "redshift", - "sql server", - "pdw", - "netezza", - "bigquery", - "sqlite", - "sqlite extended", - "spark") + supportedDbmss <- c( + "oracle", + "postgresql", + "redshift", + "sql server", + "pdw", + "netezza", + "bigquery", + "sqlite", + "sqlite extended", + "spark" + ) if (!dbms %in% supportedDbmss) { - abort(sprintf("DBMS '%s' not supported. Please use one of these values: '%s'", - dbms, - paste(supportedDbmss, collapse = "', '"))) + abort(sprintf( + "DBMS '%s' not supported. Please use one of these values: '%s'", + dbms, + paste(supportedDbmss, collapse = "', '") + )) } } @@ -64,10 +68,12 @@ checkIfDbmsIsSupported <- function(dbms) { #' A list with all the details needed to connect to a database. #' @examples #' \dontrun{ -#' connectionDetails <- createConnectionDetails(dbms = "postgresql", -#' server = "localhost/postgres", -#' user = "root", -#' password = "blah") +#' connectionDetails <- createConnectionDetails( +#' dbms = "postgresql", +#' server = "localhost/postgres", +#' user = "root", +#' password = "blah" +#' ) #' conn <- connect(connectionDetails) #' dbGetQuery(conn, "SELECT COUNT(*) FROM person") #' disconnect(conn) @@ -84,36 +90,40 @@ createConnectionDetails <- function(dbms, pathToDriver = Sys.getenv("DATABASECONNECTOR_JAR_FOLDER")) { checkIfDbmsIsSupported(dbms) pathToDriver <- path.expand(pathToDriver) - if (!dir.exists(pathToDriver) && !dbms %in% c("sqlite", "sqlite extended")) { + if (!dir.exists(pathToDriver) && !dbms %in% c("sqlite", "sqlite extended")) { if (file.exists(pathToDriver)) { abort(paste0("The folder location pathToDriver = '", pathToDriver, "' points to a file, but should point to a folder.")) } else { - abort(paste0("The folder location pathToDriver = '", pathToDriver, "' does not exist.", - "Please set the folder to the location containing the JDBC driver.", - "You can download most drivers using the `downloadJdbcDrivers()` function.")) 
+ abort(paste0( + "The folder location pathToDriver = '", pathToDriver, "' does not exist.", + "Please set the folder to the location containing the JDBC driver.", + "You can download most drivers using the `downloadJdbcDrivers()` function." + )) } } - - result <- list(dbms = dbms, - extraSettings = extraSettings, - oracleDriver = oracleDriver, - pathToDriver = pathToDriver) - + + result <- list( + dbms = dbms, + extraSettings = extraSettings, + oracleDriver = oracleDriver, + pathToDriver = pathToDriver + ) + userExpression <- rlang::enquo(user) result$user <- function() rlang::eval_tidy(userExpression) - + passWordExpression <- rlang::enquo(password) result$password <- function() rlang::eval_tidy(passWordExpression) - + serverExpression <- rlang::enquo(server) result$server <- function() rlang::eval_tidy(serverExpression) - + portExpression <- rlang::enquo(port) result$port <- function() rlang::eval_tidy(portExpression) - + csExpression <- rlang::enquo(connectionString) result$connectionString <- function() rlang::eval_tidy(csExpression) - + class(result) <- "connectionDetails" return(result) } @@ -148,10 +158,12 @@ createConnectionDetails <- function(dbms, #' #' @examples #' \dontrun{ -#' conn <- connect(dbms = "postgresql", -#' server = "localhost/postgres", -#' user = "root", -#' password = "xxx") +#' conn <- connect( +#' dbms = "postgresql", +#' server = "localhost/postgres", +#' user = "root", +#' password = "xxx" +#' ) #' dbGetQuery(conn, "SELECT COUNT(*) FROM person") #' disconnect(conn) #' @@ -159,19 +171,22 @@ createConnectionDetails <- function(dbms, #' dbGetQuery(conn, "SELECT COUNT(*) FROM concept") #' disconnect(conn) #' -#' conn <- connect(dbms = "oracle", -#' server = "127.0.0.1/xe", -#' user = "system", -#' password = "xxx", -#' pathToDriver = "c:/temp") +#' conn <- connect( +#' dbms = "oracle", +#' server = "127.0.0.1/xe", +#' user = "system", +#' password = "xxx", +#' pathToDriver = "c:/temp" +#' ) #' dbGetQuery(conn, "SELECT COUNT(*) FROM 
test_table") #' disconnect(conn) #' -#' conn <- connect(dbms = "postgresql", -#' connectionString = "jdbc:postgresql://127.0.0.1:5432/cmd_database") +#' conn <- connect( +#' dbms = "postgresql", +#' connectionString = "jdbc:postgresql://127.0.0.1:5432/cmd_database" +#' ) #' dbGetQuery(conn, "SELECT COUNT(*) FROM person") #' disconnect(conn) -#' #' } #' @export connect <- function(connectionDetails = NULL, @@ -185,31 +200,35 @@ connect <- function(connectionDetails = NULL, connectionString = NULL, pathToDriver = Sys.getenv("DATABASECONNECTOR_JAR_FOLDER")) { if (!missing(connectionDetails) && !is.null(connectionDetails)) { - connection <- connect(dbms = connectionDetails$dbms, - user = connectionDetails$user(), - password = connectionDetails$password(), - server = connectionDetails$server(), - port = connectionDetails$port(), - extraSettings = connectionDetails$extraSettings, - oracleDriver = connectionDetails$oracleDriver, - connectionString = connectionDetails$connectionString(), - pathToDriver = connectionDetails$pathToDriver) - + connection <- connect( + dbms = connectionDetails$dbms, + user = connectionDetails$user(), + password = connectionDetails$password(), + server = connectionDetails$server(), + port = connectionDetails$port(), + extraSettings = connectionDetails$extraSettings, + oracleDriver = connectionDetails$oracleDriver, + connectionString = connectionDetails$connectionString(), + pathToDriver = connectionDetails$pathToDriver + ) + return(connection) } checkIfDbmsIsSupported(dbms) - + pathToDriver <- path.expand(pathToDriver) - if (!dir.exists(pathToDriver) && !dbms %in% c("sqlite", "sqlite extended")) { + if (!dir.exists(pathToDriver) && !dbms %in% c("sqlite", "sqlite extended")) { if (file.exists(pathToDriver)) { abort(paste0("The folder location pathToDriver = '", pathToDriver, "' points to a file, but should point to a folder.")) } else { - abort(paste0("The folder location pathToDriver = '", pathToDriver, "' does not exist.", - "Please set the 
folder to the location containing the JDBC driver.", - "You can download most drivers using the `downloadJdbcDrivers()` function.")) + abort(paste0( + "The folder location pathToDriver = '", pathToDriver, "' does not exist.", + "Please set the folder to the location containing the JDBC driver.", + "You can download most drivers using the `downloadJdbcDrivers()` function." + )) } } - + if (dbms == "sql server") { jarPath <- findPathToJar("^mssql-jdbc.*.jar$|^sqljdbc.*\\.jar$", pathToDriver) driver <- getJbcDriverSingleton("com.microsoft.sqlserver.jdbc.SQLServerDriver", jarPath) @@ -217,13 +236,15 @@ connect <- function(connectionDetails = NULL, # Using Windows integrated security inform("Connecting using SQL Server driver using Windows integrated security") setPathToDll() - + if (missing(connectionString) || is.null(connectionString)) { connectionString <- paste("jdbc:sqlserver://", server, ";integratedSecurity=true", sep = "") - if (!missing(port) && !is.null(port)) + if (!missing(port) && !is.null(port)) { connectionString <- paste(connectionString, ";port=", port, sep = "") - if (!missing(extraSettings) && !is.null(extraSettings)) + } + if (!missing(extraSettings) && !is.null(extraSettings)) { connectionString <- paste(connectionString, ";", extraSettings, sep = "") + } } connection <- connectUsingJdbcDriver(driver, connectionString, dbms = dbms) } else { @@ -231,16 +252,19 @@ connect <- function(connectionDetails = NULL, inform("Connecting using SQL Server driver") if (missing(connectionString) || is.null(connectionString)) { connectionString <- paste("jdbc:sqlserver://", server, sep = "") - if (!missing(port) && !is.null(port)) + if (!missing(port) && !is.null(port)) { connectionString <- paste(connectionString, ";port=", port, sep = "") - if (!missing(extraSettings) && !is.null(extraSettings)) + } + if (!missing(extraSettings) && !is.null(extraSettings)) { connectionString <- paste(connectionString, ";", extraSettings, sep = "") + } } connection <- 
connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - dbms = dbms) + connectionString, + user = user, + password = password, + dbms = dbms + ) } attr(connection, "dbms") <- dbms return(connection) @@ -252,31 +276,37 @@ connect <- function(connectionDetails = NULL, if (missing(user) || is.null(user)) { # Using Windows integrated security setPathToDll() - + if (missing(connectionString) || is.null(connectionString)) { connectionString <- paste("jdbc:sqlserver://", server, ";integratedSecurity=true", sep = "") - if (!missing(port) && !is.null(port)) + if (!missing(port) && !is.null(port)) { connectionString <- paste(connectionString, ";port=", port, sep = "") - if (!missing(extraSettings) && !is.null(extraSettings)) + } + if (!missing(extraSettings) && !is.null(extraSettings)) { connectionString <- paste(connectionString, ";", extraSettings, sep = "") + } } connection <- connectUsingJdbcDriver(driver, connectionString, dbms = dbms) } else { if (missing(connectionString) || is.null(connectionString)) { connectionString <- paste("jdbc:sqlserver://", - server, - ";integratedSecurity=false", - sep = "") - if (!missing(port) && !is.null(port)) + server, + ";integratedSecurity=false", + sep = "" + ) + if (!missing(port) && !is.null(port)) { connectionString <- paste(connectionString, ";port=", port, sep = "") - if (!missing(extraSettings) && !is.null(extraSettings)) + } + if (!missing(extraSettings) && !is.null(extraSettings)) { connectionString <- paste(connectionString, ";", extraSettings, sep = "") + } } connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - dbms = dbms) + connectionString, + user = user, + password = password, + dbms = dbms + ) } attr(connection, "dbms") <- dbms # Used for bulk upload: @@ -294,8 +324,9 @@ connect <- function(connectionDetails = NULL, # Build connection string from parts if (oracleDriver == "thin") { inform("- using THIN to connect") - if 
(missing(port) || is.null(port)) + if (missing(port) || is.null(port)) { port <- "1521" + } host <- "127.0.0.1" sid <- server if (grepl("/", server)) { @@ -304,51 +335,57 @@ connect <- function(connectionDetails = NULL, sid <- parts[2] } connectionString <- paste0("jdbc:oracle:thin:@", host, ":", port, ":", sid) - if (!missing(extraSettings) && !is.null(extraSettings)) + if (!missing(extraSettings) && !is.null(extraSettings)) { connectionString <- paste0(connectionString, extraSettings) + } result <- class(try(connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - oracle.jdbc.mapDateToTimestamp = "false", - dbms = dbms), silent = FALSE))[1] - + connectionString, + user = user, + password = password, + oracle.jdbc.mapDateToTimestamp = "false", + dbms = dbms + ), silent = FALSE))[1] + # Try using TNSName instead: if (result == "try-error") { inform("- Trying using TNSName") connectionString <- paste0("jdbc:oracle:thin:@", server) connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - oracle.jdbc.mapDateToTimestamp = "false", - dbms = dbms) + connectionString, + user = user, + password = password, + oracle.jdbc.mapDateToTimestamp = "false", + dbms = dbms + ) } } if (oracleDriver == "oci") { inform("- using OCI to connect") connectionString <- paste0("jdbc:oracle:oci8:@", server) connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - oracle.jdbc.mapDateToTimestamp = "false", - dbms = dbms) + connectionString, + user = user, + password = password, + oracle.jdbc.mapDateToTimestamp = "false", + dbms = dbms + ) } } else { # User has provided the connection string: if (missing(user) || is.null(user)) { connection <- connectUsingJdbcDriver(driver, - connectionString, - oracle.jdbc.mapDateToTimestamp = "false", - dbms = dbms) + connectionString, + oracle.jdbc.mapDateToTimestamp = "false", + dbms = dbms + ) } else { connection <- 
connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - oracle.jdbc.mapDateToTimestamp = "false", - dbms = dbms) + connectionString, + user = user, + password = password, + oracle.jdbc.mapDateToTimestamp = "false", + dbms = dbms + ) } } attr(connection, "dbms") <- dbms @@ -359,9 +396,10 @@ connect <- function(connectionDetails = NULL, jarPath <- findPathToJar("^postgresql-.*\\.jar$", pathToDriver) driver <- getJbcDriverSingleton("org.postgresql.Driver", jarPath) if (missing(connectionString) || is.null(connectionString)) { - if (!grepl("/", server)) + if (!grepl("/", server)) { abort("Error: database name not included in server string but is required for PostgreSQL. Please specify server as /") - + } + parts <- unlist(strsplit(server, "/")) host <- parts[1] database <- parts[2] @@ -369,17 +407,19 @@ connect <- function(connectionDetails = NULL, port <- "5432" } connectionString <- paste0("jdbc:postgresql://", host, ":", port, "/", database) - if (!missing(extraSettings) && !is.null(extraSettings)) + if (!missing(extraSettings) && !is.null(extraSettings)) { connectionString <- paste(connectionString, "?", extraSettings, sep = "") + } } if (missing(user) || is.null(user)) { connection <- connectUsingJdbcDriver(driver, connectionString, dbms = dbms) } else { connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - dbms = dbms) + connectionString, + user = user, + password = password, + dbms = dbms + ) } attr(connection, "dbms") <- dbms # Used for bulk upload: @@ -391,7 +431,7 @@ connect <- function(connectionDetails = NULL, attr(connection, "server") <- function() rlang::eval_tidy(serverExpression) portExpression <- rlang::enquo(port) attr(connection, "port") <- function() rlang::eval_tidy(portExpression) - + return(connection) } if (dbms == "redshift") { @@ -403,8 +443,9 @@ connect <- function(connectionDetails = NULL, driver <- getJbcDriverSingleton("com.amazon.redshift.jdbc4.Driver", 
jarPath) } if (missing(connectionString) || is.null(connectionString)) { - if (!grepl("/", server)) + if (!grepl("/", server)) { abort("Error: database name not included in server string but is required for Redshift Please specify server as /") + } parts <- unlist(strsplit(server, "/")) host <- parts[1] database <- parts[2] @@ -412,18 +453,20 @@ connect <- function(connectionDetails = NULL, port <- "5439" } connectionString <- paste("jdbc:redshift://", host, ":", port, "/", database, sep = "") - - if (!missing(extraSettings) && !is.null(extraSettings)) + + if (!missing(extraSettings) && !is.null(extraSettings)) { connectionString <- paste(connectionString, "?", extraSettings, sep = "") + } } if (missing(user) || is.null(user)) { connection <- connectUsingJdbcDriver(driver, connectionString, dbms = dbms) } else { connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - dbms = dbms) + connectionString, + user = user, + password = password, + dbms = dbms + ) } attr(connection, "dbms") <- dbms return(connection) @@ -433,13 +476,15 @@ connect <- function(connectionDetails = NULL, jarPath <- findPathToJar("^nzjdbc\\.jar$", pathToDriver) driver <- getJbcDriverSingleton("org.netezza.Driver", jarPath) if (missing(connectionString) || is.null(connectionString)) { - if (!grepl("/", server)) + if (!grepl("/", server)) { abort("Error: database name not included in server string but is required for Netezza. 
Please specify server as /") + } parts <- unlist(strsplit(server, "/")) host <- parts[1] database <- parts[2] - if (missing(port) || is.null(port)) + if (missing(port) || is.null(port)) { port <- "5480" + } connectionString <- paste0("jdbc:netezza://", host, ":", port, "/", database) if (!missing(extraSettings) && !is.null(extraSettings)) { connectionString <- paste0(connectionString, "?", extraSettings) @@ -449,10 +494,11 @@ connect <- function(connectionDetails = NULL, connection <- connectUsingJdbcDriver(driver, connectionString, dbms = dbms) } else { connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - dbms = dbms) + connectionString, + user = user, + password = password, + dbms = dbms + ) } attr(connection, "dbms") <- dbms return(connection) @@ -474,10 +520,11 @@ connect <- function(connectionDetails = NULL, connection <- connectUsingJdbcDriver(driver, connectionString, dbms = dbms) } else { connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - dbms = dbms) + connectionString, + user = user, + password = password, + dbms = dbms + ) } attr(connection, "dbms") <- dbms return(connection) @@ -486,7 +533,7 @@ connect <- function(connectionDetails = NULL, inform("Connecting using Hive driver") jarPath <- findPathToJar("^hive-jdbc-standalone\\.jar$", pathToDriver) driver <- getJbcDriverSingleton("org.apache.hive.jdbc.HiveDriver", jarPath) - + if (missing(connectionString) || is.null(connectionString)) { connectionString <- paste0("jdbc:hive2://", server, ":", port) if (!missing(extraSettings) && !is.null(extraSettings)) { @@ -494,22 +541,23 @@ connect <- function(connectionDetails = NULL, } } connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - dbms = dbms) - + connectionString, + user = user, + password = password, + dbms = dbms + ) + attr(connection, "dbms") <- dbms return(connection) } if (dbms == "bigquery") { 
inform("Connecting using BigQuery driver") - + files <- list.files(path = pathToDriver, full.names = TRUE) for (jar in files) { rJava::.jaddClassPath(jar) } - + jarPath <- findPathToJar("^GoogleBigQueryJDBC42\\.jar$", pathToDriver) driver <- getJbcDriverSingleton("com.simba.googlebigquery.jdbc42.Driver", jarPath) if (missing(connectionString) || is.null(connectionString)) { @@ -519,10 +567,11 @@ connect <- function(connectionDetails = NULL, } } connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - dbms = dbms) + connectionString, + user = user, + password = password, + dbms = dbms + ) attr(connection, "dbms") <- dbms return(connection) } @@ -544,10 +593,11 @@ connect <- function(connectionDetails = NULL, connection <- connectUsingJdbcDriver(driver, connectionString, dbms = dbms) } else { connection <- connectUsingJdbcDriver(driver, - connectionString, - user = user, - password = password, - dbms = dbms) + connectionString, + user = user, + password = password, + dbms = dbms + ) } attr(connection, "dbms") <- dbms return(connection) @@ -564,11 +614,13 @@ connectUsingJdbcDriver <- function(jdbcDriver, p <- rJava::.jnew("java/util/Properties") if (length(properties) > 0) { for (i in 1:length(properties)) { - rJava::.jcall(p, - "Ljava/lang/Object;", - "setProperty", - names(properties)[i], - as.character(properties[[i]])[1]) + rJava::.jcall( + p, + "Ljava/lang/Object;", + "setProperty", + names(properties)[i], + as.character(properties[[i]])[1] + ) } } jConnection <- rJava::.jcall(jdbcDriver, "Ljava/sql/Connection;", "connect", as.character(url), p) @@ -581,25 +633,26 @@ connectUsingJdbcDriver <- function(jdbcDriver, } } connection <- new("DatabaseConnectorJdbcConnection", - jConnection = jConnection, - identifierQuote = identifierQuote, - stringQuote = stringQuote, - dbms = dbms, - uuid = generateRandomString()) + jConnection = jConnection, + identifierQuote = identifierQuote, + stringQuote = stringQuote, + dbms = 
dbms, + uuid = generateRandomString() + ) registerWithRStudio(connection) return(connection) } connectUsingRsqLite <- function(server, extended) { - dbiConnection <- DBI::dbConnect(RSQLite::SQLite(), server, extended_types = extended) connection <- new("DatabaseConnectorDbiConnection", - server = server, - dbiConnection = dbiConnection, - identifierQuote = "'", - stringQuote = "'", - dbms = ifelse(extended, "sqlite extended", "sqlite"), - uuid = generateRandomString()) + server = server, + dbiConnection = dbiConnection, + identifierQuote = "'", + stringQuote = "'", + dbms = ifelse(extended, "sqlite extended", "sqlite"), + uuid = generateRandomString() + ) registerWithRStudio(connection) return(connection) } @@ -617,17 +670,19 @@ generateRandomString <- function(length = 20) { #' #' @examples #' \dontrun{ -#' connectionDetails <- createConnectionDetails(dbms = "postgresql", -#' server = "localhost", -#' user = "root", -#' password = "blah") +#' connectionDetails <- createConnectionDetails( +#' dbms = "postgresql", +#' server = "localhost", +#' user = "root", +#' password = "blah" +#' ) #' conn <- connect(connectionDetails) #' count <- querySql(conn, "SELECT COUNT(*) FROM person") #' disconnect(conn) #' } #' @export disconnect <- function(connection) { - UseMethod("disconnect", connection) + UseMethod("disconnect", connection) } #' @export @@ -649,7 +704,7 @@ disconnect.DatabaseConnectorDbiConnection <- function(connection) { } setPathToDll <- function() { - pathToDll <- Sys.getenv("PATH_TO_AUTH_DLL") + pathToDll <- Sys.getenv("PATH_TO_AUTH_DLL") if (pathToDll != "") { inform(paste("Looking for authentication DLL in path specified in PATH_TO_AUTH_DLL:", pathToDll)) rJava::J("org.ohdsi.databaseConnector.Authentication")$addPathToJavaLibrary(pathToDll) diff --git a/R/CtasHack.R b/R/CtasHack.R index 7b8ed4d8..0c4a5799 100644 --- a/R/CtasHack.R +++ b/R/CtasHack.R @@ -21,21 +21,22 @@ mergeTempTables <- function(connection, tableName, sqlFieldNames, sourceNames, d 
unionString <- paste("\nUNION ALL\nSELECT ", sqlFieldNames, " FROM ", sep = "") valueString <- paste(sourceNames, collapse = unionString) sql <- paste(distribution, - "\n", - "SELECT ", - sqlFieldNames, - " INTO ", - tableName, - " FROM ", - valueString, - ";", - sep = "") + "\n", + "SELECT ", + sqlFieldNames, + " INTO ", + tableName, + " FROM ", + valueString, + ";", + sep = "" + ) sql <- SqlRender::translate(sql, targetDialect = connection@dbms, tempEmulationSchema = tempEmulationSchema) if (tempTable && connection@dbms == "redshift") { sql <- gsub("CREATE TABLE", "CREATE TEMP TABLE", sql) } executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) - + # Drop source tables: for (sourceName in sourceNames) { sql <- paste("DROP TABLE", sourceName) @@ -57,7 +58,7 @@ toStrings <- function(data, sqlDataTypes) { } else { result <- sapply(data, as.character) if (any(intIdx)) { - result[ ,intIdx] <- sapply(data[ ,intIdx], format, scientific = FALSE) + result[, intIdx] <- sapply(data[, intIdx], format, scientific = FALSE) } result <- apply(result, FUN = function(x) paste("'", gsub("'", "''", x), "'", sep = ""), MARGIN = 2) result[is.na(data)] <- "NULL" @@ -83,7 +84,7 @@ ctasHack <- function(connection, sqlTableName, tempTable, sqlFieldNames, sqlData batchSize <- 1000 } mergeSize <- 300 - + if (any(tolower(names(data)) == "subject_id")) { distribution <- "--HINT DISTRIBUTE_ON_KEY(SUBJECT_ID)\n" } else if (any(tolower(names(data)) == "person_id")) { @@ -91,7 +92,7 @@ ctasHack <- function(connection, sqlTableName, tempTable, sqlFieldNames, sqlData } else { distribution <- "" } - + # Insert data in batches in temp tables using CTAS: if (progressBar) { pb <- txtProgressBar(style = 3) @@ -99,48 +100,53 @@ ctasHack <- function(connection, sqlTableName, tempTable, sqlFieldNames, sqlData tempNames <- c() for (start in seq(1, nrow(data), by = batchSize)) { if (progressBar) { - setTxtProgressBar(pb, start/nrow(data)) + setTxtProgressBar(pb, start / nrow(data)) } if 
(length(tempNames) == mergeSize) { mergedName <- paste("#", paste(sample(letters, 20, replace = TRUE), collapse = ""), sep = "") - mergeTempTables(connection = connection, - tableName = mergedName, - sqlFieldNames = sqlFieldNames, - sourceNames = tempNames, - distribution = distribution, - tempTable = TRUE, - tempEmulationSchema = tempEmulationSchema) + mergeTempTables( + connection = connection, + tableName = mergedName, + sqlFieldNames = sqlFieldNames, + sourceNames = tempNames, + distribution = distribution, + tempTable = TRUE, + tempEmulationSchema = tempEmulationSchema + ) tempNames <- c(mergedName) } end <- min(start + batchSize - 1, nrow(data)) batch <- toStrings(data[start:end, , drop = FALSE], sqlDataTypes) - + varAliases <- strsplit(sqlFieldNames, ",")[[1]] # First line gets type information: valueString <- formatRow(batch[1, , drop = FALSE], varAliases, castValues = TRUE, sqlDataTypes = sqlDataTypes) if (end > start) { # Other lines only get type information if BigQuery: valueString <- paste(c(valueString, apply(batch[2:nrow(batch), , drop = FALSE], - MARGIN = 1, - FUN = formatRow, - aliases = varAliases, - castValues = attr(connection, "dbms") %in% c("bigquery", "hive"), - sqlDataTypes = sqlDataTypes)), - collapse = "\nUNION ALL\nSELECT ") + MARGIN = 1, + FUN = formatRow, + aliases = varAliases, + castValues = attr(connection, "dbms") %in% c("bigquery", "hive"), + sqlDataTypes = sqlDataTypes + )), + collapse = "\nUNION ALL\nSELECT " + ) } tempName <- paste("#", paste(sample(letters, 20, replace = TRUE), collapse = ""), sep = "") tempNames <- c(tempNames, tempName) sql <- paste(distribution, - "WITH data (", - sqlFieldNames, - ") AS (SELECT ", - valueString, - " ) SELECT ", - sqlFieldNames, - " INTO ", - tempName, - " FROM data;", - sep = "") + "WITH data (", + sqlFieldNames, + ") AS (SELECT ", + valueString, + " ) SELECT ", + sqlFieldNames, + " INTO ", + tempName, + " FROM data;", + sep = "" + ) sql <- SqlRender::translate(sql, targetDialect = 
connection@dbms, tempEmulationSchema = tempEmulationSchema) executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) } @@ -148,11 +154,13 @@ ctasHack <- function(connection, sqlTableName, tempTable, sqlFieldNames, sqlData setTxtProgressBar(pb, 1) close(pb) } - mergeTempTables(connection = connection, - tableName = sqlTableName, - sqlFieldNames = sqlFieldNames, - sourceNames = tempNames, - distribution = distribution, - tempTable = tempTable, - tempEmulationSchema = tempEmulationSchema) + mergeTempTables( + connection = connection, + tableName = sqlTableName, + sqlFieldNames = sqlFieldNames, + sourceNames = tempNames, + distribution = distribution, + tempTable = tempTable, + tempEmulationSchema = tempEmulationSchema + ) } diff --git a/R/DBI.R b/R/DBI.R index 04ca76d4..cbd68718 100644 --- a/R/DBI.R +++ b/R/DBI.R @@ -3,13 +3,13 @@ # Copyright 2021 Observational Health Data Sciences and Informatics # # This file is part of DatabaseConnector -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -57,9 +57,12 @@ DatabaseConnectorDriver <- function() { #' @export #' @import DBI setClass("DatabaseConnectorConnection", - contains = "DBIConnection", - slots = list(identifierQuote = "character", - stringQuote = "character", dbms = "character", uuid = "character")) + contains = "DBIConnection", + slots = list( + identifierQuote = "character", + stringQuote = "character", dbms = "character", uuid = "character" + ) +) #' DatabaseConnectorJdbcConnection class. 
#' @@ -67,8 +70,9 @@ setClass("DatabaseConnectorConnection", #' @export #' @import rJava setClass("DatabaseConnectorJdbcConnection", - contains = "DatabaseConnectorConnection", - slots = list(jConnection = "jobjRef")) + contains = "DatabaseConnectorConnection", + slots = list(jConnection = "jobjRef") +) #' DatabaseConnectorDbiConnection class. #' @@ -76,9 +80,12 @@ setClass("DatabaseConnectorJdbcConnection", #' @export #' @import DBI setClass("DatabaseConnectorDbiConnection", - contains = "DatabaseConnectorConnection", - slots = list(dbiConnection = "DBIConnection", - server = "character")) + contains = "DatabaseConnectorConnection", + slots = list( + dbiConnection = "DBIConnection", + server = "character" + ) +) #' Create a connection to a DBMS #' @@ -96,11 +103,11 @@ setClass("DatabaseConnectorDbiConnection", #' @examples #' \dontrun{ #' conn <- dbConnect(DatabaseConnectorDriver(), -#' dbms = "postgresql", -#' server = "localhost/ohdsi", -#' user = "joe", -#' -#' password = "secret") +#' dbms = "postgresql", +#' server = "localhost/ohdsi", +#' user = "joe", +#' password = "secret" +#' ) #' querySql(conn, "SELECT * FROM cdm_synpuf.person;") #' dbDisconnect(conn) #' } @@ -156,8 +163,10 @@ setMethod("dbQuoteIdentifier", signature("DatabaseConnectorConnection", "charact abort("Cannot pass NA to dbQuoteIdentifier()") } if (nzchar(conn@identifierQuote)) { - x <- gsub(conn@identifierQuote, paste0(conn@identifierQuote, - conn@identifierQuote), x, fixed = TRUE) + x <- gsub(conn@identifierQuote, paste0( + conn@identifierQuote, + conn@identifierQuote + ), x, fixed = TRUE) } return(DBI::SQL(paste0(conn@identifierQuote, encodeString(x), conn@identifierQuote))) }) @@ -165,20 +174,22 @@ setMethod("dbQuoteIdentifier", signature("DatabaseConnectorConnection", "charact #' @inherit #' DBI::dbQuoteString title description params details references return seealso #' @export -setMethod("dbQuoteString", - signature("DatabaseConnectorConnection", "character"), - function(conn, x, ...) 
{ - if (length(x) == 0L) { - return(DBI::SQL(character())) - } - if (any(is.na(x))) { - abort("Cannot pass NA to dbQuoteString()") - } - if (nzchar(conn@stringQuote)) { - x <- gsub(conn@stringQuote, paste0(conn@stringQuote, conn@stringQuote), x, fixed = TRUE) - } - return(DBI::SQL(paste0(conn@stringQuote, encodeString(x), conn@stringQuote))) - }) +setMethod( + "dbQuoteString", + signature("DatabaseConnectorConnection", "character"), + function(conn, x, ...) { + if (length(x) == 0L) { + return(DBI::SQL(character())) + } + if (any(is.na(x))) { + abort("Cannot pass NA to dbQuoteString()") + } + if (nzchar(conn@stringQuote)) { + x <- gsub(conn@stringQuote, paste0(conn@stringQuote, conn@stringQuote), x, fixed = TRUE) + } + return(DBI::SQL(paste0(conn@stringQuote, encodeString(x), conn@stringQuote))) + } +) # Results ----------------------------------------------------------------------------------------- @@ -188,40 +199,50 @@ setMethod("dbQuoteString", #' @import rJava #' @export setClass("DatabaseConnectorResult", - contains = "DBIResult", - slots = list(content = "jobjRef", type = "character", - statement = "character")) + contains = "DBIResult", + slots = list( + content = "jobjRef", type = "character", + statement = "character" + ) +) #' @inherit #' DBI::dbSendQuery title description params details references return seealso #' @export -setMethod("dbSendQuery", - signature("DatabaseConnectorJdbcConnection", "character"), - function(conn, statement, - ...) { - if (rJava::is.jnull(conn@jConnection)) - abort("Connection is closed") - batchedQuery <- rJava::.jnew("org.ohdsi.databaseConnector.BatchedQuery", - conn@jConnection, - statement, - - conn@dbms) - result <- new("DatabaseConnectorResult", - content = batchedQuery, - type = "batchedQuery", - statement = statement) - return(result) - }) +setMethod( + "dbSendQuery", + signature("DatabaseConnectorJdbcConnection", "character"), + function(conn, statement, + ...) 
{ + if (rJava::is.jnull(conn@jConnection)) { + abort("Connection is closed") + } + batchedQuery <- rJava::.jnew( + "org.ohdsi.databaseConnector.BatchedQuery", + conn@jConnection, + statement, + conn@dbms + ) + result <- new("DatabaseConnectorResult", + content = batchedQuery, + type = "batchedQuery", + statement = statement + ) + return(result) + } +) #' @inherit #' DBI::dbSendQuery title description params details references return seealso #' @export -setMethod("dbSendQuery", - signature("DatabaseConnectorDbiConnection", "character"), - function(conn, statement, - ...) { - return(DBI::dbSendQuery(conn@dbiConnection, statement, ...)) - }) +setMethod( + "dbSendQuery", + signature("DatabaseConnectorDbiConnection", "character"), + function(conn, statement, + ...) { + return(DBI::dbSendQuery(conn@dbiConnection, statement, ...)) + } +) #' @inherit #' DBI::dbHasCompleted title description params details references return seealso @@ -284,27 +305,32 @@ setMethod("dbClearResult", "DatabaseConnectorResult", function(res, ...) { #' @inherit #' DBI::dbGetQuery title description params details references return seealso #' @export -setMethod("dbGetQuery", - signature("DatabaseConnectorConnection", "character"), - function(conn, statement, - ...) { - lowLevelQuerySql(conn, statement) - }) +setMethod( + "dbGetQuery", + signature("DatabaseConnectorConnection", "character"), + function(conn, statement, + ...) { + lowLevelQuerySql(conn, statement) + } +) #' @inherit #' DBI::dbSendStatement title description params details references return seealso #' @export -setMethod("dbSendStatement", - signature("DatabaseConnectorConnection", "character"), - function(conn, statement, - ...) 
{ - rowsAffected <- lowLevelExecuteSql(connection = conn, sql = statement) - rowsAffected <- rJava::.jnew("java/lang/Integer", as.integer(rowsAffected)) - result <- new("DatabaseConnectorResult", - content = rowsAffected, - type = "rowsAffected", - statement = statement) - }) +setMethod( + "dbSendStatement", + signature("DatabaseConnectorConnection", "character"), + function(conn, statement, + ...) { + rowsAffected <- lowLevelExecuteSql(connection = conn, sql = statement) + rowsAffected <- rJava::.jnew("java/lang/Integer", as.integer(rowsAffected)) + result <- new("DatabaseConnectorResult", + content = rowsAffected, + type = "rowsAffected", + statement = statement + ) + } +) #' @inherit #' DBI::dbGetRowsAffected title description params details references return seealso @@ -319,13 +345,15 @@ setMethod("dbGetRowsAffected", "DatabaseConnectorResult", function(res, ...) { #' @inherit #' DBI::dbExecute title description params details references return seealso #' @export -setMethod("dbExecute", - signature("DatabaseConnectorConnection", "character"), - function(conn, statement, - ...) { - rowsAffected <- lowLevelExecuteSql(connection = conn, sql = statement) - return(rowsAffected) - }) +setMethod( + "dbExecute", + signature("DatabaseConnectorConnection", "character"), + function(conn, statement, + ...) { + rowsAffected <- lowLevelExecuteSql(connection = conn, sql = statement) + return(rowsAffected) + } +) # Misc ---------------------------------------------------------------------- @@ -335,16 +363,20 @@ setMethod("dbExecute", #' @param schema Name of the schema. #' #' @export -setMethod("dbListFields", - signature("DatabaseConnectorConnection", "character"), - function(conn, name, - database = NULL, schema = NULL, ...) 
{ - columns <- listDatabaseConnectorColumns(connection = conn, - catalog = database, - schema = schema, - table = name) - return(columns$name) - }) +setMethod( + "dbListFields", + signature("DatabaseConnectorConnection", "character"), + function(conn, name, + database = NULL, schema = NULL, ...) { + columns <- listDatabaseConnectorColumns( + connection = conn, + catalog = database, + schema = schema, + table = name + ) + return(columns$name) + } +) #' @inherit #' DBI::dbListTables title description params details references return seealso @@ -352,33 +384,37 @@ setMethod("dbListFields", #' @param schema Name of the schema. #' #' @export -setMethod("dbListTables", - "DatabaseConnectorConnection", - function(conn, database = NULL, schema = NULL, - ...) { - if (is.null(database)) { - databaseSchema <- schema - } else { - databaseSchema <- paste(database, schema, sep = ".") - } - return(getTableNames(conn, databaseSchema)) - }) +setMethod( + "dbListTables", + "DatabaseConnectorConnection", + function(conn, database = NULL, schema = NULL, + ...) { + if (is.null(database)) { + databaseSchema <- schema + } else { + databaseSchema <- paste(database, schema, sep = ".") + } + return(getTableNames(conn, databaseSchema)) + } +) #' @inherit #' DBI::dbExistsTable title description params details references return seealso #' @param database Name of the database. #' @param schema Name of the schema. #' @export -setMethod("dbExistsTable", - signature("DatabaseConnectorConnection", "character"), - function(conn, name, - database = NULL, schema = NULL, ...) { - if (length(name) != 1) { - abort("Name should be a single string") - } - tables <- dbListTables(conn, name = name, database = database, schema = schema) - return(tolower(name) %in% tolower(tables)) - }) +setMethod( + "dbExistsTable", + signature("DatabaseConnectorConnection", "character"), + function(conn, name, + database = NULL, schema = NULL, ...) 
{ + if (length(name) != 1) { + abort("Name should be a single string") + } + tables <- dbListTables(conn, name = name, database = database, schema = schema) + return(tolower(name) %in% tolower(tables)) + } +) #' @inherit #' DBI::dbWriteTable title description params details references return seealso @@ -389,21 +425,25 @@ setMethod("dbExistsTable", #' can be created. # #' @export -setMethod("dbWriteTable", - signature("DatabaseConnectorConnection", "character", "data.frame"), - function(conn, - name, value, overwrite = FALSE, append = FALSE, temporary = FALSE, oracleTempSchema = NULL, ...) { - if (overwrite) - append <- FALSE - insertTable(connection = conn, - tableName = name, - data = value, - dropTableIfExists = overwrite, - createTable = !append, - - tempTable = temporary, oracleTempSchema = oracleTempSchema) - invisible(TRUE) - }) +setMethod( + "dbWriteTable", + signature("DatabaseConnectorConnection", "character", "data.frame"), + function(conn, + name, value, overwrite = FALSE, append = FALSE, temporary = FALSE, oracleTempSchema = NULL, ...) { + if (overwrite) { + append <- FALSE + } + insertTable( + connection = conn, + tableName = name, + data = value, + dropTableIfExists = overwrite, + createTable = !append, + tempTable = temporary, oracleTempSchema = oracleTempSchema + ) + invisible(TRUE) + } +) #' @inherit #' DBI::dbAppendTable title description params details references return seealso @@ -412,19 +452,22 @@ setMethod("dbWriteTable", #' can be created. 
# #' @export -setMethod("dbAppendTable", - signature("DatabaseConnectorConnection", "character", "data.frame"), - function(conn, - name, value, temporary = FALSE, oracleTempSchema = NULL, ..., row.names = NULL) { - insertTable(connection = conn, - tableName = name, - data = value, - dropTableIfExists = FALSE, - createTable = FALSE, - - tempTable = temporary, oracleTempSchema = oracleTempSchema) - invisible(TRUE) - }) +setMethod( + "dbAppendTable", + signature("DatabaseConnectorConnection", "character", "data.frame"), + function(conn, + name, value, temporary = FALSE, oracleTempSchema = NULL, ..., row.names = NULL) { + insertTable( + connection = conn, + tableName = name, + data = value, + dropTableIfExists = FALSE, + createTable = FALSE, + tempTable = temporary, oracleTempSchema = oracleTempSchema + ) + invisible(TRUE) + } +) #' @inherit #' DBI::dbCreateTable title description params details references return seealso @@ -433,14 +476,18 @@ setMethod("dbAppendTable", #' can be created. # #' @export -setMethod("dbCreateTable", - signature("DatabaseConnectorConnection", "character", "data.frame"), - function(conn, - name, fields, oracleTempSchema = NULL, ..., row.names = NULL, temporary = FALSE) { - insertTable(connection = conn, tableName = name, data = fields[FALSE, ], dropTableIfExists = TRUE, - createTable = TRUE, tempTable = temporary, oracleTempSchema = oracleTempSchema) - invisible(TRUE) - }) +setMethod( + "dbCreateTable", + signature("DatabaseConnectorConnection", "character", "data.frame"), + function(conn, + name, fields, oracleTempSchema = NULL, ..., row.names = NULL, temporary = FALSE) { + insertTable( + connection = conn, tableName = name, data = fields[FALSE, ], dropTableIfExists = TRUE, + createTable = TRUE, tempTable = temporary, oracleTempSchema = oracleTempSchema + ) + invisible(TRUE) + } +) #' @inherit #' DBI::dbReadTable title description params details references return seealso @@ -456,9 +503,9 @@ setMethod("dbReadTable", 
signature("DatabaseConnectorConnection", "character"), ...) { if (!is.null(oracleTempSchema) && oracleTempSchema != "") { warn("The 'oracleTempSchema' argument is deprecated. Use 'tempEmulationSchema' instead.", - .frequency = "regularly", - - .frequency_id = "oracleTempSchema") + .frequency = "regularly", + .frequency_id = "oracleTempSchema" + ) tempEmulationSchema <- oracleTempSchema } if (!is.null(schema)) { @@ -469,9 +516,11 @@ setMethod("dbReadTable", signature("DatabaseConnectorConnection", "character"), } sql <- "SELECT * FROM @table;" sql <- SqlRender::render(sql = sql, table = name) - sql <- SqlRender::translate(sql = sql, - targetDialect = conn@dbms, - tempEmulationSchema = tempEmulationSchema) + sql <- SqlRender::translate( + sql = sql, + targetDialect = conn@dbms, + tempEmulationSchema = tempEmulationSchema + ) return(lowLevelQuerySql(conn, sql)) }) @@ -482,23 +531,27 @@ setMethod("dbReadTable", signature("DatabaseConnectorConnection", "character"), #' @param oracleTempSchema Specifically for Oracle, a schema with write privileges where temp tables #' can be created. #' @export -setMethod("dbRemoveTable", - signature("DatabaseConnectorConnection", "character"), - function(conn, name, - database = NULL, schema = NULL, oracleTempSchema = NULL, ...) { - if (!is.null(schema)) { - name <- paste(schema, name, sep = ".") - } - if (!is.null(database)) { - name <- paste(database, name, sep = ".") - } - sql <- "TRUNCATE TABLE @table; DROP TABLE @table;" - sql <- SqlRender::render(sql = sql, table = name) - sql <- SqlRender::translate(sql = sql, - targetDialect = conn@dbms, - oracleTempSchema = oracleTempSchema) - for (statement in SqlRender::splitSql(sql)) { - lowLevelExecuteSql(conn, statement) - } - return(TRUE) - }) +setMethod( + "dbRemoveTable", + signature("DatabaseConnectorConnection", "character"), + function(conn, name, + database = NULL, schema = NULL, oracleTempSchema = NULL, ...) 
{ + if (!is.null(schema)) { + name <- paste(schema, name, sep = ".") + } + if (!is.null(database)) { + name <- paste(database, name, sep = ".") + } + sql <- "TRUNCATE TABLE @table; DROP TABLE @table;" + sql <- SqlRender::render(sql = sql, table = name) + sql <- SqlRender::translate( + sql = sql, + targetDialect = conn@dbms, + oracleTempSchema = oracleTempSchema + ) + for (statement in SqlRender::splitSql(sql)) { + lowLevelExecuteSql(conn, statement) + } + return(TRUE) + } +) diff --git a/R/DatabaseConnector.R b/R/DatabaseConnector.R index a0b63175..31a9eb74 100644 --- a/R/DatabaseConnector.R +++ b/R/DatabaseConnector.R @@ -3,13 +3,13 @@ # Copyright 2021 Observational Health Data Sciences and Informatics # # This file is part of DatabaseConnector -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,15 +32,15 @@ NULL #' #' @title #' How to download and use JDBC drivers for the various data platforms. -#' -#' @description +#' +#' @description #' Below are instructions for downloading JDBC drivers for the various data platforms. Once downloaded #' use the \code{pathToDriver} argument in the \code{\link{connect}} or \code{\link{createConnectionDetails}} -#' functions to point to the driver. Alternatively, you can set the 'DATABASECONNECTOR_JAR_FOLDER' environmental +#' functions to point to the driver. Alternatively, you can set the 'DATABASECONNECTOR_JAR_FOLDER' environmental #' variable, for example in your .Renviron file (recommended). 
-#' +#' #' @section -#' SQL Server, Oracle, PostgreSQL, PDW, RedShift: Use the \code{\link{downloadJdbcDrivers}} function to download these drivers +#' SQL Server, Oracle, PostgreSQL, PDW, Spark, RedShift: Use the \code{\link{downloadJdbcDrivers}} function to download these drivers #' from the OHDSI GitHub pages. #' #' @section @@ -56,17 +56,18 @@ NULL #' Impala: Go to #' \href{https://www.cloudera.com/downloads/connectors/impala/jdbc/2-5-5.html}{Cloudera's site}, pick #' your OS version, and click "GET IT NOW!'. Register, and you should be able to download the driver. -#' +#' #' @section #' SQLite: For SQLite we actually don't use a JDBC driver. Instead, we use the RSQLite package, which can be installed #' using \code{install.packages("RSQLite")}. -#' +#' NULL # Borrowed from devtools: https://github.com/hadley/devtools/blob/ba7a5a4abd8258c52cb156e7b26bb4bf47a79f0b/R/utils.r#L44 is_installed <- function(pkg, version = 0) { - installed_version <- tryCatch(utils::packageVersion(pkg), - error = function(e) NA) + installed_version <- tryCatch(utils::packageVersion(pkg), + error = function(e) NA + ) !is.na(installed_version) && installed_version >= version } diff --git a/R/Drivers.R b/R/Drivers.R index 3c801aa9..69c579ba 100644 --- a/R/Drivers.R +++ b/R/Drivers.R @@ -3,13 +3,13 @@ # Copyright 2021 Observational Health Data Sciences and Informatics # # This file is part of DatabaseConnector -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -19,7 +19,7 @@ jdbcDrivers <- new.env() #' Download DatabaseConnector JDBC Jar files -#' +#' #' Download the DatabaseConnector JDBC drivers from https://ohdsi.github.io/DatabaseConnectorJars/ #' #' @param pathToDriver The full path to the folder where the JDBC driver .jar files should be downloaded to. @@ -30,19 +30,21 @@ jdbcDrivers <- new.env() #' \item{"redshift" for Amazon Redshift} #' \item{"sql server" or "pdw" for Microsoft SQL Server} #' \item{"oracle" for Oracle} +#' \item{"spark" for Spark} #' } #' @param method The method used for downloading files. See \code{?download.file} for details and options. -#' @param ... Further arguments passed on to \code{download.file} -#' -#' @details +#' @param ... Further arguments passed on to \code{download.file} +#' +#' @details #' The following versions of the JDBC drivers are currently used: #' \itemize{ #' \item{PostgreSQL}{V42.2.18} #' \item{RedShift}{V1.2.27.1051} #' \item{SQL Server}{V8.4.1.zip} #' \item{Oracle}{V19.8} +#' \item{Spark}{V2.6.17} #' } -#' +#' #' @return Invisibly returns the destination if the download was successful. #' @export #' @@ -50,22 +52,24 @@ jdbcDrivers <- new.env() #' \dontrun{ #' downloadJdbcDrivers("redshift") #' } -downloadJdbcDrivers <- function(dbms, pathToDriver = Sys.getenv("DATABASECONNECTOR_JAR_FOLDER"), method = "auto", ...){ - - if (is.null(pathToDriver) || is.na(pathToDriver) || pathToDriver == "") +downloadJdbcDrivers <- function(dbms, pathToDriver = Sys.getenv("DATABASECONNECTOR_JAR_FOLDER"), method = "auto", ...) { + if (is.null(pathToDriver) || is.na(pathToDriver) || pathToDriver == "") { abort("The pathToDriver argument must be specified. 
Consider setting the DATABASECONNECTOR_JAR_FOLDER environment variable, for example in the .Renviron file.") - + } + if (pathToDriver != Sys.getenv("DATABASECONNECTOR_JAR_FOLDER")) { if (Sys.getenv("DATABASECONNECTOR_JAR_FOLDER") != pathToDriver) { - inform(paste0("Consider adding `DATABASECONNECTOR_JAR_FOLDER='", - pathToDriver, - "'` to ", - path.expand("~/.Renviron"), " and restarting R.")) + inform(paste0( + "Consider adding `DATABASECONNECTOR_JAR_FOLDER='", + pathToDriver, + "'` to ", + path.expand("~/.Renviron"), " and restarting R." + )) } } - + pathToDriver <- path.expand(pathToDriver) - + if (!dir.exists(pathToDriver)) { if (file.exists(pathToDriver)) { abort(paste0("The folder location pathToDriver = '", pathToDriver, "' points to a file, but should point to a folder.")) @@ -73,51 +77,56 @@ downloadJdbcDrivers <- function(dbms, pathToDriver = Sys.getenv("DATABASECONNECT warn(paste0("The folder location '", pathToDriver, "' does not exist. Attempting to create.")) dir.create(pathToDriver, recursive = TRUE) } - + stopifnot(is.character(dbms), length(dbms) == 1, dbms %in% c("all", "postgresql", "redshift", "sql server", "oracle", "pdw", "spark")) - + if (dbms == "pdw") { dbms <- "sql server" } - + baseUrl <- "https://ohdsi.github.io/DatabaseConnectorJars/" - - jdbcDriverNames <- c("postgresql" = "postgresqlV42.2.18.zip", - "redshift" = "redShiftV1.2.27.1051.zip", - "sql server" = "sqlServerV9.2.0.zip", - "oracle" = "oracleV19.8.zip", - "spark" = "SimbaSparkV2.6.17.zip") - + + jdbcDriverNames <- c( + "postgresql" = "postgresqlV42.2.18.zip", + "redshift" = "redShiftV1.2.27.1051.zip", + "sql server" = "sqlServerV9.2.0.zip", + "oracle" = "oracleV19.8.zip", + "spark" = "SimbaSparkV2.6.17.zip" + ) + if (dbms == "all") { dbms <- names(jdbcDriverNames) } - - for(db in dbms) { + + for (db in dbms) { driverName <- jdbcDriverNames[[db]] - result <- download.file(url = paste0(baseUrl, driverName), - destfile = paste(pathToDriver, driverName, sep = "/"), - method = 
method) - + result <- download.file( + url = paste0(baseUrl, driverName), + destfile = paste(pathToDriver, driverName, sep = "/"), + method = method + ) + extractedFilename <- unzip(file.path(pathToDriver, driverName), exdir = pathToDriver) unzipSuccess <- is.character(extractedFilename) - + if (unzipSuccess) { file.remove(file.path(pathToDriver, driverName)) } - if (unzipSuccess && result == 0) { + if (unzipSuccess && result == 0) { inform(paste0("DatabaseConnector ", db, " JDBC driver downloaded to '", pathToDriver, "'.")) } else { abort(paste0("Downloading and unzipping of ", db, " JDBC driver to '", pathToDriver, "' has failed.")) } } - + invisible(pathToDriver) } loadJdbcDriver <- function(driverClass, classPath) { rJava::.jaddClassPath(classPath) - if (nchar(driverClass) && rJava::is.jnull(rJava::.jfindClass(as.character(driverClass)[1]))) + if (nchar(driverClass) && rJava::is.jnull(rJava::.jfindClass(as.character(driverClass)[1]))) { abort("Cannot find JDBC driver class ", driverClass) + } jdbcDriver <- rJava::.jnew(driverClass, check = FALSE) rJava::.jcheck(TRUE) return(jdbcDriver) @@ -146,16 +155,20 @@ findPathToJar <- function(name, pathToDriver = Sys.getenv("DATABASECONNECTOR_JAR if (file.exists(pathToDriver)) { abort(paste0("The folder location pathToDriver = '", pathToDriver, "' points to a file, but should point to a folder.")) } else { - abort(paste0("The folder location pathToDriver = '", pathToDriver, "' does not exist.", - "Please set the folder to the location containing the JDBC driver.", - "You can download most drivers using the `downloadJdbcDrivers()` function.")) + abort(paste0( + "The folder location pathToDriver = '", pathToDriver, "' does not exist.", + "Please set the folder to the location containing the JDBC driver.", + "You can download most drivers using the `downloadJdbcDrivers()` function." 
+ )) } - } + } files <- list.files(path = pathToDriver, pattern = name, full.names = TRUE) if (length(files) == 0) { - abort(paste("No drives matching pattern", name, "found in folder", pathToDriver, ".", - "\nPlease download the JDBC drivers for your database to the folder.", - "You can download most drivers using the `downloadJdbcDrivers()` function.")) + abort(paste( + "No drives matching pattern", name, "found in folder", pathToDriver, ".", + "\nPlease download the JDBC drivers for your database to the folder.", + "You can download most drivers using the `downloadJdbcDrivers()` function." + )) } else { return(files) } diff --git a/R/InsertTable.R b/R/InsertTable.R index 084f2af7..97567de9 100644 --- a/R/InsertTable.R +++ b/R/InsertTable.R @@ -48,26 +48,31 @@ getSqlDataTypes <- function(column) { if (identifier) { vid <- grep("^[A-Za-z]+([A-Za-z0-9_]*)$", s) if (length(s[-vid])) { - if (is.na(quote)) - abort(paste0("The JDBC connection doesn't support quoted identifiers, but table/column name contains characters that must be quoted (", - paste(s[-vid], collapse = ","), - ")")) + if (is.na(quote)) { + abort(paste0( + "The JDBC connection doesn't support quoted identifiers, but table/column name contains characters that must be quoted (", + paste(s[-vid], collapse = ","), + ")" + )) + } s[-vid] <- .sql.qescape(s[-vid], FALSE, quote) } return(s) } - if (is.na(quote)) + if (is.na(quote)) { quote <- "" + } s <- gsub("\\\\", "\\\\\\\\", s) - if (nchar(quote)) + if (nchar(quote)) { s <- gsub(paste("\\", quote, sep = ""), paste("\\\\\\", quote, sep = ""), s, perl = TRUE) + } paste(quote, s, quote, sep = "") } validateInt64Insert <- function() { # Validate that communication of 64-bit integers with Java is correct: values <- bit64::as.integer64(c(1, -1, 8589934592, -8589934592)) - class(values) <- "double" + class(values) <- "double" success <- rJava::J("org.ohdsi.databaseConnector.BatchedInsert")$validateInteger64(values) if (!success) { abort("Error converting 64-bit 
integers between R and Java") @@ -76,11 +81,14 @@ validateInt64Insert <- function() { trySettingAutoCommit <- function(connection, value) { - tryCatch({ - rJava::.jcall(connection@jConnection, "V", "setAutoCommit", value) - }, error = function(cond) { - # do nothing - }) + tryCatch( + { + rJava::.jcall(connection@jConnection, "V", "setAutoCommit", value) + }, + error = function(cond) { + # do nothing + } + ) } #' Insert a table on the server @@ -91,7 +99,7 @@ trySettingAutoCommit <- function(connection, value) { #' #' @param connection The connection to the database server. #' @param databaseSchema (Optional) The name of the database schema where the table should -#' be located. +#' be located. #' @param tableName The name of the table where the data should be inserted. #' @param data The data frame containing the data to be inserted. #' @param dropTableIfExists Drop the table if the table already exists before writing? @@ -101,8 +109,8 @@ trySettingAutoCommit <- function(connection, value) { #' @param tempEmulationSchema Some database platforms like Oracle and Impala do not truly support temp tables. To #' emulate temp tables, provide a schema with write privileges where temp tables #' can be created. -#' @param bulkLoad If using Redshift, PDW, Hive or Postgres, use more performant bulk loading -#' techniques. Does not work for temp tables (except for HIVE). See Details for +#' @param bulkLoad If using Redshift, PDW, Hive or Postgres, use more performant bulk loading +#' techniques. Does not work for temp tables (except for HIVE). See Details for #' requirements for the various platforms. #' @param useMppBulkLoad DEPRECATED. Use \code{bulkLoad} instead. #' @param progressBar Show a progress bar when uploading? @@ -112,55 +120,61 @@ trySettingAutoCommit <- function(connection, value) { #' @details #' This function sends the data in a data frame to a table on the server. Either a new table is #' created, or the data is appended to an existing table. 
NA values are inserted as null values in the -#' database. -#' -#' Bulk uploading: -#' +#' database. +#' +#' Bulk uploading: +#' #' Redshift: The MPP bulk loading relies upon the CloudyR S3 library -#' to test a connection to an S3 bucket using AWS S3 credentials. Credentials are configured +#' to test a connection to an S3 bucket using AWS S3 credentials. Credentials are configured #' directly into the System Environment using the following keys: Sys.setenv("AWS_ACCESS_KEY_ID" = #' "some_access_key_id", "AWS_SECRET_ACCESS_KEY" = "some_secret_access_key", "AWS_DEFAULT_REGION" = #' "some_aws_region", "AWS_BUCKET_NAME" = "some_bucket_name", "AWS_OBJECT_KEY" = "some_object_key", -#' "AWS_SSE_TYPE" = "server_side_encryption_type"). -#' +#' "AWS_SSE_TYPE" = "server_side_encryption_type"). +#' #' PDW: The MPP bulk loading relies upon the client #' having a Windows OS and the DWLoader exe installed, and the following permissions granted: --Grant #' BULK Load permissions - needed at a server level USE master; GRANT ADMINISTER BULK OPERATIONS TO #' user; --Grant Staging database permissions - we will use the user db. USE scratch; EXEC -#' sp_addrolemember 'db_ddladmin', user; Set the R environment variable DWLOADER_PATH to the location +#' sp_addrolemember 'db_ddladmin', user; Set the R environment variable DWLOADER_PATH to the location #' of the binary. -#' +#' #' PostgreSQL: -#' Uses the 'pg' executable to upload. Set the POSTGRES_PATH environment variable to the Postgres +#' Uses the 'pg' executable to upload. Set the POSTGRES_PATH environment variable to the Postgres #' binary path, e.g. 'C:/Program Files/PostgreSQL/11/bin'. 
#' #' @examples #' \dontrun{ -#' connectionDetails <- createConnectionDetails(dbms = "mysql", -#' server = "localhost", -#' user = "root", -#' password = "blah") +#' connectionDetails <- createConnectionDetails( +#' dbms = "mysql", +#' server = "localhost", +#' user = "root", +#' password = "blah" +#' ) #' conn <- connect(connectionDetails) #' data <- data.frame(x = c(1, 2, 3), y = c("a", "b", "c")) #' insertTable(conn, "my_schema", "my_table", data) #' disconnect(conn) #' #' ## bulk data insert with Redshift or PDW -#' connectionDetails <- createConnectionDetails(dbms = "redshift", -#' server = "localhost", -#' user = "root", -#' password = "blah", -#' schema = "cdm_v5") +#' connectionDetails <- createConnectionDetails( +#' dbms = "redshift", +#' server = "localhost", +#' user = "root", +#' password = "blah", +#' schema = "cdm_v5" +#' ) #' conn <- connect(connectionDetails) #' data <- data.frame(x = c(1, 2, 3), y = c("a", "b", "c")) -#' insertTable(connection = connection, -#' databaseSchema = "scratch", -#' tableName = "somedata", -#' data = data, -#' dropTableIfExists = TRUE, -#' createTable = TRUE, -#' tempTable = FALSE, -#' bulkLoad = TRUE) # or, Sys.setenv("DATABASE_CONNECTOR_BULK_UPLOAD" = TRUE) +#' insertTable( +#' connection = connection, +#' databaseSchema = "scratch", +#' tableName = "somedata", +#' data = data, +#' dropTableIfExists = TRUE, +#' createTable = TRUE, +#' tempTable = FALSE, +#' bulkLoad = TRUE +#' ) # or, Sys.setenv("DATABASE_CONNECTOR_BULK_UPLOAD" = TRUE) #' } #' @export insertTable <- function(connection, @@ -195,15 +209,17 @@ insertTable.default <- function(connection, camelCaseToSnakeCase = FALSE) { if (!is.null(useMppBulkLoad) && useMppBulkLoad != "") { warn("The 'useMppBulkLoad' argument is deprecated. 
Use 'bulkLoad' instead.", - .frequency = "regularly", - .frequency_id = "useMppBulkLoad") + .frequency = "regularly", + .frequency_id = "useMppBulkLoad" + ) bulkLoad <- useMppBulkLoad } bulkLoad <- (!is.null(bulkLoad) && bulkLoad == "TRUE") if (!is.null(oracleTempSchema) && oracleTempSchema != "") { warn("The 'oracleTempSchema' argument is deprecated. Use 'tempEmulationSchema' instead.", - .frequency = "regularly", - .frequency_id = "oracleTempSchema") + .frequency = "regularly", + .frequency_id = "oracleTempSchema" + ) tempEmulationSchema <- oracleTempSchema } if (camelCaseToSnakeCase) { @@ -213,58 +229,70 @@ insertTable.default <- function(connection, tempTable <- TRUE warn("Temp table name detected, setting tempTable parameter to TRUE") } - if (dropTableIfExists) + if (dropTableIfExists) { createTable <- TRUE - if (tempTable & substr(tableName, 1, 1) != "#" & attr(connection, "dbms") != "redshift") + } + if (tempTable & substr(tableName, 1, 1) != "#" & attr(connection, "dbms") != "redshift") { tableName <- paste("#", tableName, sep = "") - if (!is.null(databaseSchema)) + } + if (!is.null(databaseSchema)) { tableName <- paste(databaseSchema, tableName, sep = ".") - if (is.vector(data) && !is.list(data)) + } + if (is.vector(data) && !is.list(data)) { data <- data.frame(x = data) - if (length(data) < 1) + } + if (length(data) < 1) { abort("data must have at least one column") - if (is.null(names(data))) + } + if (is.null(names(data))) { names(data) <- paste("V", 1:length(data), sep = "") + } if (length(data[[1]]) > 0) { - if (!is.data.frame(data)) + if (!is.data.frame(data)) { data <- as.data.frame(data, row.names = 1:length(data[[1]])) + } } else { - if (!is.data.frame(data)) + if (!is.data.frame(data)) { data <- as.data.frame(data) + } } isSqlReservedWord(c(tableName, colnames(data)), warn = TRUE) useBulkLoad <- (bulkLoad && connection@dbms %in% c("hive", "redshift") && createTable) || (bulkLoad && connection@dbms %in% c("pdw", "postgresql") && !tempTable) 
useCtasHack <- connection@dbms %in% c("pdw", "redshift", "bigquery", "hive") && createTable && nrow(data) > 0 && !useBulkLoad - + sqlDataTypes <- sapply(data, getSqlDataTypes) sqlTableDefinition <- paste(.sql.qescape(names(data), TRUE, connection@identifierQuote), sqlDataTypes, collapse = ", ") sqlTableName <- .sql.qescape(tableName, TRUE, connection@identifierQuote) sqlFieldNames <- paste(.sql.qescape(names(data), TRUE, connection@identifierQuote), collapse = ",") - + if (dropTableIfExists) { if (tempTable) { sql <- "IF OBJECT_ID('tempdb..@tableName', 'U') IS NOT NULL DROP TABLE @tableName;" } else { sql <- "IF OBJECT_ID('@tableName', 'U') IS NOT NULL DROP TABLE @tableName;" } - renderTranslateExecuteSql(connection = connection, - sql = sql, - tableName = tableName, - tempEmulationSchema = tempEmulationSchema, - progressBar = FALSE, - reportOverallTime = FALSE) + renderTranslateExecuteSql( + connection = connection, + sql = sql, + tableName = tableName, + tempEmulationSchema = tempEmulationSchema, + progressBar = FALSE, + reportOverallTime = FALSE + ) } - + if (createTable && !useCtasHack && !(bulkLoad && connection@dbms == "hive")) { sql <- paste("CREATE TABLE ", sqlTableName, " (", sqlTableDefinition, ");", sep = "") - renderTranslateExecuteSql(connection = connection, - sql = sql, - tempEmulationSchema = tempEmulationSchema, - progressBar = FALSE, - reportOverallTime = FALSE) + renderTranslateExecuteSql( + connection = connection, + sql = sql, + tempEmulationSchema = tempEmulationSchema, + progressBar = FALSE, + reportOverallTime = FALSE + ) } - + if (useBulkLoad) { # Inserting using bulk upload for MPP ------------------------------------------------ if (!checkBulkLoadCredentials(connection)) { @@ -278,7 +306,7 @@ insertTable.default <- function(connection, } else if (connection@dbms == "hive") { bulkLoadHive(connection, sqlTableName, sqlFieldNames, data) } else if (connection@dbms == "postgresql") { - bulkLoadPostgres(connection, sqlTableName, sqlFieldNames, 
sqlDataTypes, data) + bulkLoadPostgres(connection, sqlTableName, sqlFieldNames, sqlDataTypes, data) } } else if (useCtasHack) { # Inserting using CTAS hack ---------------------------------------------------------------- @@ -288,19 +316,22 @@ insertTable.default <- function(connection, if (any(sqlDataTypes == "BIGINT")) { validateInt64Insert() } - - insertSql <- paste0("INSERT INTO ", - sqlTableName, - " (", - sqlFieldNames, - ") VALUES(", - paste(rep("?", length(sqlDataTypes)), collapse = ","), - ")") + + insertSql <- paste0( + "INSERT INTO ", + sqlTableName, + " (", + sqlFieldNames, + ") VALUES(", + paste(rep("?", length(sqlDataTypes)), collapse = ","), + ")" + ) insertSql <- SqlRender::translate(insertSql, - targetDialect = connection@dbms, - tempEmulationSchema = tempEmulationSchema) + targetDialect = connection@dbms, + tempEmulationSchema = tempEmulationSchema + ) batchSize <- 10000 - + autoCommit <- rJava::.jcall(connection@jConnection, "Z", "getAutoCommit") if (autoCommit) { trySettingAutoCommit(connection, FALSE) @@ -310,13 +341,15 @@ insertTable.default <- function(connection, if (progressBar) { pb <- txtProgressBar(style = 3) } - batchedInsert <- rJava::.jnew("org.ohdsi.databaseConnector.BatchedInsert", - connection@jConnection, - insertSql, - ncol(data)) + batchedInsert <- rJava::.jnew( + "org.ohdsi.databaseConnector.BatchedInsert", + connection@jConnection, + insertSql, + ncol(data) + ) for (start in seq(1, nrow(data), by = batchSize)) { if (progressBar) { - setTxtProgressBar(pb, start/nrow(data)) + setTxtProgressBar(pb, start / nrow(data)) } end <- min(start + batchSize - 1, nrow(data)) setColumn <- function(i, start, end) { @@ -343,10 +376,9 @@ insertTable.default <- function(connection, lapply(1:ncol(data), setColumn, start = start, end = end) if (attr(connection, "dbms") == "bigquery") { rJava::.jcall(batchedInsert, "V", "executeBigQueryBatch") - } else { + } else { rJava::.jcall(batchedInsert, "V", "executeBatch") } - } if (progressBar) { 
setTxtProgressBar(pb, 1) @@ -378,7 +410,7 @@ insertTable.DatabaseConnectorDbiConnection <- function(connection, warn("Temp table name detected, setting tempTable parameter to TRUE") } isSqlReservedWord(c(tableName, colnames(data)), warn = TRUE) - + tableName <- gsub("^#", "", tableName) if (!is.null(databaseSchema)) { if (connection@dbms %in% c("sqlite", "sqlite extended")) { @@ -389,7 +421,7 @@ insertTable.DatabaseConnectorDbiConnection <- function(connection, tableName <- paste(databaseSchema, tableName, sep = ".") } } - + if (connection@dbms == "sqlite") { # Convert dates and datetime to UNIX timestamp: for (i in 1:ncol(data)) { @@ -401,11 +433,13 @@ insertTable.DatabaseConnectorDbiConnection <- function(connection, } } } - DBI::dbWriteTable(conn = connection@dbiConnection, - name = tableName, - value = data, - overwrite = dropTableIfExists, - append = !createTable, - temporary = tempTable) + DBI::dbWriteTable( + conn = connection@dbiConnection, + name = tableName, + value = data, + overwrite = dropTableIfExists, + append = !createTable, + temporary = tempTable + ) invisible(NULL) } diff --git a/R/ListTables.R b/R/ListTables.R index 4b7f69f8..b2dd2431 100644 --- a/R/ListTables.R +++ b/R/ListTables.R @@ -41,14 +41,14 @@ getTableNames <- function(connection, databaseSchema) { tables <- dbListTables(connection@dbiConnection, schema = databaseSchema) return(toupper(tables)) } - + if (is.null(databaseSchema)) { database <- rJava::.jnull("java/lang/String") schema <- rJava::.jnull("java/lang/String") } else { if (connection@dbms == "oracle") { databaseSchema <- toupper(databaseSchema) - } + } if (connection@dbms == "redshift") { databaseSchema <- tolower(databaseSchema) } @@ -69,17 +69,17 @@ getTableNames <- function(connection, databaseSchema) { metaData <- rJava::.jcall(connection@jConnection, "Ljava/sql/DatabaseMetaData;", "getMetaData") types <- rJava::.jarray(c("TABLE", "VIEW")) resultSet <- rJava::.jcall(metaData, - "Ljava/sql/ResultSet;", - "getTables", - 
database, - schema, - rJava::.jnull("java/lang/String"), - types, - check = FALSE) + "Ljava/sql/ResultSet;", + "getTables", + database, + schema, + rJava::.jnull("java/lang/String"), + types, + check = FALSE + ) tables <- character() while (rJava::.jcall(resultSet, "Z", "next")) { tables <- c(tables, rJava::.jcall(resultSet, "S", "getString", "TABLE_NAME")) } return(toupper(tables)) - } diff --git a/R/RStudio.R b/R/RStudio.R index a8149b75..6df69fc1 100644 --- a/R/RStudio.R +++ b/R/RStudio.R @@ -3,13 +3,13 @@ # Copyright 2021 Observational Health Data Sciences and Informatics # # This file is part of DatabaseConnector -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -30,27 +30,33 @@ registerWithRStudio <- function(connection) { i <- i + 1 displayName <- paste0(server, " (", i, ")") } - registeredDisplayNames <- rbind(registeredDisplayNames, - data.frame(uuid = connection@uuid, - displayName = displayName, - stringsAsFactors = FALSE)) + registeredDisplayNames <- rbind( + registeredDisplayNames, + data.frame( + uuid = connection@uuid, + displayName = displayName, + stringsAsFactors = FALSE + ) + ) options(registeredDisplayNames = registeredDisplayNames) - observer$connectionOpened(type = compileTypeLabel(connection), - displayName = displayName, - host = displayName, - connectCode = compileReconnectCode(connection), - icon = "", - disconnect = function() { - disconnect(connection) - }, listObjectTypes = function() { - listDatabaseConnectorObjectTypes(connection) - }, listObjects = function(...) { - listDatabaseConnectorObjects(connection, ...) 
- }, listColumns = function(...) { - listDatabaseConnectorColumns(connection, ...) - }, previewObject = function(rowLimit, ...) { - previewObject(connection, rowLimit, ...) - }, actions = connectionActions(connection), connectionObject = connection) + observer$connectionOpened( + type = compileTypeLabel(connection), + displayName = displayName, + host = displayName, + connectCode = compileReconnectCode(connection), + icon = "", + disconnect = function() { + disconnect(connection) + }, listObjectTypes = function() { + listDatabaseConnectorObjectTypes(connection) + }, listObjects = function(...) { + listDatabaseConnectorObjects(connection, ...) + }, listColumns = function(...) { + listDatabaseConnectorColumns(connection, ...) + }, previewObject = function(rowLimit, ...) { + previewObject(connection, rowLimit, ...) + }, actions = connectionActions(connection), connectionObject = connection + ) } } @@ -78,7 +84,7 @@ listDatabaseConnectorColumns <- function(connection, schema = NULL, table = NULL, ...) 
{ - UseMethod("listDatabaseConnectorColumns", connection) + UseMethod("listDatabaseConnectorColumns", connection) } listDatabaseConnectorColumns.default <- function(connection, @@ -103,19 +109,23 @@ listDatabaseConnectorColumns.default <- function(connection, schema <- tolower(schema) } } - if (is.null(catalog)) + if (is.null(catalog)) { catalog <- rJava::.jnull("java/lang/String") - if (is.null(schema)) + } + if (is.null(schema)) { schema <- rJava::.jnull("java/lang/String") - + } + metaData <- rJava::.jcall(connection@jConnection, "Ljava/sql/DatabaseMetaData;", "getMetaData") - resultSet <- rJava::.jcall(metaData, - "Ljava/sql/ResultSet;", - "getColumns", - catalog, - schema, - table, - rJava::.jnull("java/lang/String")) + resultSet <- rJava::.jcall( + metaData, + "Ljava/sql/ResultSet;", + "getColumns", + catalog, + schema, + table, + rJava::.jnull("java/lang/String") + ) on.exit(rJava::.jcall(resultSet, "V", "close")) fields <- character() types <- character() @@ -132,7 +142,7 @@ listDatabaseConnectorColumns.DatabaseConnectorDbiConnection <- function(connecti table = NULL, ...) { res <- DBI::dbSendQuery(connection@dbiConnection, sprintf("SELECT * FROM %s LIMIT 0;", table)) - info <- dbColumnInfo(res) + info <- dbColumnInfo(res) dbClearResult(res) if (connection@dbms == "sqlite") { info$type[grepl("DATE$", info$name)] <- "date" @@ -144,15 +154,19 @@ listDatabaseConnectorColumns.DatabaseConnectorDbiConnection <- function(connecti listDatabaseConnectorObjects <- function(connection, catalog = NULL, schema = NULL, ...) 
{ if (is.null(catalog) && hasCatalogs(connection)) { catalogs <- getCatalogs(connection) - return(data.frame(name = catalogs, - type = rep("catalog", times = length(catalogs)), - stringsAsFactors = FALSE)) + return(data.frame( + name = catalogs, + type = rep("catalog", times = length(catalogs)), + stringsAsFactors = FALSE + )) } if (is.null(schema)) { schemas <- getSchemaNames(connection, catalog) - return(data.frame(name = schemas, - type = rep("schema", times = length(schemas)), - stringsAsFactors = FALSE)) + return(data.frame( + name = schemas, + type = rep("schema", times = length(schemas)), + stringsAsFactors = FALSE + )) } if (!hasCatalogs(connection) || connection@dbms %in% c("postgresql", "redshift", "sqlite", "sqlite extended")) { databaseSchema <- schema @@ -160,14 +174,18 @@ listDatabaseConnectorObjects <- function(connection, catalog = NULL, schema = NU databaseSchema <- paste(catalog, schema, sep = ".") } tables <- getTableNames(connection, databaseSchema) - return(data.frame(name = tables, - type = rep("table", times = length(tables)), - stringsAsFactors = FALSE)) + return(data.frame( + name = tables, + type = rep("table", times = length(tables)), + stringsAsFactors = FALSE + )) } listDatabaseConnectorObjectTypes <- function(connection) { - types <- list(schema = list(contains = c(list(table = list(contains = "data")), - list(view = list(contains = "data"))))) + types <- list(schema = list(contains = c( + list(table = list(contains = "data")), + list(view = list(contains = "data")) + ))) if (hasCatalogs(connection)) { types <- list(catalog = list(contains = types)) } @@ -200,9 +218,11 @@ getServer.default <- function(connection) { if (connection@dbms == "hive") { url <- connection@url } else { - databaseMetaData <- rJava::.jcall(connection@jConnection, - "Ljava/sql/DatabaseMetaData;", - "getMetaData") + databaseMetaData <- rJava::.jcall( + connection@jConnection, + "Ljava/sql/DatabaseMetaData;", + "getMetaData" + ) url <- 
rJava::.jcall(databaseMetaData, "Ljava/lang/String;", "getURL") } server <- urltools::url_parse(url)$domain @@ -218,27 +238,33 @@ compileReconnectCode <- function(connection) { } compileReconnectCode.default <- function(connection) { - databaseMetaData <- rJava::.jcall(connection@jConnection, - "Ljava/sql/DatabaseMetaData;", - "getMetaData") - if (connection@dbms == 'hive') { + databaseMetaData <- rJava::.jcall( + connection@jConnection, + "Ljava/sql/DatabaseMetaData;", + "getMetaData" + ) + if (connection@dbms == "hive") { url <- connection@url user <- connection@user } else { url <- rJava::.jcall(databaseMetaData, "Ljava/lang/String;", "getURL") user <- rJava::.jcall(databaseMetaData, "Ljava/lang/String;", "getUserName") } - code <- sprintf("library(DatabaseConnector)\ncon <- connect(dbms = \"%s\", connectionString = \"%s\", user = \"%s\", password = password)", - connection@dbms, - url, - user) + code <- sprintf( + "library(DatabaseConnector)\ncon <- connect(dbms = \"%s\", connectionString = \"%s\", user = \"%s\", password = password)", + connection@dbms, + url, + user + ) return(code) } compileReconnectCode.DatabaseConnectorDbiConnection <- function(connection) { - code <- sprintf("library(DatabaseConnector)\ncon <- connect(dbms = \"%s\", server = \"%s\")", - connection@dbms, - connection@server) + code <- sprintf( + "library(DatabaseConnector)\ncon <- connect(dbms = \"%s\", server = \"%s\")", + connection@dbms, + connection@server + ) return(code) } @@ -247,14 +273,17 @@ getSchemaNames <- function(conn, catalog = NULL) { } getSchemaNames.default <- function(conn, catalog = NULL) { - if (is.null(catalog)) + if (is.null(catalog)) { catalog <- rJava::.jnull("java/lang/String") + } metaData <- rJava::.jcall(conn@jConnection, "Ljava/sql/DatabaseMetaData;", "getMetaData") - resultSet <- rJava::.jcall(metaData, - "Ljava/sql/ResultSet;", - "getSchemas", - catalog, - rJava::.jnull("java/lang/String")) + resultSet <- rJava::.jcall( + metaData, + "Ljava/sql/ResultSet;", 
+ "getSchemas", + catalog, + rJava::.jnull("java/lang/String") + ) on.exit(rJava::.jcall(resultSet, "V", "close")) schemas <- character() while (rJava::.jcall(resultSet, "Z", "next")) { diff --git a/R/Sql.R b/R/Sql.R index f7cf271a..a2f6b536 100644 --- a/R/Sql.R +++ b/R/Sql.R @@ -17,13 +17,13 @@ # limitations under the License. #' Get available Java heap space -#' -#' @description +#' +#' @description #' For debugging purposes: get the available Java heap space. -#' +#' #' @return #' The Java heap space (in bytes). -#' +#' #' @export getAvailableJavaHeapSpace <- function() { availableSpace <- rJava::J("org.ohdsi.databaseConnector.BatchedQuery")$getAvailableHeapSpace() @@ -43,8 +43,12 @@ getAvailableJavaHeapSpace <- function() { lines <- c(lines, paste("-", si$basePkgs)) lines <- c(lines, "") lines <- c(lines, "Other attached packages:") - for (pkg in si$otherPkgs) lines <- c(lines, - paste("- ", pkg$Package, " (", pkg$Version, ")", sep = "")) + for (pkg in si$otherPkgs) { + lines <- c( + lines, + paste("- ", pkg$Package, " (", pkg$Version, ")", sep = "") + ) + } return(paste(lines, collapse = "\n")) } @@ -54,9 +58,10 @@ getAvailableJavaHeapSpace <- function() { writeChar(report, fileConn, eos = NULL) close(fileConn) abort(paste("Error executing SQL:", - message, - paste("An error report has been created at ", fileName), - sep = "\n"), call. = FALSE) + message, + paste("An error report has been created at ", fileName), + sep = "\n" + ), call. 
= FALSE) } validateInt64Query <- function() { @@ -83,52 +88,64 @@ parseJdbcColumnData <- function(content, columnTypes = NULL, datesAsString = FALSE, integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", - default = TRUE), + default = TRUE + ), integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", - default = TRUE)) { - - if (is.null(columnTypes)) + default = TRUE + )) { + if (is.null(columnTypes)) { columnTypes <- rJava::.jcall(content, "[I", "getColumnTypes") - + } + columns <- vector("list", length(columnTypes)) - + for (i in seq.int(length(columnTypes))) { if (columnTypes[i] == 1) { - column <- rJava::.jcall(content, - "[D", - "getNumeric", - as.integer(i)) + column <- rJava::.jcall( + content, + "[D", + "getNumeric", + as.integer(i) + ) # rJava doesn't appear to be able to return NAs, so converting NaNs to NAs: column[is.nan(column)] <- NA columns[[i]] <- c(columns[[i]], column) } else if (columnTypes[i] == 5) { - column <- rJava::.jcall(content, - "[D", - "getInteger64", - as.integer(i)) + column <- rJava::.jcall( + content, + "[D", + "getInteger64", + as.integer(i) + ) oldClass(column) <- "integer64" if (is.null(columns[[i]])) { columns[[i]] <- column } else { columns[[i]] <- c(columns[[i]], column) } - + if (integer64AsNumeric) { columns[[i]] <- convertInteger64ToNumeric(columns[[i]]) } } else if (columnTypes[i] == 6) { - columns[[i]] <- c(columns[[i]], - rJava::.jcall(content, - "[I", - "getInteger", - as.integer(i))) + columns[[i]] <- c( + columns[[i]], + rJava::.jcall( + content, + "[I", + "getInteger", + as.integer(i) + ) + ) if (integerAsNumeric) { columns[[i]] <- as.numeric(columns[[i]]) } } else { - columns[[i]] <- c(columns[[i]], - rJava::.jcall(content, "[Ljava/lang/String;", "getString", i)) - + columns[[i]] <- c( + columns[[i]], + rJava::.jcall(content, "[Ljava/lang/String;", "getString", i) + ) + if (!datesAsString) { if (columnTypes[i] == 3) { columns[[i]] <- as.Date(columns[[i]]) @@ -138,7 +155,7 @@ 
parseJdbcColumnData <- function(content, } } } - + names(columns) <- rJava::.jcall(content, "[Ljava/lang/String;", "getColumnNames") # More efficient than as.data.frame, as it avoids converting row.names to character: columns <- structure(columns, class = "data.frame", row.names = seq_len(length(columns[[1]]))) @@ -156,9 +173,9 @@ parseJdbcColumnData <- function(content, #' @param datesAsString Logical: Should dates be imported as character vectors, our should they be converted #' to R's date format? #' @param integerAsNumeric Logical: should 32-bit integers be converted to numeric (double) values? If FALSE -#' 32-bit integers will be represented using R's native \code{Integer} class. +#' 32-bit integers will be represented using R's native \code{Integer} class. #' @param integer64AsNumeric Logical: should 64-bit integers be converted to numeric (double) values? If FALSE -#' 64-bit integers will be represented using \code{bit64::integer64}. +#' 64-bit integers will be represented using \code{bit64::integer64}. #' #' @details #' Retrieves data from the database server and stores it in a data frame. 
Null values in the database are converted @@ -168,30 +185,33 @@ parseJdbcColumnData <- function(content, #' A data frame containing the data retrieved from the server #' #' @export -lowLevelQuerySql <- function(connection, - query, - datesAsString = FALSE, +lowLevelQuerySql <- function(connection, + query, + datesAsString = FALSE, integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", default = TRUE), integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", default = TRUE)) { UseMethod("lowLevelQuerySql", connection) } #' @export -lowLevelQuerySql.default <- function(connection, - query, - datesAsString = FALSE, +lowLevelQuerySql.default <- function(connection, + query, + datesAsString = FALSE, integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", default = TRUE), integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", default = TRUE)) { - if (rJava::is.jnull(connection@jConnection)) + if (rJava::is.jnull(connection@jConnection)) { abort("Connection is closed") - - batchedQuery <- rJava::.jnew("org.ohdsi.databaseConnector.BatchedQuery", - connection@jConnection, - query, - connection@dbms) - + } + + batchedQuery <- rJava::.jnew( + "org.ohdsi.databaseConnector.BatchedQuery", + connection@jConnection, + query, + connection@dbms + ) + on.exit(rJava::.jcall(batchedQuery, "V", "clear")) - + columnTypes <- rJava::.jcall(batchedQuery, "[I", "getColumnTypes") if (any(columnTypes == 5)) { validateInt64Query() @@ -200,20 +220,21 @@ lowLevelQuerySql.default <- function(connection, while (!rJava::.jcall(batchedQuery, "Z", "isDone")) { rJava::.jcall(batchedQuery, "V", "fetchBatch") batch <- parseJdbcColumnData(batchedQuery, - columnTypes = columnTypes, - datesAsString = datesAsString, - integer64AsNumeric = integer64AsNumeric, - integerAsNumeric = integerAsNumeric) - + columnTypes = columnTypes, + datesAsString = datesAsString, + integer64AsNumeric = integer64AsNumeric, + integerAsNumeric = integerAsNumeric + ) + columns <- 
rbind(columns, batch) } return(columns) } #' @export -lowLevelQuerySql.DatabaseConnectorDbiConnection <- function(connection, - query, - datesAsString = FALSE, +lowLevelQuerySql.DatabaseConnectorDbiConnection <- function(connection, + query, + datesAsString = FALSE, integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", default = TRUE), integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", default = TRUE)) { columns <- DBI::dbGetQuery(connection@dbiConnection, query) @@ -221,14 +242,14 @@ lowLevelQuerySql.DatabaseConnectorDbiConnection <- function(connection, for (i in seq.int(ncol(columns))) { if (is(columns[[i]], "integer")) { columns[[i]] <- as.numeric(columns[[i]]) - } + } } } if (integer64AsNumeric) { for (i in seq.int(ncol(columns))) { if (is(columns[[i]], "integer64")) { columns[[i]] <- convertInteger64ToNumeric(columns[[i]]) - } + } } } return(columns) @@ -251,7 +272,7 @@ delayIfNecessary <- function(sql, regex, executionTimeList, threshold) { regexGroups <- stringr::str_match(sql, stringr::regex(regex, ignore_case = TRUE)) tableName <- regexGroups[3] if (!is.na(tableName) && !is.null(tableName)) { - currentTime <- Sys.time(); + currentTime <- Sys.time() lastExecutedTime <- executionTimeList[[tableName]] if (!is.na(lastExecutedTime) && !is.null(lastExecutedTime)) { delta <- currentTime - lastExecutedTime @@ -259,8 +280,8 @@ delayIfNecessary <- function(sql, regex, executionTimeList, threshold) { Sys.sleep(threshold - delta) } } - - executionTimeList[[tableName]] = currentTime + + executionTimeList[[tableName]] <- currentTime } return(executionTimeList) } @@ -270,9 +291,9 @@ delayIfNecessaryForDdl <- function(sql) { if (is.null(ddlList)) { ddlList <- list() } - - regexForDdl = "(^CREATE\\s+TABLE\\s+IF\\s+EXISTS|^CREATE\\s+TABLE|^DROP\\s+TABLE\\s+IF\\s+EXISTS|^DROP\\s+TABLE)\\s+([a-zA-Z0-9_$#-]*\\.?\\s*(?:[a-zA-Z0-9_]+)*)" - updatedList <- delayIfNecessary(sql, regexForDdl, ddlList, 5); + + regexForDdl <- 
"(^CREATE\\s+TABLE\\s+IF\\s+EXISTS|^CREATE\\s+TABLE|^DROP\\s+TABLE\\s+IF\\s+EXISTS|^DROP\\s+TABLE)\\s+([a-zA-Z0-9_$#-]*\\.?\\s*(?:[a-zA-Z0-9_]+)*)" + updatedList <- delayIfNecessary(sql, regexForDdl, ddlList, 5) options(ddlList = updatedList) } @@ -281,9 +302,9 @@ delayIfNecessaryForInsert <- function(sql) { if (is.null(insetList)) { insetList <- list() } - - regexForInsert = "(^INSERT\\s+INTO)\\s+([a-zA-Z0-9_$#-]*\\.?\\s*(?:[a-zA-Z0-9_]+)*)" - updatedList <- delayIfNecessary(sql, regexForInsert, insetList, 5); + + regexForInsert <- "(^INSERT\\s+INTO)\\s+([a-zA-Z0-9_$#-]*\\.?\\s*(?:[a-zA-Z0-9_]+)*)" + updatedList <- delayIfNecessary(sql, regexForInsert, insetList, 5) options(insetList = updatedList) } @@ -292,12 +313,12 @@ lowLevelExecuteSql.default <- function(connection, sql) { statement <- rJava::.jcall(connection@jConnection, "Ljava/sql/Statement;", "createStatement") on.exit(rJava::.jcall(statement, "V", "close")) hasResultSet <- rJava::.jcall(statement, "Z", "execute", as.character(sql), check = FALSE) - + if (connection@dbms == "bigquery") { delayIfNecessaryForDdl(sql) delayIfNecessaryForInsert(sql) } - + rowsAffected <- 0 if (!hasResultSet) { rowsAffected <- rJava::.jcall(statement, "I", "getUpdateCount", check = FALSE) @@ -315,20 +336,23 @@ supportsBatchUpdates <- function(connection) { if (!inherits(connection, "DatabaseConnectorJdbcConnection")) { return(FALSE) } - tryCatch({ - dbmsMeta <- rJava::.jcall(connection@jConnection, "Ljava/sql/DatabaseMetaData;", "getMetaData", check = FALSE) - if (!is.jnull(dbmsMeta)) { - if (rJava::.jcall(dbmsMeta, "Z", "supportsBatchUpdates")) { - # inform("JDBC driver supports batch updates") - return(TRUE); - } else { - inform("JDBC driver does not support batch updates. 
Sending updates one at a time.") + tryCatch( + { + dbmsMeta <- rJava::.jcall(connection@jConnection, "Ljava/sql/DatabaseMetaData;", "getMetaData", check = FALSE) + if (!is.jnull(dbmsMeta)) { + if (rJava::.jcall(dbmsMeta, "Z", "supportsBatchUpdates")) { + # inform("JDBC driver supports batch updates") + return(TRUE) + } else { + inform("JDBC driver does not support batch updates. Sending updates one at a time.") + } } + }, + error = function(err) { + inform(paste("JDBC driver 'supportsBatchUpdates' threw exception", err$message)) } - }, error = function(err) { - inform(paste("JDBC driver 'supportsBatchUpdates' threw exception", err$message)) - }) - return(FALSE); + ) + return(FALSE) } #' Execute SQL code @@ -346,9 +370,9 @@ supportsBatchUpdates <- function(connection) { #' all statements. #' @param errorReportFile The file where an error report will be written if an error occurs. Defaults to #' 'errorReportSql.txt' in the current working directory. -#' @param runAsBatch When true the SQL statements are sent to the server as a single batch, and +#' @param runAsBatch When true the SQL statements are sent to the server as a single batch, and #' executed there. This will be faster if you have many small SQL statements, but -#' there will be no progress bar, and no per-statement error messages. If the +#' there will be no progress bar, and no per-statement error messages. If the #' database platform does not support batched updates the query is executed without #' batching. 
#' @@ -361,11 +385,13 @@ supportsBatchUpdates <- function(connection) { #' #' @examples #' \dontrun{ -#' connectionDetails <- createConnectionDetails(dbms = "postgresql", -#' server = "localhost", -#' user = "root", -#' password = "blah", -#' schema = "cdm_v4") +#' connectionDetails <- createConnectionDetails( +#' dbms = "postgresql", +#' server = "localhost", +#' user = "root", +#' password = "blah", +#' schema = "cdm_v4" +#' ) #' conn <- connect(connectionDetails) #' executeSql(conn, "CREATE TABLE x (k INT); CREATE TABLE y (k INT);") #' disconnect(conn) @@ -375,23 +401,24 @@ executeSql <- function(connection, sql, profile = FALSE, progressBar = TRUE, - reportOverallTime = TRUE, + reportOverallTime = TRUE, errorReportFile = file.path(getwd(), "errorReportSql.txt"), runAsBatch = FALSE) { - if (inherits(connection, "DatabaseConnectorJdbcConnection") && rJava::is.jnull(connection@jConnection)) + if (inherits(connection, "DatabaseConnectorJdbcConnection") && rJava::is.jnull(connection@jConnection)) { abort("Connection is closed") - + } + startTime <- Sys.time() - - if (inherits(connection, "DatabaseConnectorJdbcConnection") && - connection@dbms == "redshift" && - rJava::.jcall(connection@jConnection, "Z", "getAutoCommit")) { + + if (inherits(connection, "DatabaseConnectorJdbcConnection") && + connection@dbms == "redshift" && + rJava::.jcall(connection@jConnection, "Z", "getAutoCommit")) { # Turn off autocommit for RedShift to avoid this issue: # https://github.com/OHDSI/DatabaseConnector/issues/90 trySettingAutoCommit(connection, FALSE) on.exit(trySettingAutoCommit(connection, TRUE)) } - + batched <- runAsBatch && supportsBatchUpdates(connection) sqlStatements <- SqlRender::splitSql(sql) if (batched) { @@ -399,7 +426,7 @@ executeSql <- function(connection, rowsAffected <- 0 for (start in seq(1, length(sqlStatements), by = batchSize)) { end <- min(start + batchSize - 1, length(sqlStatements)) - + statement <- rJava::.jcall(connection@jConnection, 
"Ljava/sql/Statement;", "createStatement") batchSql <- c() for (i in start:end) { @@ -410,20 +437,26 @@ executeSql <- function(connection, if (profile) { SqlRender::writeSql(paste(batchSql, collapse = "\n\n"), sprintf("statements_%s_%s.sql", start, end)) } - tryCatch({ - startQuery <- Sys.time() - rowsAffected <- c(rowsAffected, rJava::.jcall(statement, "[I", "executeBatch")) - delta <- Sys.time() - startQuery - inform(paste("Statements", start, "through", end, "took", delta, attr(delta, "units"))) - }, error = function(err) { - .createErrorReport(connection@dbms, err$message, paste(batchSql, collapse = "\n\n"), errorReportFile) - }, finally = {rJava::.jcall(statement, "V", "close")}) + tryCatch( + { + startQuery <- Sys.time() + rowsAffected <- c(rowsAffected, rJava::.jcall(statement, "[I", "executeBatch")) + delta <- Sys.time() - startQuery + inform(paste("Statements", start, "through", end, "took", delta, attr(delta, "units"))) + }, + error = function(err) { + .createErrorReport(connection@dbms, err$message, paste(batchSql, collapse = "\n\n"), errorReportFile) + }, + finally = { + rJava::.jcall(statement, "V", "close") + } + ) } } else { if (progressBar) { pb <- txtProgressBar(style = 3) } - + for (i in 1:length(sqlStatements)) { sqlStatement <- sqlStatements[i] if (profile) { @@ -431,29 +464,32 @@ executeSql <- function(connection, writeChar(sqlStatement, fileConn, eos = NULL) close(fileConn) } - tryCatch({ - startQuery <- Sys.time() - lowLevelExecuteSql(connection, sqlStatement) - if (profile) { - delta <- Sys.time() - startQuery - inform(paste("Statement ", i, "took", delta, attr(delta, "units"))) + tryCatch( + { + startQuery <- Sys.time() + lowLevelExecuteSql(connection, sqlStatement) + if (profile) { + delta <- Sys.time() - startQuery + inform(paste("Statement ", i, "took", delta, attr(delta, "units"))) + } + }, + error = function(err) { + .createErrorReport(connection@dbms, err$message, sqlStatement, errorReportFile) } - }, error = function(err) { - 
.createErrorReport(connection@dbms, err$message, sqlStatement, errorReportFile) - }) + ) if (progressBar) { - setTxtProgressBar(pb, i/length(sqlStatements)) + setTxtProgressBar(pb, i / length(sqlStatements)) } } if (progressBar) { close(pb) } } - + if (inherits(connection, "DatabaseConnectorJdbcConnection") && !rJava::.jcall(connection@jConnection, "Z", "getAutoCommit")) { rJava::.jcall(connection@jConnection, "V", "commit") } - + if (reportOverallTime) { delta <- Sys.time() - startTime inform(paste("Executing SQL took", signif(delta, 3), attr(delta, "units"))) @@ -480,15 +516,15 @@ convertFields <- function(dbms, result) { result[[colname]] <- as.POSIXct(as.numeric(result[[colname]]), origin = "1970-01-01", tz = "GMT") } } - } + } if (dbms %in% c("bigquery")) { # BigQuery doesn't have INT fields, only INT64. For more consistent behavior with other # platforms, if it fits in an integer, convert it to an integer: if (ncol(result) > 0) { for (i in 1:ncol(result)) { - if (bit64::is.integer64(result[[i]]) && - (all(is.na(result[[i]])) || ( - min(result[[i]], na.rm = TRUE) > -.Machine$integer.max && + if (bit64::is.integer64(result[[i]]) && + (all(is.na(result[[i]])) || ( + min(result[[i]], na.rm = TRUE) > -.Machine$integer.max && max(result[[i]], na.rm = TRUE) < .Machine$integer.max))) { result[[i]] <- as.integer(result[[i]]) } @@ -499,11 +535,14 @@ convertFields <- function(dbms, result) { } trySettingAutoCommit <- function(connection, value) { - tryCatch({ - rJava::.jcall(connection@jConnection, "V", "setAutoCommit", value) - }, error = function(cond) { - # do nothing - }) + tryCatch( + { + rJava::.jcall(connection@jConnection, "V", "setAutoCommit", value) + }, + error = function(cond) { + # do nothing + } + ) } #' Retrieve data to a data.frame @@ -515,11 +554,11 @@ trySettingAutoCommit <- function(connection, value) { #' @param sql The SQL to be send. #' @param errorReportFile The file where an error report will be written if an error occurs. 
Defaults to #' 'errorReportSql.txt' in the current working directory. -#' @param snakeCaseToCamelCase If true, field names are assumed to use snake_case, and are converted to camelCase. +#' @param snakeCaseToCamelCase If true, field names are assumed to use snake_case, and are converted to camelCase. #' @param integerAsNumeric Logical: should 32-bit integers be converted to numeric (double) values? If FALSE -#' 32-bit integers will be represented using R's native \code{Integer} class. +#' 32-bit integers will be represented using R's native \code{Integer} class. #' @param integer64AsNumeric Logical: should 64-bit integers be converted to numeric (double) values? If FALSE -#' 64-bit integers will be represented using \code{bit64::integer64}. +#' 64-bit integers will be represented using \code{bit64::integer64}. #' #' @details #' This function sends the SQL to the server and retrieves the results. If an error occurs during SQL @@ -531,44 +570,55 @@ trySettingAutoCommit <- function(connection, value) { #' #' @examples #' \dontrun{ -#' connectionDetails <- createConnectionDetails(dbms = "postgresql", -#' server = "localhost", -#' user = "root", -#' password = "blah", -#' schema = "cdm_v4") +#' connectionDetails <- createConnectionDetails( +#' dbms = "postgresql", +#' server = "localhost", +#' user = "root", +#' password = "blah", +#' schema = "cdm_v4" +#' ) #' conn <- connect(connectionDetails) #' count <- querySql(conn, "SELECT COUNT(*) FROM person") #' disconnect(conn) #' } #' @export -querySql <- function(connection, - sql, - errorReportFile = file.path(getwd(), "errorReportSql.txt"), - snakeCaseToCamelCase = FALSE, +querySql <- function(connection, + sql, + errorReportFile = file.path(getwd(), "errorReportSql.txt"), + snakeCaseToCamelCase = FALSE, integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", default = TRUE), integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", default = TRUE)) { - if (inherits(connection, 
"DatabaseConnectorJdbcConnection") && rJava::is.jnull(connection@jConnection)) + if (inherits(connection, "DatabaseConnectorJdbcConnection") && rJava::is.jnull(connection@jConnection)) { abort("Connection is closed") + } # Calling splitSql, because this will also strip trailing semicolons (which cause Oracle to crash). sqlStatements <- SqlRender::splitSql(sql) - if (length(sqlStatements) > 1) - abort(paste("A query that returns a result can only consist of one SQL statement, but", - length(sqlStatements), - "statements were found")) - tryCatch({ - result <- lowLevelQuerySql(connection = connection, - query = sqlStatements[1], - integerAsNumeric = integerAsNumeric, - integer64AsNumeric = integer64AsNumeric) - colnames(result) <- toupper(colnames(result)) - result <- convertFields(connection@dbms, result) - if (snakeCaseToCamelCase) { - colnames(result) <- SqlRender::snakeCaseToCamelCase(colnames(result)) + if (length(sqlStatements) > 1) { + abort(paste( + "A query that returns a result can only consist of one SQL statement, but", + length(sqlStatements), + "statements were found" + )) + } + tryCatch( + { + result <- lowLevelQuerySql( + connection = connection, + query = sqlStatements[1], + integerAsNumeric = integerAsNumeric, + integer64AsNumeric = integer64AsNumeric + ) + colnames(result) <- toupper(colnames(result)) + result <- convertFields(connection@dbms, result) + if (snakeCaseToCamelCase) { + colnames(result) <- SqlRender::snakeCaseToCamelCase(colnames(result)) + } + return(result) + }, + error = function(err) { + .createErrorReport(connection@dbms, err$message, sql, errorReportFile) } - return(result) - }, error = function(err) { - .createErrorReport(connection@dbms, err$message, sql, errorReportFile) - }) + ) } #' Render, translate, execute SQL code @@ -586,10 +636,10 @@ querySql <- function(connection, #' all statements. #' @param errorReportFile The file where an error report will be written if an error occurs. 
Defaults to #' 'errorReportSql.txt' in the current working directory. -#' @param runAsBatch When true the SQL statements are sent to the server as a single batch, and +#' @param runAsBatch When true the SQL statements are sent to the server as a single batch, and #' executed there. This will be faster if you have many small SQL statements, but -#' there will be no progress bar, and no per-statement error messages. If the -#' database platform does not support batched updates the query is executed as +#' there will be no progress bar, and no per-statement error messages. If the +#' database platform does not support batched updates the query is executed as #' ordinarily. #' @param oracleTempSchema DEPRECATED: use \code{tempEmulationSchema} instead. #' @param tempEmulationSchema Some database platforms like Oracle and Impala do not truly support temp tables. To @@ -598,20 +648,23 @@ querySql <- function(connection, #' @param ... Parameters that will be used to render the SQL. #' #' @details -#' This function calls the \code{render} and \code{translate} functions in the SqlRender package before +#' This function calls the \code{render} and \code{translate} functions in the SqlRender package before #' calling \code{\link{executeSql}}. 
#' #' @examples #' \dontrun{ -#' connectionDetails <- createConnectionDetails(dbms = "postgresql", -#' server = "localhost", -#' user = "root", -#' password = "blah", -#' schema = "cdm_v4") +#' connectionDetails <- createConnectionDetails( +#' dbms = "postgresql", +#' server = "localhost", +#' user = "root", +#' password = "blah", +#' schema = "cdm_v4" +#' ) #' conn <- connect(connectionDetails) -#' renderTranslateExecuteSql(connection, -#' sql = "SELECT * INTO #temp FROM @@schema.person;", -#' schema = "cdm_synpuf") +#' renderTranslateExecuteSql(connection, +#' sql = "SELECT * INTO #temp FROM @@schema.person;", +#' schema = "cdm_synpuf" +#' ) #' disconnect(conn) #' } #' @export @@ -619,7 +672,7 @@ renderTranslateExecuteSql <- function(connection, sql, profile = FALSE, progressBar = TRUE, - reportOverallTime = TRUE, + reportOverallTime = TRUE, errorReportFile = file.path(getwd(), "errorReportSql.txt"), runAsBatch = FALSE, oracleTempSchema = NULL, @@ -627,19 +680,22 @@ renderTranslateExecuteSql <- function(connection, ...) { if (!is.null(oracleTempSchema) && oracleTempSchema != "") { warn("The 'oracleTempSchema' argument is deprecated. Use 'tempEmulationSchema' instead.", - .frequency = "regularly", - .frequency_id = "oracleTempSchema") + .frequency = "regularly", + .frequency_id = "oracleTempSchema" + ) tempEmulationSchema <- oracleTempSchema } sql <- SqlRender::render(sql, ...) 
sql <- SqlRender::translate(sql, targetDialect = connection@dbms, tempEmulationSchema = tempEmulationSchema) - executeSql(connection = connection, - sql = sql, - profile = profile, - progressBar = progressBar, - reportOverallTime = reportOverallTime, - errorReportFile = errorReportFile, - runAsBatch = runAsBatch) + executeSql( + connection = connection, + sql = sql, + profile = profile, + progressBar = progressBar, + reportOverallTime = reportOverallTime, + errorReportFile = errorReportFile, + runAsBatch = runAsBatch + ) } #' Render, translate, and query to data.frame @@ -651,19 +707,19 @@ renderTranslateExecuteSql <- function(connection, #' @param sql The SQL to be send. #' @param errorReportFile The file where an error report will be written if an error occurs. Defaults to #' 'errorReportSql.txt' in the current working directory. -#' @param snakeCaseToCamelCase If true, field names are assumed to use snake_case, and are converted to camelCase. +#' @param snakeCaseToCamelCase If true, field names are assumed to use snake_case, and are converted to camelCase. #' @param oracleTempSchema DEPRECATED: use \code{tempEmulationSchema} instead. #' @param tempEmulationSchema Some database platforms like Oracle and Impala do not truly support temp tables. To #' emulate temp tables, provide a schema with write privileges where temp tables #' can be created. #' @param integerAsNumeric Logical: should 32-bit integers be converted to numeric (double) values? If FALSE -#' 32-bit integers will be represented using R's native \code{Integer} class. +#' 32-bit integers will be represented using R's native \code{Integer} class. #' @param integer64AsNumeric Logical: should 64-bit integers be converted to numeric (double) values? If FALSE -#' 64-bit integers will be represented using \code{bit64::integer64}. +#' 64-bit integers will be represented using \code{bit64::integer64}. #' @param ... Parameters that will be used to render the SQL. 
#' #' @details -#' This function calls the \code{render} and \code{translate} functions in the SqlRender package before +#' This function calls the \code{render} and \code{translate} functions in the SqlRender package before #' calling \code{\link{querySql}}. #' #' @return @@ -671,21 +727,24 @@ renderTranslateExecuteSql <- function(connection, #' #' @examples #' \dontrun{ -#' connectionDetails <- createConnectionDetails(dbms = "postgresql", -#' server = "localhost", -#' user = "root", -#' password = "blah", -#' schema = "cdm_v4") +#' connectionDetails <- createConnectionDetails( +#' dbms = "postgresql", +#' server = "localhost", +#' user = "root", +#' password = "blah", +#' schema = "cdm_v4" +#' ) #' conn <- connect(connectionDetails) -#' persons <- renderTranslatequerySql(conn, -#' sql = "SELECT TOP 10 * FROM @@schema.person", -#' schema = "cdm_synpuf") +#' persons <- renderTranslatequerySql(conn, +#' sql = "SELECT TOP 10 * FROM @@schema.person", +#' schema = "cdm_synpuf" +#' ) #' disconnect(conn) #' } #' @export -renderTranslateQuerySql <- function(connection, - sql, - errorReportFile = file.path(getwd(), "errorReportSql.txt"), +renderTranslateQuerySql <- function(connection, + sql, + errorReportFile = file.path(getwd(), "errorReportSql.txt"), snakeCaseToCamelCase = FALSE, oracleTempSchema = NULL, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), @@ -694,34 +753,38 @@ renderTranslateQuerySql <- function(connection, ...) { if (!is.null(oracleTempSchema) && oracleTempSchema != "") { warn("The 'oracleTempSchema' argument is deprecated. Use 'tempEmulationSchema' instead.", - .frequency = "regularly", - .frequency_id = "oracleTempSchema") + .frequency = "regularly", + .frequency_id = "oracleTempSchema" + ) tempEmulationSchema <- oracleTempSchema } sql <- SqlRender::render(sql, ...) 
sql <- SqlRender::translate(sql, targetDialect = connection@dbms, tempEmulationSchema = tempEmulationSchema) - return(querySql(connection = connection, - sql = sql, - errorReportFile = errorReportFile, - snakeCaseToCamelCase = snakeCaseToCamelCase, - integerAsNumeric = integerAsNumeric, - integer64AsNumeric = integer64AsNumeric)) + return(querySql( + connection = connection, + sql = sql, + errorReportFile = errorReportFile, + snakeCaseToCamelCase = snakeCaseToCamelCase, + integerAsNumeric = integerAsNumeric, + integer64AsNumeric = integer64AsNumeric + )) } #' Test a character vector of SQL names for SQL reserved words -#' +#' #' This function checks a character vector against a predefined list of reserved SQL words. #' #' @param sqlNames A character vector containing table or field names to check. #' @param warn (logical) Should a warn be thrown if invalid SQL names are found? #' #' @return A logical vector with length equal to sqlNames that is TRUE for each name that is reserved and FALSE otherwise -#' +#' #' @export -isSqlReservedWord <- function(sqlNames, warn = FALSE){ - if (!is.character(sqlNames)) +isSqlReservedWord <- function(sqlNames, warn = FALSE) { + if (!is.character(sqlNames)) { abort("sqlNames should be a character vector") + } sqlNames <- gsub("^#", "", sqlNames) sqlReservedWords <- read.csv(system.file("csv", "sqlReservedWords.csv", package = "DatabaseConnector"), stringsAsFactors = FALSE) nameIsReserved <- toupper(sqlNames) %in% sqlReservedWords$reservedWords @@ -730,7 +793,7 @@ isSqlReservedWord <- function(sqlNames, warn = FALSE){ warn(paste(badSqlNames, "is a reserved keyword in SQL and should not be used as a table or field name.")) } else if (length(badSqlNames) > 1 & warn) { warn(paste(paste(badSqlNames, collapse = ","), "are reserved keywords in SQL and should not be used as table or field names.")) - } + } return(nameIsReserved) } @@ -768,105 +831,114 @@ isSqlReservedWord <- function(sqlNames, warn = FALSE){ #' #' @examples #' \dontrun{ -#' 
connectionDetails <- createConnectionDetails(dbms = "postgresql", -#' server = "localhost", -#' user = "root", -#' password = "blah", -#' schema = "cdm_v4") +#' connectionDetails <- createConnectionDetails( +#' dbms = "postgresql", +#' server = "localhost", +#' user = "root", +#' password = "blah", +#' schema = "cdm_v4" +#' ) #' connection <- connect(connectionDetails) -#' +#' #' # First example: write data to a large CSV file: #' filepath <- "myBigFile.csv" #' writeBatchesToCsv <- function(data, position, ...) { -#' write.csv(data, filepath, append = position != 1) -#' return(NULL) +#' write.csv(data, filepath, append = position != 1) +#' return(NULL) #' } #' renderTranslateQueryApplyBatched(connection, -#' "SELECT * FROM @schema.person;", -#' schema = "cdm_synpuf", -#' fun = writeBatchesToCsv) +#' "SELECT * FROM @schema.person;", +#' schema = "cdm_synpuf", +#' fun = writeBatchesToCsv +#' ) #' #' # Second example: write data to Andromeda #' # (Alternative to querySqlToAndromeda if some local computation needs to be applied) #' bigResults <- Andromeda::andromeda() #' writeBatchesToAndromeda <- function(data, position, ...) 
{ -#' data$p <- EmpiricalCalibration::computeTraditionalP(data$logRr, data$logSeRr) -#' if (position == 1) { -#' bigResults$rrs <- data -#' } else { -#' Andromeda::appendToTable(bigResults$rrs, data) -#' } -#' return(NULL) +#' data$p <- EmpiricalCalibration::computeTraditionalP(data$logRr, data$logSeRr) +#' if (position == 1) { +#' bigResults$rrs <- data +#' } else { +#' Andromeda::appendToTable(bigResults$rrs, data) +#' } +#' return(NULL) #' } #' sql <- "SELECT target_id, comparator_id, log_rr, log_se_rr FROM @schema.my_results;" #' renderTranslateQueryApplyBatched(connection, -#' sql, -#' fun = writeBatchesToAndromeda -#' schema = "my_results", -#' snakeCaseToCamelCase = TRUE) +#' sql, +#' fun = writeBatchesToAndromeda, +#' schema = "my_results", +#' snakeCaseToCamelCase = TRUE +#' ) #' #' disconnect(connection) #' } -#' +#' #' @export -renderTranslateQueryApplyBatched <- function( - connection, - sql, - fun, - args = list(), - errorReportFile = file.path(getwd(), "errorReportSql.txt"), - snakeCaseToCamelCase = FALSE, - tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), - integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", default = TRUE), - integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", default = TRUE), - ... -) { +renderTranslateQueryApplyBatched <- function(connection, + sql, + fun, + args = list(), + errorReportFile = file.path(getwd(), "errorReportSql.txt"), + snakeCaseToCamelCase = FALSE, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), + integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", default = TRUE), + integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", default = TRUE), + ...) 
{ UseMethod("renderTranslateQueryApplyBatched", connection) } #' @export -renderTranslateQueryApplyBatched.default <- function( - connection, - sql, - fun, - args = list(), - errorReportFile = file.path(getwd(), "errorReportSql.txt"), - snakeCaseToCamelCase = FALSE, - tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), - integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", default = TRUE), - integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", default = TRUE), - ... -) { - if (!is.function(fun)) +renderTranslateQueryApplyBatched.default <- function(connection, + sql, + fun, + args = list(), + errorReportFile = file.path(getwd(), "errorReportSql.txt"), + snakeCaseToCamelCase = FALSE, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), + integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", default = TRUE), + integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", default = TRUE), + ...) { + if (!is.function(fun)) { abort("fun argument must be a function") + } sql <- SqlRender::render(sql, ...) 
sql <- SqlRender::translate(sql, targetDialect = connection@dbms, tempEmulationSchema = tempEmulationSchema) sql <- SqlRender::splitSql(sql) - if (length(sql) > 1) - abort(paste("A query that returns a result can only consist of one SQL statement, but", - length(sql), - "statements were found")) - tryCatch({ - queryResult <- dbSendQuery(connection, sql) - }, error = function(err) { - .createErrorReport(connection@dbms, err$message, sql, errorReportFile) - }) + if (length(sql) > 1) { + abort(paste( + "A query that returns a result can only consist of one SQL statement, but", + length(sql), + "statements were found" + )) + } + tryCatch( + { + queryResult <- dbSendQuery(connection, sql) + }, + error = function(err) { + .createErrorReport(connection@dbms, err$message, sql, errorReportFile) + } + ) on.exit(dbClearResult(queryResult)) - + columnTypes <- rJava::.jcall(queryResult@content, "[I", "getColumnTypes") if (any(columnTypes == 5)) { validateInt64Query() } - + results <- list() position <- 1 while (!rJava::.jcall(queryResult@content, "Z", "isDone")) { batch <- dbFetch(queryResult, - columnTypes = columnTypes, - integerAsNumeric = integerAsNumeric, - integer64AsNumeric = integer64AsNumeric) - if (snakeCaseToCamelCase) + columnTypes = columnTypes, + integerAsNumeric = integerAsNumeric, + integer64AsNumeric = integer64AsNumeric + ) + if (snakeCaseToCamelCase) { colnames(batch) <- SqlRender::snakeCaseToCamelCase(colnames(batch)) + } rowCount <- nrow(batch) if (rowCount > 0) { result <- do.call(fun, append(list(batch, position), args)) @@ -879,38 +951,41 @@ renderTranslateQueryApplyBatched.default <- function( #' @export -renderTranslateQueryApplyBatched.DatabaseConnectorDbiConnection <- function( - connection, - sql, - fun, - args = list(), - errorReportFile = file.path(getwd(), "errorReportSql.txt"), - snakeCaseToCamelCase = FALSE, - tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), - integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", 
default = TRUE), - integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", default = TRUE), - ... -) { - - if (!is.function(fun)) +renderTranslateQueryApplyBatched.DatabaseConnectorDbiConnection <- function(connection, + sql, + fun, + args = list(), + errorReportFile = file.path(getwd(), "errorReportSql.txt"), + snakeCaseToCamelCase = FALSE, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), + integerAsNumeric = getOption("databaseConnectorIntegerAsNumeric", default = TRUE), + integer64AsNumeric = getOption("databaseConnectorInteger64AsNumeric", default = TRUE), + ...) { + if (!is.function(fun)) { abort("fun argument must be a function") - + } + sql <- SqlRender::render(sql, ...) sql <- SqlRender::translate(sql, targetDialect = connection@dbms, tempEmulationSchema = tempEmulationSchema) sql <- SqlRender::splitSql(sql) - if (length(sql) > 1) - abort(paste("A query that returns a result can only consist of one SQL statement, but", - length(sql), - "statements were found")) + if (length(sql) > 1) { + abort(paste( + "A query that returns a result can only consist of one SQL statement, but", + length(sql), + "statements were found" + )) + } results <- lowLevelQuerySql(connection, - sql, - integerAsNumeric = integerAsNumeric, - integer64AsNumeric = integer64AsNumeric) - if (snakeCaseToCamelCase) + sql, + integerAsNumeric = integerAsNumeric, + integer64AsNumeric = integer64AsNumeric + ) + if (snakeCaseToCamelCase) { colnames(results) <- SqlRender::snakeCaseToCamelCase(colnames(results)) - + } + # Note that the DBI connection implementation only ever processes a single batch position <- 1 results <- list(do.call(fun, append(list(results, position), args))) invisible(results) -} \ No newline at end of file +} diff --git a/README.md b/README.md index efc40bb6..55ee9249 100644 --- a/README.md +++ b/README.md @@ -67,9 +67,9 @@ install.packages("DatabaseConnector") ``` -To download and use the JDBC drivers for Oracle, SQL Server, PDW, PostgreSQL, 
or RedShift, you can use the `downloadJdbcDrivers()` function. For BigQuery, Impala, or Netezza, see [these instructions](http://ohdsi.github.io/DatabaseConnector/reference/jdbcDrivers.html). +To download and use the JDBC drivers for Oracle, SQL Server, PDW, PostgreSQL, Spark, or RedShift, you can use the `downloadJdbcDrivers()` function. For BigQuery, Impala, or Netezza, see [these instructions](http://ohdsi.github.io/DatabaseConnector/reference/jdbcDrivers.html). -To be able to use Windows authentication for SQL Server, you have to install the JDBC driver. Download the .zip from [Microsoft](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver15) and extract its contents to a folder. In the extracted folder you will find the file sqljdbc_9.2/enu/auth/x64/mssql-jdbc_auth-9.2.0.x64.dll (64-bits) or ssqljdbc_9.2/enu/auth/x86/mssql-jdbc_auth-9.2.0.x86.dll (32-bits), which needs to be moved to location on the system path, for example to c:/windows/system32. If you not have write access to any folder in the system path, you can also specify the path to the folder containing the dll by setting the environmental variable `PATH_TO_AUTH_DLL`, so for example `Sys.setenv("PATH_TO_AUTH_DLL" = "c:/temp")`. +To be able to use Windows authentication for SQL Server, you have to install the JDBC driver. Download the **version 9.2.0** .zip from [Microsoft](https://docs.microsoft.com/en-us/sql/connect/jdbc/release-notes-for-the-jdbc-driver?view=sql-server-ver15#92-releases) and extract its contents to a folder. In the extracted folder you will find the file sqljdbc_9.2/enu/auth/x64/mssql-jdbc_auth-9.2.0.x64.dll (64-bits) or sqljdbc_9.2/enu/auth/x86/mssql-jdbc_auth-9.2.0.x86.dll (32-bits), which needs to be moved to a location on the system path, for example to c:/windows/system32.
If you do not have write access to any folder in the system path, you can also specify the path to the folder containing the dll by setting the environment variable `PATH_TO_AUTH_DLL`, so for example `Sys.setenv("PATH_TO_AUTH_DLL" = "c:/temp")`. User Documentation ================== diff --git a/docs/articles/UsingDatabaseConnector.html b/docs/articles/UsingDatabaseConnector.html index 7ebbb500..79aa6a24 100644 --- a/docs/articles/UsingDatabaseConnector.html +++ b/docs/articles/UsingDatabaseConnector.html @@ -82,13 +82,13 @@ -
+