-
Notifications
You must be signed in to change notification settings - Fork 465
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
fix: records update bugs and inefficiency #2941
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,8 @@ | ||
import dayjs from 'dayjs'; | ||
import utc from 'dayjs/plugin/utc.js'; | ||
import { db } from '../db/client.js'; | ||
import type { | ||
FormattedRecord, | ||
FormattedRecordWithMetadata, | ||
GetRecordsResponse, | ||
LastAction, | ||
RecordCount, | ||
ReturnedRecord, | ||
UnencryptedRecord, | ||
UpsertSummary | ||
} from '../types.js'; | ||
import { decryptRecord, decryptRecords, encryptRecords } from '../utils/encryption.js'; | ||
import type { FormattedRecord, FormattedRecordWithMetadata, GetRecordsResponse, LastAction, RecordCount, ReturnedRecord, UpsertSummary } from '../types.js'; | ||
import { decryptRecordData, encryptRecords } from '../utils/encryption.js'; | ||
import { RECORDS_TABLE, RECORD_COUNTS_TABLE } from '../constants.js'; | ||
import { removeDuplicateKey, getUniqueId } from '../helpers/uniqueKey.js'; | ||
import { logger } from '../utils/logger.js'; | ||
|
@@ -173,7 +164,7 @@ export async function getRecords({ | |
} | ||
|
||
const results = rawResults.map((item) => { | ||
const decryptedData = decryptRecord(item); | ||
const decryptedData = decryptRecordData(item); | ||
const encodedCursor = Buffer.from(`${item.last_modified_at}||${item.id}`).toString('base64'); | ||
return { | ||
...decryptedData, | ||
|
@@ -322,24 +313,27 @@ export async function update({ | |
for (let i = 0; i < recordsWithoutDuplicates.length; i += BATCH_SIZE) { | ||
const chunk = recordsWithoutDuplicates.slice(i, i + BATCH_SIZE); | ||
|
||
updatedKeys.push(...(await getUpdatedKeys({ records: chunk, connectionId, model, trx }))); | ||
const oldRecords = await getRecordsToUpdate({ records: chunk, connectionId, model, trx }); | ||
if (oldRecords.length === 0) { | ||
logger.warning(`Did not find any record to update: ${{ connectionId, model }}`); | ||
} | ||
|
||
const recordsToUpdate: FormattedRecord[] = []; | ||
const rawOldRecords = await getRecordsByExternalIds({ externalIds: updatedKeys, connectionId, model, trx }); | ||
for (const rawOldRecord of rawOldRecords) { | ||
if (!rawOldRecord) { | ||
for (const oldRecord of oldRecords) { | ||
const oldRecordData = decryptRecordData(oldRecord); | ||
|
||
const inputRecord = chunk.find((record) => record.external_id === oldRecord.external_id); | ||
if (!inputRecord) { | ||
continue; | ||
} | ||
|
||
const { record: oldRecord, ...oldRecordRest } = rawOldRecord; | ||
|
||
const record = records.find((record) => record.external_id === rawOldRecord.external_id); | ||
const { json: newRecordData, ...newRecordRest } = inputRecord; | ||
|
||
const newRecord: FormattedRecord = { | ||
...oldRecordRest, | ||
...newRecordRest, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that was a bug. The only attributes that is different is the sync_job_id but was not updated |
||
json: { | ||
...oldRecord, | ||
...record?.json | ||
...oldRecordData, | ||
...newRecordData | ||
}, | ||
updated_at: new Date() | ||
}; | ||
|
@@ -348,10 +342,10 @@ export async function update({ | |
if (recordsToUpdate.length > 0) { | ||
const encryptedRecords = encryptRecords(recordsToUpdate); | ||
await trx.from(RECORDS_TABLE).insert(encryptedRecords).onConflict(['connection_id', 'external_id', 'model']).merge(); | ||
updatedKeys.push(...recordsToUpdate.map((record) => record.external_id)); | ||
} | ||
} | ||
}); | ||
|
||
return Ok({ | ||
addedKeys: [], | ||
updatedKeys, | ||
|
@@ -458,10 +452,10 @@ export async function markNonCurrentGenerationRecordsAsDeleted({ | |
} | ||
|
||
/** | ||
* getUpdatedKeys | ||
* @desc returns a list of the keys that exist in the records tables but have a different data_hash | ||
* getRecordsToUpdate | ||
* @desc returns records that exist in the records table but have a different data_hash | ||
*/ | ||
async function getUpdatedKeys({ | ||
async function getRecordsToUpdate({ | ||
records, | ||
connectionId, | ||
model, | ||
|
@@ -471,21 +465,19 @@ async function getUpdatedKeys({ | |
connectionId: number; | ||
model: string; | ||
trx: Knex.Transaction; | ||
}): Promise<string[]> { | ||
}): Promise<FormattedRecord[]> { | ||
const keys: string[] = records.map((record: FormattedRecord) => getUniqueId(record)); | ||
const keysWithHash: [string, string][] = records.map((record: FormattedRecord) => [getUniqueId(record), record.data_hash]); | ||
|
||
const rowsToUpdate = (await trx | ||
return trx | ||
.select<FormattedRecord[]>('*') | ||
.from(RECORDS_TABLE) | ||
.pluck('external_id') | ||
.where({ | ||
connection_id: connectionId, | ||
model | ||
}) | ||
.whereIn('external_id', keys) | ||
.whereNotIn(['external_id', 'data_hash'], keysWithHash)) as unknown as string[]; | ||
|
||
return rowsToUpdate; | ||
.whereNotIn(['external_id', 'data_hash'], keysWithHash); | ||
} | ||
|
||
async function getUpsertSummary({ | ||
|
@@ -523,44 +515,12 @@ async function getUpsertSummary({ | |
}; | ||
} else { | ||
const addedKeys = keys.filter((key: string) => !nonDeletedKeys.includes(key)); | ||
const updatedKeys = await getUpdatedKeys({ records, connectionId, model, trx }); | ||
const updatedRecords = await getRecordsToUpdate({ records, connectionId, model, trx }); | ||
return { | ||
addedKeys, | ||
updatedKeys, | ||
updatedKeys: updatedRecords.map((record) => record.external_id), | ||
deletedKeys: [], | ||
nonUniqueKeys: nonUniqueKeys | ||
}; | ||
} | ||
} | ||
|
||
async function getRecordsByExternalIds({ | ||
externalIds, | ||
connectionId, | ||
model, | ||
trx | ||
}: { | ||
externalIds: string[]; | ||
connectionId: number; | ||
model: string; | ||
trx: Knex.Transaction; | ||
}): Promise<UnencryptedRecord[]> { | ||
const encryptedRecords = await trx | ||
.from<FormattedRecord>(RECORDS_TABLE) | ||
.where({ | ||
connection_id: connectionId, | ||
model | ||
}) | ||
.whereIn('external_id', externalIds); | ||
|
||
if (!encryptedRecords) { | ||
return []; | ||
} | ||
|
||
const result = decryptRecords(encryptedRecords); | ||
|
||
if (!result || result.length === 0) { | ||
return []; | ||
} | ||
|
||
return result; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
import type { EncryptedRecordData, FormattedRecord, UnencryptedRecord, UnencryptedRecordData } from '../types'; | ||
import type { EncryptedRecordData, FormattedRecord, UnencryptedRecordData } from '../types'; | ||
import { Encryption } from '@nangohq/utils'; | ||
import { envs } from '../env.js'; | ||
|
||
|
@@ -14,7 +14,7 @@ function isEncrypted(data: UnencryptedRecordData | EncryptedRecordData): data is | |
return 'encryptedValue' in data; | ||
} | ||
|
||
export function decryptRecord(record: FormattedRecord): UnencryptedRecordData { | ||
export function decryptRecordData(record: FormattedRecord): UnencryptedRecordData { | ||
const encryptionManager = getEncryption(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. renaming to make it clearer that the function only return the unencrypted data not the entire record |
||
const { json } = record; | ||
if (isEncrypted(json)) { | ||
|
@@ -25,17 +25,6 @@ export function decryptRecord(record: FormattedRecord): UnencryptedRecordData { | |
return json; | ||
} | ||
|
||
export function decryptRecords(records: FormattedRecord[]): UnencryptedRecord[] { | ||
const decryptedRecords: UnencryptedRecord[] = []; | ||
for (const record of records) { | ||
decryptedRecords.push({ | ||
...record, | ||
record: decryptRecord(record) | ||
}); | ||
} | ||
return decryptedRecords; | ||
} | ||
|
||
export function encryptRecords(records: FormattedRecord[]): FormattedRecord[] { | ||
const encryptionManager = getEncryption(); | ||
const encryptedDataRecords: FormattedRecord[] = Object.assign([], records); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,7 +69,7 @@ export const sendSync = async ({ | |
responseResults?.added === 0 && responseResults?.updated === 0 && (responseResults.deleted === 0 || responseResults.deleted === undefined); | ||
|
||
if (!webhookSettings.on_sync_completion_always && noChanges) { | ||
await logCtx?.info('There were no added, updated, or deleted results. No webhook sent, as per your environment settings'); | ||
await logCtx?.info(`There were no added, updated, or deleted results for model ${model}. No webhook sent, as per your environment settings`); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unrelated but it is confusing in the logs to see multiple times the same message when sync is multi model |
||
|
||
return Ok(undefined); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we used to make 2 queries to get the existing records to update. :(