Skip to content
This repository has been archived by the owner on Aug 15, 2024. It is now read-only.

Experiencing bulk api #20

Merged
merged 5 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,8 @@ yarn cli query "SELECT 1;"
# or installed global
tentaclesql query "SELECT 1;"
```

### Bulk fetch
By default Tentacle sends one HTTP request for each table data, however you can change this and fetch all table data in one HTTP request. To enable this you need to pass following paramaters:
- `BULK_FETCH=true`
- `BULK_FETCH_URL=url`
130 changes: 105 additions & 25 deletions src/executor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ async function fetchTableData (
return res.json()
}

async function fetchTablesData (
tableDefinitions: Array<TableDefinition>,
headers: any,
queryAst: any,
method: 'POST' | 'GET' = 'POST'
): Promise<any> {
if (process.env.BULK_FETCH_URL === undefined) {
return Error('Bulk fetch requested but bulk fetch url is not defined.')
}
const res = await fetch(
process.env.BULK_FETCH_URL, {
headers: headers,
method: method,
body: JSON.stringify({
query_ast: queryAst,
names: tableDefinitions.map((tableDefinition: TableDefinition) => tableDefinition.name)
})
}
)

if (!res.ok) {
return Promise.reject(new Error(`Error with the request. Status code: ${res.status}`))
}

return res.json()
}

async function populateTables (
db: IDatabaseAdapter,
usedTables: Array<string>,
Expand All @@ -81,46 +108,99 @@ async function populateTables (
) => usedTables.includes(tableDefinition.name))

const promises = filteredTableDefinition.map(async (tableDefinition: TableDefinition) => {
const schemas = parseSchema(tableDefinition.fields).join(', ')
const data = await fetchTableData(tableDefinition, headers, queryAst)
syncData(tableDefinition, data, db)
})
return Promise.all(promises)
}

if (!tableDefinition.autodiscover) {
db.createTable(tableDefinition, schemas)
}
async function populateTablesInOneHTTPRequest (
db: IDatabaseAdapter,
usedTables: Array<string>,
headers: any,
schema: any,
queryAst: any
) {
const filteredTableDefinition = schema.filter((
tableDefinition: TableDefinition
) => usedTables.includes(tableDefinition.name))
const remoteData = await fetchTablesData(filteredTableDefinition, headers, queryAst)
filteredTableDefinition.forEach((tableDefinition: TableDefinition) => {
const targetTable = remoteData.find((tableData: any) => tableData.name === tableDefinition.name)
syncData(
tableDefinition,
targetTable.data,
db
)
})
}

const data = await fetchTableData(tableDefinition, headers, queryAst)
function syncData (
tableDefinition: TableDefinition,
data: any,
db: IDatabaseAdapter
) {
const schemas = parseSchema(tableDefinition.fields).join(', ')

const resultKey = tableDefinition.resultKey
const dataPointer = resultKey ? data[resultKey] : data
const fixedData = dataPointer.map((field: any) => flattenObject(field, '_'))
if (!tableDefinition.autodiscover) {
db.createTable(tableDefinition, schemas)
}

if (fixedData.length === 0) return
const resultKey = tableDefinition.resultKey
const dataPointer = resultKey ? data[resultKey] : data
const fixedData = dataPointer.map((field: any) => flattenObject(field, '_'))

// No support for booleans :/
mutateDataframe(fixedData, (row, k) => {
if (typeof row[k] === 'boolean') row[k] = row[k] ? 'TRUE' : 'FALSE'
})
if (fixedData.length === 0) return

if (tableDefinition.autodiscover) {
const dynamicDefinition = {
name: tableDefinition.name,
fields: Object.keys(fixedData[0]).map((key) => ({ key: key }))
}
// No support for booleans :/
mutateDataframe(fixedData, (row, k) => {
if (typeof row[k] === 'boolean') row[k] = row[k] ? 'TRUE' : 'FALSE'
})

db.createTable(dynamicDefinition, schemas)
db.storeToDb(dynamicDefinition, fixedData)
} else {
db.storeToDb(tableDefinition, fixedData)
if (tableDefinition.autodiscover) {
const dynamicDefinition = {
name: tableDefinition.name,
fields: Object.keys(fixedData[0]).map((key) => ({ key: key }))
}
})

return Promise.all(promises)
db.createTable(dynamicDefinition, schemas)
db.storeToDb(dynamicDefinition, fixedData)
} else {
db.storeToDb(tableDefinition, fixedData)
}
}

const DEFAULT_CONFIG = {
extensions: [],
schema: []
}

async function runPopulateTables (
db: IDatabaseAdapter,
usedTables: Array<string>,
headers: any,
schema: any,
ast: any
) {
if (process.env.BULK_FETCH) {
await populateTablesInOneHTTPRequest(
db,
usedTables,
headers,
schema,
ast
)
} else {
await populateTables(
db,
usedTables,
headers,
schema,
ast
)
}
}

async function executor (
sql: string,
parameters: Parameters,
Expand Down Expand Up @@ -153,7 +233,7 @@ async function executor (
const headersWithHost = getHost() ? { ...headers, host: getHost() } : { ...headers }
headersWithHost['user-agent'] = `tentaclesql/${version}`

await populateTables(
await runPopulateTables(
db,
usedTables,
headers,
Expand Down