Skip to content

Commit

Permalink
Updating the DBLINK repo (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
oualib authored Aug 4, 2023
1 parent 12e7d4a commit 8027fca
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 17 deletions.
30 changes: 30 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
DBLINK Version 0.3.0 (10 May 2023)

* text and binary column length is now limited to the max Vertica supported length
* added support for connect_secret function parameter
* added support for dblink_secret session parameter
* added install_unfenced in the Makefile (getReturnType is called once if unfenced)
* it is now possible to set env variables through the UDx
* added non-Western language test

DBLINK Version 0.2.2 (25 Jan 2023)

* fixed wrong completion in DriverConnect
* replaced SQLFetch() with SQLFetchScroll()
* added ODBC64 in Makefile

DBLINK Version 0.2.1 (16 Dec 2022)

* New connection method via ``connect_secret`` SESSION PARAMETER

DBLINK Version 0.2.0 (01 Dec 2022)

* Fixed a bug with database connections being left open
* Improved error messages
* New section in the README on "How to report issues"
* Created a CHANGELOG


DBLINK Version 0.1.0 (13 Apr 2022)

* Initial Release
119 changes: 102 additions & 17 deletions ldblink.cpp
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// vim:ru:scs:si:sm:sw=4:sta:ts=4:tw=0

// (c) Copyright [2022-2023] Micro Focus or one of its affiliates.
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
Expand All @@ -11,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// vim:ru:scs:si:sm:sw=4:sta:ts=4:tw=0
#include "Vertica.h"
#include "StringParsers.h"

Expand All @@ -23,7 +24,7 @@ using namespace std;
#include <sqlext.h>
#include <iostream>
#include <fstream>

#define DBLINK_CIDS "/usr/local/etc/dblink.cids" // Default Connection identifiers config file FIX: add a param
#define MAXCNAMELEN 128 // Max column name length
#define DEF_ROWSET 100 // Default rowset
Expand Down Expand Up @@ -214,12 +215,12 @@ class DBLink : public TransformFunction
if (!SQL_SUCCEEDED(Oret=SQLBindCol(Ost, j+1, SQL_C_CHAR, Ores[j], desz[j], Olen[j])))
ex_err(SQL_HANDLE_STMT, Ost, 401, "Error binding column");
break ;
case SQL_CHAR:
case SQL_WCHAR:
case SQL_VARCHAR:
case SQL_WVARCHAR:
case SQL_LONGVARCHAR:
case SQL_WLONGVARCHAR:
case SQL_CHAR:
case SQL_VARCHAR:
case SQL_LONGVARCHAR:
Ores[j] = (SQLPOINTER)srvInterface.allocator->alloc(desz[j] * rowset);
if (!SQL_SUCCEEDED(Oret=SQLBindCol(Ost, j+1, SQL_C_CHAR, Ores[j], desz[j], Olen[j])))
ex_err(SQL_HANDLE_STMT, Ost, 401, "Error binding column");
Expand Down Expand Up @@ -282,7 +283,7 @@ class DBLink : public TransformFunction
}

// Fetch loop:
while ( SQL_SUCCEEDED(Oret=SQLFetch(Ost)) && !isCanceled() ) {
while ( SQL_SUCCEEDED(Oret=SQLFetchScroll(Ost, SQL_FETCH_NEXT, 0)) && !isCanceled() ) {
for ( unsigned int i = 0 ; i < nfr ; i++, outputWriter.next() ) {
for ( unsigned int j = 0 ; j < Oncol ; j++ ) {
Odp = (SQLPOINTER)((uint8_t *)Ores[j] + desz[j] * i) ;
Expand Down Expand Up @@ -432,28 +433,43 @@ class DBLinkFactory : public TransformFunctionFactory
SQLSMALLINT Onull = 0 ;
SQLCHAR Ocname[MAXCNAMELEN] ;
std::string cid = "" ;
std::string cid_env = "" ;
std::string cid_file = DBLINK_CIDS ;
std::string cid_name = "" ;
std::string cid_value = "" ;
bool connect = false ;

// Read Params:
ParamReader params = srvInterface.getParamReader();
if( params.containsParameter("cidfile") ) { // Start checking "cidfile" param
cid_file = params.getStringRef("cidfile").str() ;
#ifdef DBLINK_DEBUG
srvInterface.log("DEBUG DBLINK read param cidfile=<%s>", cid_file.c_str() );
#endif
}
if( params.containsParameter("cid") ) { // Start checking "cid" param
cid = params.getStringRef("cid").str() ;
#ifdef DBLINK_DEBUG
srvInterface.log("DEBUG DBLINK read param cid=<%s>", cid.c_str() );
#endif
} else if( params.containsParameter("connect_secret") ) { // if "cid" is undef try with "connect_secret"
connect = true ;
cid = params.getStringRef("connect_secret").str() ;
} else if( params.containsParameter("connect") ) { // support the legacy name "connect"
} else if( params.containsParameter("connect") ) { // if "cid" is undef try with "connect"
connect = true ;
cid = params.getStringRef("connect").str() ;
} else if (srvInterface.getUDSessionParamReader("library").containsParameter("dblink_secret")) {
connect = true ;
cid = srvInterface.getUDSessionParamReader("library").getStringRef("dblink_secret").str() ;
} else if (srvInterface.getUDSessionParamReader("library").containsParameter("dblink_secret")) {
// if "cid", "connect_secret" and "connect" are not defined try "dblink_secret" session param
connect = true ;
cid = srvInterface.getUDSessionParamReader("library").getStringRef("dblink_secret").str() ;
} else {
vt_report_error(101, "DBLINK. Missing connection parameters");
}
if( params.containsParameter("query") ) {
query = params.getStringRef("query").str() ;
#ifdef DBLINK_DEBUG
srvInterface.log("DEBUG DBLINK read param query=<%s>", query.c_str() );
#endif
} else {
vt_report_error(102, "DBLINK. Missing query parameter");
}
Expand All @@ -474,7 +490,7 @@ class DBLinkFactory : public TransformFunctionFactory
cid_value = cid ;
}
} else { // new CID connect style:
std::ifstream cids(DBLINK_CIDS) ;
std::ifstream cids(cid_file) ;
if ( cids.is_open() ) {
std::string cline ;
size_t pos ;
Expand All @@ -485,13 +501,27 @@ class DBLinkFactory : public TransformFunctionFactory
cid_name = cline.substr(0, pos) ;
if ( cid_name == cid )
cid_value = cline.substr(pos + 1, std::string::npos ) ;
else if ( cid_name == cid + "$" ) {
cid_env = cline.substr(pos + 1, std::string::npos ) ;
std::stringstream se_stream ( cid_env ) ;
std::string token ;
while ( std::getline ( se_stream, token, ';' ) ) {
size_t pos = 0 ;
if ( ( pos = token.find('=') ) && pos != std::string::npos ) {
#ifdef DBLINK_DEBUG
srvInterface.log("DEBUG DBLINK setting <%s> to <%s>", token.substr(0, pos).c_str(), token.substr(pos+1).c_str() );
#endif
setenv ( token.substr(0, pos).c_str(), token.substr(pos + 1).c_str(), 1);
}
}
}
} else {
continue ; // skip malformed lines
}
}
cids.close() ;
} else {
vt_report_error(104, "DBLINK. Error reading <%s>", DBLINK_CIDS);
vt_report_error(104, "DBLINK. Error reading <%s>", cid_file);
}
if ( cid_value.empty() ) {
vt_report_error(105, "DBLINK. Error finding CID <%s> in <%s>", cid.c_str(), DBLINK_CIDS);
Expand Down Expand Up @@ -520,7 +550,7 @@ class DBLinkFactory : public TransformFunctionFactory
if (!SQL_SUCCEEDED(Oret=SQLAllocHandle(SQL_HANDLE_DBC, Oenv, &Ocon))){
ex_err(0, 0, 109, "Error allocating Connection Handle");
}
if (!SQL_SUCCEEDED(Oret=SQLDriverConnect(Ocon, NULL, (SQLCHAR *)cid_value.c_str(), SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE))){
if (!SQL_SUCCEEDED(Oret=SQLDriverConnect(Ocon, (SQLHWND)NULL, (SQLCHAR *)cid_value.c_str(), SQL_NTS, NULL, 0, NULL, SQL_DRIVER_NOPROMPT))){
ex_err(SQL_HANDLE_DBC, Ocon, 110, "Error connecting to target database");
}

Expand All @@ -529,7 +559,7 @@ class DBLinkFactory : public TransformFunctionFactory
if ( !strncasecmp(query.c_str(), "SELECT", 6) )
is_select = true ;

// ODBC Statement execution:
// ODBC Statement preparation:
if (!SQL_SUCCEEDED(Oret=SQLAllocHandle(SQL_HANDLE_STMT, Ocon, &Ost))){
ex_err(SQL_HANDLE_DBC, Ocon, 111, "Error allocating Statement Handle");
}
Expand All @@ -553,11 +583,15 @@ class DBLinkFactory : public TransformFunctionFactory
ex_err(0, 0, 119, "Error allocating result set decimal size array");
}
for ( unsigned int j = 0 ; j < Oncol ; j++ ) {
SQLLEN Ool = 0 ;
if ( !SQL_SUCCEEDED(Oret=SQLDescribeCol(Ost, (SQLUSMALLINT)(j+1),
Ocname, (SQLSMALLINT) MAXCNAMELEN, &Onamel,
&Odt[j], &Ors[j], &Odd[j], &Onull))) {
ex_err(SQL_HANDLE_STMT, Ost, 120, "Error getting column description");
}
#ifdef DBLINK_DEBUG
srvInterface.log("DEBUG DBLINK SQLDescribeCol src column=%u name=%s data_type=%d length=%zu", j, (char *)Ocname, Odt[j], Ors[j]);
#endif
std::string cname((char *)Ocname);
switch(Odt[j]) {
case SQL_SMALLINT:
Expand All @@ -581,20 +615,59 @@ class DBLinkFactory : public TransformFunctionFactory
break ;
case SQL_CHAR:
case SQL_WCHAR:
if( !SQL_SUCCEEDED(Oret=SQLColAttribute(Ost, (SQLUSMALLINT)(j+1), SQL_DESC_OCTET_LENGTH,
(SQLPOINTER) NULL, (SQLSMALLINT) 0, (SQLSMALLINT *) NULL, &Ool))) {
ex_err(SQL_HANDLE_STMT, Ost, 120, "Error getting column description");
}
#ifdef DBLINK_DEBUG
srvInterface.log("DEBUG DBLINK SQLColAttribute SQL_DESC_OCTET_LENGTH src column=%u name=%s data_type=%d length=%ld", j, (char *)Ocname, Odt[j], Ool);
#endif
if ( Ool > 0 && (SQLULEN)Ool > Ors[j] )
Ors[j] = Ool ;
if ( Ors[j] > 65000 ) {
srvInterface.log("DBLINK SQL_[W]CHAR column %s of length %zu limited to 65000 bytes", (char *)Ocname, Ors[j]);
Ors[j] = 65000;
}
desz[j] = (size_t)(Ors[j] + 1) ;
if ( !Ors[j] )
Ors[j] = 1 ;
outputTypes.addChar((int32)Ors[j], cname) ;
break ;
case SQL_VARCHAR:
case SQL_WVARCHAR:
if( !SQL_SUCCEEDED(Oret=SQLColAttribute(Ost, (SQLUSMALLINT)(j+1), SQL_DESC_OCTET_LENGTH,
(SQLPOINTER) NULL, (SQLSMALLINT) 0, (SQLSMALLINT *) NULL, &Ool))) {
ex_err(SQL_HANDLE_STMT, Ost, 120, "Error getting column description");
}
#ifdef DBLINK_DEBUG
srvInterface.log("DEBUG DBLINK SQLColAttribute SQL_DESC_OCTET_LENGTH src column=%u name=%s data_type=%d length=%ld", j, (char *)Ocname, Odt[j], Ool);
#endif
if ( Ool > 0 && (SQLULEN)Ool > Ors[j] )
Ors[j] = Ool ;
if ( Ors[j] > 65000 ) {
srvInterface.log("DBLINK SQL_[W]VARCHAR column %s of length %zu limited to 65000 bytes", (char *)Ocname, Ors[j]);
Ors[j] = 65000;
}
desz[j] = (size_t)(Ors[j] + 1) ;
if ( !Ors[j] )
Ors[j] = 1 ;
outputTypes.addVarchar((int32)Ors[j], cname) ;
break ;
case SQL_LONGVARCHAR:
case SQL_WLONGVARCHAR:
if( !SQL_SUCCEEDED(Oret=SQLColAttribute(Ost, (SQLUSMALLINT)(j+1), SQL_DESC_OCTET_LENGTH,
(SQLPOINTER) NULL, (SQLSMALLINT) 0, (SQLSMALLINT *) NULL, &Ool))) {
ex_err(SQL_HANDLE_STMT, Ost, 120, "Error getting column description");
}
#ifdef DBLINK_DEBUG
srvInterface.log("DEBUG DBLINK SQLColAttribute SQL_DESC_OCTET_LENGTH src column=%u name=%s data_type=%d length=%ld", j, (char *)Ocname, Odt[j], Ool);
#endif
if ( Ool > 0 && (SQLULEN)Ool > Ors[j] )
Ors[j] = Ool ;
if ( Ors[j] > 32000000 ) {
srvInterface.log("DBLINK SQL_LONG[W]VARCHAR column %s of length %zu limited to 32000000 bytes", (char *)Ocname, Ors[j]);
Ors[j] = 32000000;
}
desz[j] = (size_t)(Ors[j] + 1) ;
if ( !Ors[j] )
Ors[j] = 1 ;
Expand All @@ -617,14 +690,26 @@ class DBLinkFactory : public TransformFunctionFactory
outputTypes.addBool(cname) ;
break ;
case SQL_BINARY:
if ( Ors[j] > 65000 ) {
srvInterface.log("DBLINK SQL_BINARY column %s of length %zu limited to 65000 bytes", (char *)Ocname, Ors[j]);
Ors[j] = 65000;
}
desz[j] = (size_t)(Ors[j] + 1) ;
outputTypes.addBinary((int32)Ors[j], cname) ;
break ;
case SQL_VARBINARY:
if ( Ors[j] > 65000 ) {
srvInterface.log("DBLINK SQL_VARBINARY column %s of length %zu limited to 65000 bytes", (char *)Ocname, Ors[j]);
Ors[j] = 65000;
}
desz[j] = (size_t)(Ors[j] + 1) ;
outputTypes.addVarbinary((int32)Ors[j], cname) ;
break ;
case SQL_LONGVARBINARY:
if ( Ors[j] > 32000000 ) {
srvInterface.log("DBLINK SQL_LONGVARBINARY column %s of length %zu limited to 32000000 bytes", (char *)Ocname, Ors[j]);
Ors[j] = 32000000;
}
desz[j] = (size_t)(Ors[j] + 1) ;
outputTypes.addLongVarbinary((int32)Ors[j], cname) ;
break ;
Expand All @@ -650,6 +735,7 @@ class DBLinkFactory : public TransformFunctionFactory
parameterTypes.addVarchar(1024, "cid", { true, false, false, "Connection Identifier Database. Identifies an entry in the connection identifier database." });
parameterTypes.addVarchar(1024, "connect", { true, false, false, "The ODBC connection string containing the DSN and credentials." });
parameterTypes.addVarchar(1024, "connect_secret", { true, false, false, "The ODBC connection string containing the DSN and credentials." });
parameterTypes.addVarchar(1024, "cidfile", { true, false, false, "Connection Identifier File Path." });
parameterTypes.addVarchar(65000, "query", { true, false, false, "The query being pushed on the remote database. Or, '@' followed by the name of the file containing the query." });
parameterTypes.addInt("rowset", { true, false, false, "Number of rows retrieved from the remote database during each SQLFetch() cycle. Default is 100." });
}
Expand All @@ -661,12 +747,11 @@ class DBLinkFactory : public TransformFunctionFactory
};
RegisterFactory(DBLinkFactory);

// older versions of the SDK don't have this defined
RegisterLibrary (
"Maurizio Felici",
__DATE__,
"0.2.1",
"12.0.2",
"0.3.0",
"12.0.4",
"maurizio.felici@vertica.com",
"DBLINK: run SQL on other databases",
"",
Expand Down

0 comments on commit 8027fca

Please # to comment.