forked from plotly/falcon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathathena.js
188 lines (166 loc) · 6.8 KB
/
athena.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import {executeQuery} from './drivers/athena';
import Logger from '../../logger';
// Statement that tables() runs to list the tables of the connected database.
const SHOW_TABLES_QUERY = 'SHOW TABLES';
// Column-metadata query, ending right before the WHERE operand; schemas()
// appends " = '<schema>'" to complete the table_schema filter.
const SHOW_SCHEMA_QUERY =
    'SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema ';
// Default query interval handed to the driver (unit presumably milliseconds —
// confirm against executeQuery in ./drivers/athena).
const DEFAULT_QUERY_INTERVAL = 2000;
/**
 * Validates Athena connection settings, then probes them by running the schema
 * query once. Resolves with a normalized connection object.
 * @param {object} connection
 * @param {string} connection.region - AWS Region
 * @param {string} connection.accessKey - AWS Access Key
 * @param {string} connection.secretKey - AWS Secret Key
 * @param {string} connection.database - Database name to connect to
 * @param {string} connection.outputS3Bucket - Location where Athena will output query results
 * @param {number} [connection.timeout] - Stored on the result as `queryInterval`;
 *     a missing, zero or negative value falls back to DEFAULT_QUERY_INTERVAL
 * @param {string} [connection.sqlStatement] - Copied through unchanged
 * @param {boolean} [connection.sslEnabled] - Copied through unchanged
 * @returns {Promise<object>} resolves to { region, accessKey, secretKey, database,
 *     sqlStatement, outputS3Bucket, queryInterval, sslEnabled }; rejects with an
 *     Error naming the first missing setting, or with the probe query's error
 */
export function connect(connection) {
    const {
        region, accessKey, secretKey, database, sqlStatement, outputS3Bucket, sslEnabled
    } = connection;

    // Checked in order; the first missing setting determines the error message.
    const requiredSettings = [
        [region, 'The AWS Region was not defined'],
        [accessKey, 'The AWS access key was not defined'],
        [secretKey, 'The AWS secret key was not defined'],
        [database, 'The Database Name was not defined'],
        [outputS3Bucket, 'The Athena S3 Results Output Bucket was not defined']
    ];
    for (const [value, message] of requiredSettings) {
        if (!value) {
            return Promise.reject(new Error(message));
        }
    }

    // Fall back to the default whenever no usable (positive) timeout is given.
    let queryInterval = connection.timeout;
    if (!queryInterval || queryInterval < 0) {
        queryInterval = DEFAULT_QUERY_INTERVAL;
    }

    const con = {
        region, accessKey, secretKey, database, sqlStatement, outputS3Bucket, queryInterval, sslEnabled
    };

    // Test the connection by listing the schema. schemas() assigns
    // sqlStatement and queryInterval on the object it receives, so give it a
    // copy to keep the validated values on the connection we resolve with.
    // Calling it inside .then() turns a synchronous throw into a rejection.
    return Promise.resolve()
        .then(() => schemas(Object.assign({}, con)))
        .then(() => con)
        .catch(err => {
            Logger.log(err);
            throw err;
        });
}
/**
 * Executes a SQL statement against Athena and reshapes the raw result rows.
 * The statement is assigned to `connection.sqlStatement` before executing.
 * @param {string} queryObject - The SQL statement to execute
 * @param {object} connection - Connection parameters
 * @returns {Promise<{columnnames: string[], rows: string[][]}>} the first result
 *     row supplies the column names; later rows that are missing or whose cell
 *     count differs from the header are skipped, and empty or missing cells
 *     become ''. Rejects if the statement is missing or the query fails.
 */
export function query(queryObject, connection) {
    if (!queryObject) {
        return Promise.reject(new Error('The SQL Statement was not defined'));
    }
    return Promise.resolve()
        .then(() => {
            connection.sqlStatement = queryObject;
            return executeQuery(connection);
        })
        .then(dataSet => {
            if (!(dataSet && dataSet.length > 0)) {
                return {columnnames: [], rows: []};
            }
            // The first row holds the column names.
            const columnnames = dataSet[0].Data.map(col => col.VarCharValue);
            const rows = dataSet
                .slice(1)
                // Keep only rows that exist and match the header's width.
                .filter(row => row && row.Data && row.Data.length === columnnames.length)
                .map(row => row.Data.map(cell => (cell && cell.VarCharValue) || ''));
            return {columnnames, rows};
        })
        .catch(err => {
            Logger.log(err);
            // Rethrow so the returned promise rejects and callers see the failure.
            throw err;
        });
}
/**
 * Lists the columns of every table in `connection.database`, read from
 * information_schema.columns.
 * Side effect: assigns the generated SQL to `connection.sqlStatement` and
 * DEFAULT_QUERY_INTERVAL to `connection.queryInterval` before executing.
 * @param {object} connection - Connection parameters
 * @param {string} connection.accessKey - AWS Access Key
 * @param {string} connection.secretKey - AWS Secret Key
 * @param {string} connection.region - AWS Region
 * @param {string} connection.database - Database (schema) whose columns are listed
 * @param {string} connection.outputS3Bucket - Location where Athena will output query results
 * @returns {Promise<{columnnames: string[], rows: string[][]}>} resolves with one
 *     [table name, column name, data type] row per column; rejects if the query fails
 */
export function schemas(connection) {
    const columnnames = ['Table', 'column_name', 'data_type'];
    // Double any single quote so the database name cannot terminate the SQL
    // string literal early (standard SQL string-literal escaping).
    const schemaName = String(connection.database).replace(/'/g, "''");
    connection.sqlStatement = `${SHOW_SCHEMA_QUERY} = '${schemaName}'`;
    connection.queryInterval = DEFAULT_QUERY_INTERVAL;
    return Promise.resolve()
        .then(() => executeQuery(connection))
        .then(dataSet => {
            if (!(dataSet && dataSet.length > 0)) {
                return {columnnames, rows: []};
            }
            const rows = dataSet
                // The first row holds the column headers; skip it.
                .slice(1)
                .filter(data => data && data.Data && data.Data.length > 0)
                .map(data => [
                    data.Data[0].VarCharValue, // Table Name
                    data.Data[1].VarCharValue, // Column Name
                    data.Data[2].VarCharValue // Data Type
                ]);
            return {columnnames, rows};
        })
        .catch(err => {
            Logger.log(err);
            // Rethrow so the returned promise rejects; connect() relies on this
            // to report unusable settings.
            throw err;
        });
}
/**
 * Lists the tables of the connection's database by running SHOW_TABLES_QUERY.
 * Side effect: assigns SHOW_TABLES_QUERY to `connection.sqlStatement` and
 * DEFAULT_QUERY_INTERVAL to `connection.queryInterval` before executing.
 * @param {object} connection - Connection parameters
 * @param {string} connection.accessKey - AWS Access Key
 * @param {string} connection.secretKey - AWS Secret Key
 * @param {string} connection.region - AWS Region
 * @param {string} connection.database - Database name to connect to
 * @param {string} connection.outputS3Bucket - Location where Athena will output query results
 * @returns {Promise<string[]>} one entry per result row: the row's first cell
 *     value, or '' for a row without cells; rejects (after logging) if the query fails
 */
export function tables(connection) {
    connection.sqlStatement = SHOW_TABLES_QUERY;
    connection.queryInterval = DEFAULT_QUERY_INTERVAL;

    // A row contributes its first cell's value; missing or empty rows map to ''.
    const firstCellOf = row => (row && row.Data && row.Data.length > 0 ? row.Data[0].VarCharValue : '');

    return new Promise((resolve, reject) => {
        const onFailure = err => {
            Logger.log(err);
            reject(err);
        };
        executeQuery(connection)
            .then(dataSet => (dataSet && dataSet.length > 0 ? dataSet.map(firstCellOf) : []))
            .then(resolve, onFailure);
    });
}