Skip to content

Commit

Permalink
fix: records update
Browse files Browse the repository at this point in the history
  • Loading branch information
TBonnin committed Nov 5, 2024
1 parent e0636fe commit 39fa78c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 82 deletions.
93 changes: 27 additions & 66 deletions packages/records/lib/models/records.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,8 @@
import dayjs from 'dayjs';
import utc from 'dayjs/plugin/utc.js';
import { db } from '../db/client.js';
import type {
FormattedRecord,
FormattedRecordWithMetadata,
GetRecordsResponse,
LastAction,
RecordCount,
ReturnedRecord,
UnencryptedRecord,
UpsertSummary
} from '../types.js';
import { decryptRecord, decryptRecords, encryptRecords } from '../utils/encryption.js';
import type { FormattedRecord, FormattedRecordWithMetadata, GetRecordsResponse, LastAction, RecordCount, ReturnedRecord, UpsertSummary } from '../types.js';
import { decryptRecordData, encryptRecords } from '../utils/encryption.js';
import { RECORDS_TABLE, RECORD_COUNTS_TABLE } from '../constants.js';
import { removeDuplicateKey, getUniqueId } from '../helpers/uniqueKey.js';
import { logger } from '../utils/logger.js';
Expand Down Expand Up @@ -173,7 +164,7 @@ export async function getRecords({
}

const results = rawResults.map((item) => {
const decryptedData = decryptRecord(item);
const decryptedData = decryptRecordData(item);
const encodedCursor = Buffer.from(`${item.last_modified_at}||${item.id}`).toString('base64');
return {
...decryptedData,
Expand Down Expand Up @@ -322,24 +313,27 @@ export async function update({
for (let i = 0; i < recordsWithoutDuplicates.length; i += BATCH_SIZE) {
const chunk = recordsWithoutDuplicates.slice(i, i + BATCH_SIZE);

updatedKeys.push(...(await getUpdatedKeys({ records: chunk, connectionId, model, trx })));
const oldRecords = await getRecordsToUpdate({ records: chunk, connectionId, model, trx });
if (oldRecords.length === 0) {
logger.warn(`Did not find any record to update: ${{ connectionId, model }}`);
}

const recordsToUpdate: FormattedRecord[] = [];
const rawOldRecords = await getRecordsByExternalIds({ externalIds: updatedKeys, connectionId, model, trx });
for (const rawOldRecord of rawOldRecords) {
if (!rawOldRecord) {
for (const oldRecord of oldRecords) {
const oldRecordData = decryptRecordData(oldRecord);

const inputRecord = chunk.find((record) => record.external_id === oldRecord.external_id);
if (!inputRecord) {
continue;
}

const { record: oldRecord, ...oldRecordRest } = rawOldRecord;

const record = records.find((record) => record.external_id === rawOldRecord.external_id);
const { json: newRecordData, ...newRecordRest } = inputRecord;

const newRecord: FormattedRecord = {
...oldRecordRest,
...newRecordRest,
json: {
...oldRecord,
...record?.json
...oldRecordData,
...newRecordData
},
updated_at: new Date()
};
Expand All @@ -348,10 +342,10 @@ export async function update({
if (recordsToUpdate.length > 0) {
const encryptedRecords = encryptRecords(recordsToUpdate);
await trx.from(RECORDS_TABLE).insert(encryptedRecords).onConflict(['connection_id', 'external_id', 'model']).merge();
updatedKeys.push(...recordsToUpdate.map((record) => record.external_id));
}
}
});

return Ok({
addedKeys: [],
updatedKeys,
Expand Down Expand Up @@ -458,10 +452,10 @@ export async function markNonCurrentGenerationRecordsAsDeleted({
}

/**
* getUpdatedKeys
* @desc returns a list of the keys that exist in the records tables but have a different data_hash
* getRecordsToUpdate
* @desc returns records that exist in the records table but have a different data_hash
*/
async function getUpdatedKeys({
async function getRecordsToUpdate({
records,
connectionId,
model,
Expand All @@ -471,21 +465,20 @@ async function getUpdatedKeys({
connectionId: number;
model: string;
trx: Knex.Transaction;
}): Promise<string[]> {
}): Promise<FormattedRecord[]> {
const keys: string[] = records.map((record: FormattedRecord) => getUniqueId(record));
const keysWithHash: [string, string][] = records.map((record: FormattedRecord) => [getUniqueId(record), record.data_hash]);

const rowsToUpdate = (await trx
return trx
.select<FormattedRecord[]>('*')
.from(RECORDS_TABLE)
.pluck('external_id')
.where({
connection_id: connectionId,
model
})
.debug(true)
.whereIn('external_id', keys)
.whereNotIn(['external_id', 'data_hash'], keysWithHash)) as unknown as string[];

return rowsToUpdate;
.whereNotIn(['external_id', 'data_hash'], keysWithHash);
}

async function getUpsertSummary({
Expand Down Expand Up @@ -523,44 +516,12 @@ async function getUpsertSummary({
};
} else {
const addedKeys = keys.filter((key: string) => !nonDeletedKeys.includes(key));
const updatedKeys = await getUpdatedKeys({ records, connectionId, model, trx });
const updatedRecords = await getRecordsToUpdate({ records, connectionId, model, trx });
return {
addedKeys,
updatedKeys,
updatedKeys: updatedRecords.map((record) => record.external_id),
deletedKeys: [],
nonUniqueKeys: nonUniqueKeys
};
}
}

async function getRecordsByExternalIds({
externalIds,
connectionId,
model,
trx
}: {
externalIds: string[];
connectionId: number;
model: string;
trx: Knex.Transaction;
}): Promise<UnencryptedRecord[]> {
const encryptedRecords = await trx
.from<FormattedRecord>(RECORDS_TABLE)
.where({
connection_id: connectionId,
model
})
.whereIn('external_id', externalIds);

if (!encryptedRecords) {
return [];
}

const result = decryptRecords(encryptedRecords);

if (!result || result.length === 0) {
return [];
}

return result;
}
2 changes: 0 additions & 2 deletions packages/records/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ export interface FormattedRecord {

export type FormattedRecordWithMetadata = FormattedRecord & RecordMetadata;

export type UnencryptedRecord = FormattedRecord & { record: UnencryptedRecordData };

export interface EncryptedRecordData {
iv: string;
authTag: string;
Expand Down
15 changes: 2 additions & 13 deletions packages/records/lib/utils/encryption.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { EncryptedRecordData, FormattedRecord, UnencryptedRecord, UnencryptedRecordData } from '../types';
import type { EncryptedRecordData, FormattedRecord, UnencryptedRecordData } from '../types';
import { Encryption } from '@nangohq/utils';
import { envs } from '../env.js';

Expand All @@ -14,7 +14,7 @@ function isEncrypted(data: UnencryptedRecordData | EncryptedRecordData): data is
return 'encryptedValue' in data;
}

export function decryptRecord(record: FormattedRecord): UnencryptedRecordData {
export function decryptRecordData(record: FormattedRecord): UnencryptedRecordData {
const encryptionManager = getEncryption();
const { json } = record;
if (isEncrypted(json)) {
Expand All @@ -25,17 +25,6 @@ export function decryptRecord(record: FormattedRecord): UnencryptedRecordData {
return json;
}

export function decryptRecords(records: FormattedRecord[]): UnencryptedRecord[] {
const decryptedRecords: UnencryptedRecord[] = [];
for (const record of records) {
decryptedRecords.push({
...record,
record: decryptRecord(record)
});
}
return decryptedRecords;
}

export function encryptRecords(records: FormattedRecord[]): FormattedRecord[] {
const encryptionManager = getEncryption();
const encryptedDataRecords: FormattedRecord[] = Object.assign([], records);
Expand Down
2 changes: 1 addition & 1 deletion packages/webhooks/lib/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export const sendSync = async ({
responseResults?.added === 0 && responseResults?.updated === 0 && (responseResults.deleted === 0 || responseResults.deleted === undefined);

if (!webhookSettings.on_sync_completion_always && noChanges) {
await logCtx?.info('There were no added, updated, or deleted results. No webhook sent, as per your environment settings');
await logCtx?.info(`There were no added, updated, or deleted results for model ${model}. No webhook sent, as per your environment settings`);

return Ok(undefined);
}
Expand Down

0 comments on commit 39fa78c

Please # to comment.