From bdca46bc5b9df6b41e3a93c0e726940f5ddc8590 Mon Sep 17 00:00:00 2001 From: Mish Savelyev <1564970+sausage-todd@users.noreply.github.com> Date: Thu, 9 Nov 2023 12:11:10 +0100 Subject: [PATCH] Merge organizations asynchronously (#1825) --- .../src/api/organization/organizationMerge.ts | 17 ++-- .../scripts/merge-duplicated-organizations.ts | 2 +- .../src/bin/scripts/merge-organizations.ts | 2 +- .../U1699357748__org-merge-status.sql | 1 + .../migrations/U1699459698__merge-actions.sql | 1 + .../V1699357748__org-merge-status.sql | 1 + .../migrations/V1699459698__merge-actions.sql | 13 +++ .../repositories/mergeActionsRepository.ts | 86 +++++++++++++++++++ .../repositories/organizationRepository.ts | 11 +++ .../microservices/nodejs/messageTypes.ts | 8 ++ .../nodejs/org-merge/orgMergeWorker.ts | 62 +++++++++++++ .../microservices/nodejs/workerFactory.ts | 11 +++ backend/src/serverless/utils/nodeWorkerSQS.ts | 17 ++++ backend/src/services/organizationService.ts | 56 +++++++++++- 14 files changed, 277 insertions(+), 11 deletions(-) create mode 100644 backend/src/database/migrations/U1699357748__org-merge-status.sql create mode 100644 backend/src/database/migrations/U1699459698__merge-actions.sql create mode 100644 backend/src/database/migrations/V1699357748__org-merge-status.sql create mode 100644 backend/src/database/migrations/V1699459698__merge-actions.sql create mode 100644 backend/src/database/repositories/mergeActionsRepository.ts create mode 100644 backend/src/serverless/microservices/nodejs/org-merge/orgMergeWorker.ts diff --git a/backend/src/api/organization/organizationMerge.ts b/backend/src/api/organization/organizationMerge.ts index 040c9deac7..0ed9e2cc43 100644 --- a/backend/src/api/organization/organizationMerge.ts +++ b/backend/src/api/organization/organizationMerge.ts @@ -6,14 +6,17 @@ import PermissionChecker from '../../services/user/permissionChecker' export default async (req, res) => { new PermissionChecker(req).validateHas(Permissions.values.organizationEdit) - const payload = await new OrganizationService(req).merge( - req.params.organizationId, - req.body.organizationToMerge, - ) + const primaryOrgId = req.params.organizationId + const secondaryOrgId = req.body.organizationToMerge - track('Merge organizations', { ...payload }, { ...req }) + const requestPayload = { + primary: primaryOrgId, + secondary: secondaryOrgId, + } - const status = payload.status || 200 + await new OrganizationService(req).mergeAsync(primaryOrgId, secondaryOrgId) - await req.responseHandler.success(req, res, payload, status) + track('Merge organizations', requestPayload, { ...req }) + + await req.responseHandler.success(req, res, requestPayload) } diff --git a/backend/src/bin/scripts/merge-duplicated-organizations.ts b/backend/src/bin/scripts/merge-duplicated-organizations.ts index 19902c3d09..54e845a493 100644 --- a/backend/src/bin/scripts/merge-duplicated-organizations.ts +++ b/backend/src/bin/scripts/merge-duplicated-organizations.ts @@ -63,7 +63,7 @@ async function mergeOrganizationsWithSameWebsite(): Promise { const primaryOrganizationId = orgInfo.organizationIds.shift() for (const orgId of orgInfo.organizationIds) { log.info(`Merging organization ${orgId} into ${primaryOrganizationId}!`) - await service.merge(primaryOrganizationId, orgId) + await service.mergeSync(primaryOrganizationId, orgId) } } } while (mergeableOrganizations.length > 0) diff --git a/backend/src/bin/scripts/merge-organizations.ts b/backend/src/bin/scripts/merge-organizations.ts index a3513af201..ee96697d4c 100644 --- a/backend/src/bin/scripts/merge-organizations.ts +++ b/backend/src/bin/scripts/merge-organizations.ts @@ -87,7 +87,7 @@ if (parameters.help || !parameters.originalId || !parameters.toMergeId || !param log.info(`Merging ${toMergeId} into ${originalId}...`) const service = new OrganizationService(options) try { - await service.merge(originalId, toMergeId) + await service.mergeSync(originalId, toMergeId) } catch (err) { log.error(`Error merging organizations: ${err.message}`) process.exit(1) diff --git a/backend/src/database/migrations/U1699357748__org-merge-status.sql b/backend/src/database/migrations/U1699357748__org-merge-status.sql new file mode 100644 index 0000000000..ab4c21703d --- /dev/null +++ b/backend/src/database/migrations/U1699357748__org-merge-status.sql @@ -0,0 +1 @@ +ALTER TABLE "organizationToMerge" DROP COLUMN status; diff --git a/backend/src/database/migrations/U1699459698__merge-actions.sql b/backend/src/database/migrations/U1699459698__merge-actions.sql new file mode 100644 index 0000000000..47d99af6ba --- /dev/null +++ b/backend/src/database/migrations/U1699459698__merge-actions.sql @@ -0,0 +1 @@ +DROP TABLE "mergeActions"; diff --git a/backend/src/database/migrations/V1699357748__org-merge-status.sql b/backend/src/database/migrations/V1699357748__org-merge-status.sql new file mode 100644 index 0000000000..c72e446c64 --- /dev/null +++ b/backend/src/database/migrations/V1699357748__org-merge-status.sql @@ -0,0 +1 @@ +ALTER TABLE "organizationToMerge" ADD COLUMN status VARCHAR(16) NOT NULL DEFAULT 'ready'; diff --git a/backend/src/database/migrations/V1699459698__merge-actions.sql b/backend/src/database/migrations/V1699459698__merge-actions.sql new file mode 100644 index 0000000000..f50452bccf --- /dev/null +++ b/backend/src/database/migrations/V1699459698__merge-actions.sql @@ -0,0 +1,13 @@ +CREATE TABLE "mergeActions" ( + id UUID PRIMARY KEY NOT NULL DEFAULT uuid_generate_v4(), + "tenantId" UUID NOT NULL REFERENCES tenants (id) ON DELETE CASCADE, + type VARCHAR(16) NOT NULL, -- org or member + "primaryId" UUID NOT NULL, + "secondaryId" UUID NOT NULL, + "createdAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), + "updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), + "state" VARCHAR(16) NOT NULL, -- pending, in-progress, done + UNIQUE ("tenantId", type, "primaryId", "secondaryId") +); + +CREATE INDEX "mergeActions_main_idx" ON "mergeActions" (type, "primaryId", "secondaryId"); diff --git a/backend/src/database/repositories/mergeActionsRepository.ts b/backend/src/database/repositories/mergeActionsRepository.ts new file mode 100644 index 0000000000..9913ccd154 --- /dev/null +++ b/backend/src/database/repositories/mergeActionsRepository.ts @@ -0,0 +1,86 @@ +import { QueryTypes } from 'sequelize' +import { IRepositoryOptions } from './IRepositoryOptions' +import SequelizeRepository from './sequelizeRepository' + +enum MergeActionType { + ORG = 'org', + MEMBER = 'member', +} + +enum MergeActionState { + PENDING = 'pending', + IN_PROGRESS = 'in-progress', + DONE = 'done', + ERROR = 'error', +} + +class MergeActionsRepository { + static async add( + type: MergeActionType, + primaryId: string, + secondaryId: string, + options: IRepositoryOptions, + ) { + const transaction = SequelizeRepository.getTransaction(options) + const tenantId = options.currentTenant.id + + await options.database.sequelize.query( + ` + INSERT INTO "mergeActions" ("tenantId", "type", "primaryId", "secondaryId", state) + VALUES (:tenantId, :type, :primaryId, :secondaryId, :state) + ON CONFLICT ("tenantId", "type", "primaryId", "secondaryId") + DO UPDATE SET state = :state + `, + { + replacements: { + tenantId, + type, + primaryId, + secondaryId, + state: MergeActionState.PENDING, + }, + type: QueryTypes.INSERT, + transaction, + }, + ) + } + + static async setState( + type: MergeActionType, + primaryId: string, + secondaryId: string, + state: MergeActionState, + options: IRepositoryOptions, + ) { + const transaction = SequelizeRepository.getTransaction(options) + const tenantId = options.currentTenant.id + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const [_, rowCount] = await options.database.sequelize.query( + ` + UPDATE "mergeActions" + SET state = :state + WHERE "tenantId" = :tenantId + AND type = :type + AND "primaryId" = :primaryId + AND "secondaryId" = :secondaryId + AND state != :state + `, + { + replacements: { + tenantId, + type, + primaryId, + secondaryId, + state, + }, + type: QueryTypes.UPDATE, + transaction, + }, + ) + + return rowCount > 0 + } +} + +export { MergeActionsRepository, MergeActionType, MergeActionState } diff --git a/backend/src/database/repositories/organizationRepository.ts b/backend/src/database/repositories/organizationRepository.ts index 54d6b81b9f..1be135f2a0 100644 --- a/backend/src/database/repositories/organizationRepository.ts +++ b/backend/src/database/repositories/organizationRepository.ts @@ -25,6 +25,7 @@ import OrganizationSyncRemoteRepository from './organizationSyncRemoteRepository import isFeatureEnabled from '@/feature-flags/isFeatureEnabled' import { SegmentData } from '@/types/segmentTypes' import SegmentRepository from './segmentRepository' +import { MergeActionType, MergeActionState } from './mergeActionsRepository' const { Op } = Sequelize @@ -1567,9 +1568,17 @@ class OrganizationRepository { JOIN "organizationToMerge" otm ON org.id = otm."organizationId" JOIN "organizationSegments" os ON os."organizationId" = org.id JOIN "organizationSegments" to_merge_segments on to_merge_segments."organizationId" = otm."toMergeId" + LEFT JOIN "mergeActions" ma + ON ma.type = :mergeActionType + AND ma."tenantId" = :tenantId + AND ( + (ma."primaryId" = org.id AND ma."secondaryId" = otm."toMergeId") + OR (ma."primaryId" = otm."toMergeId" AND ma."secondaryId" = org.id) + ) WHERE org."tenantId" = :tenantId AND os."segmentId" IN (:segmentIds) AND to_merge_segments."segmentId" IN (:segmentIds) + AND (ma.id IS NULL OR ma.state = :mergeActionStatus) ), count_cte AS ( @@ -1605,6 +1614,8 @@ class OrganizationRepository { segmentIds, limit, offset, + mergeActionType: MergeActionType.ORG, + mergeActionStatus: MergeActionState.ERROR, }, type: QueryTypes.SELECT, }, diff --git a/backend/src/serverless/microservices/nodejs/messageTypes.ts b/backend/src/serverless/microservices/nodejs/messageTypes.ts index 39c161898f..30b9aa44dc 100644 --- a/backend/src/serverless/microservices/nodejs/messageTypes.ts +++ b/backend/src/serverless/microservices/nodejs/messageTypes.ts @@ -86,3 +86,11 @@ export type OrganizationBulkEnrichMessage = { tenantId: string maxEnrichLimit: number } + +export type OrganizationMergeMessage = { + service: string + tenantId: string + primaryOrgId: string + secondaryOrgId: string + notifyFrontend?: boolean +} diff --git a/backend/src/serverless/microservices/nodejs/org-merge/orgMergeWorker.ts b/backend/src/serverless/microservices/nodejs/org-merge/orgMergeWorker.ts new file mode 100644 index 0000000000..747927ddd2 --- /dev/null +++ b/backend/src/serverless/microservices/nodejs/org-merge/orgMergeWorker.ts @@ -0,0 +1,62 @@ +import { getRedisClient, RedisPubSubEmitter } from '@crowd/redis' +import { ApiWebsocketMessage } from '@crowd/types' +import { REDIS_CONFIG } from '../../../../conf' +import getUserContext from '../../../../database/utils/getUserContext' +import OrganizationService from '../../../../services/organizationService' + +async function doNotifyFrontend({ log, success, tenantId, primaryOrgId, secondaryOrgId }) { + const redis = await getRedisClient(REDIS_CONFIG, true) + const apiPubSubEmitter = new RedisPubSubEmitter( + 'api-pubsub', + redis, + (err) => { + log.error({ err }, 'Error in api-ws emitter!') + }, + log, + ) + + apiPubSubEmitter.emit( + 'user', + new ApiWebsocketMessage( + 'org-merge', + JSON.stringify({ + success, + tenantId, + primaryOrgId, + secondaryOrgId, + }), + undefined, + tenantId, + ), + ) +} + +async function orgMergeWorker( + tenantId: string, + primaryOrgId: string, + secondaryOrgId: string, + notifyFrontend: boolean, +) { + const userContext = await getUserContext(tenantId) + + const organizationService = new OrganizationService(userContext) + + let success = true + try { + await organizationService.mergeSync(primaryOrgId, secondaryOrgId) + } catch (err) { + userContext.log.error(err, 'Error merging orgs') + success = false + } + + if (notifyFrontend) { + await doNotifyFrontend({ + log: userContext.log, + success, + tenantId, + primaryOrgId, + secondaryOrgId, + }) + } +} +export { orgMergeWorker } diff --git a/backend/src/serverless/microservices/nodejs/workerFactory.ts b/backend/src/serverless/microservices/nodejs/workerFactory.ts index 47aecaa53d..8dc8d43972 100644 --- a/backend/src/serverless/microservices/nodejs/workerFactory.ts +++ b/backend/src/serverless/microservices/nodejs/workerFactory.ts @@ -13,6 +13,7 @@ import { EagleEyeEmailDigestMessage, IntegrationDataCheckerMessage, OrganizationBulkEnrichMessage, + OrganizationMergeMessage, } from './messageTypes' import newActivityWorker from './automation/workers/newActivityWorker' import newMemberWorker from './automation/workers/newMemberWorker' @@ -26,6 +27,7 @@ import { eagleEyeEmailDigestWorker } from './eagle-eye-email-digest/eagleEyeEmai import { integrationDataCheckerWorker } from './integration-data-checker/integrationDataCheckerWorker' import { refreshSampleDataWorker } from './integration-data-checker/refreshSampleDataWorker' import { mergeSuggestionsWorker } from './merge-suggestions/mergeSuggestionsWorker' +import { orgMergeWorker } from './org-merge/orgMergeWorker' import { BulkorganizationEnrichmentWorker } from './bulk-enrichment/bulkOrganizationEnrichmentWorker' import { API_CONFIG } from '../../../conf' @@ -138,6 +140,15 @@ async function workerFactory(event: NodeMicroserviceMessage): Promise { default: throw new Error(`Invalid automation trigger ${automationRequest.trigger}!`) } + case 'org-merge': + const orgMergeMessage = event as OrganizationMergeMessage + return orgMergeWorker( + orgMergeMessage.tenantId, + orgMergeMessage.primaryOrgId, + orgMergeMessage.secondaryOrgId, + orgMergeMessage.notifyFrontend, + ) + default: throw new Error(`Invalid microservice ${service}`) } diff --git a/backend/src/serverless/utils/nodeWorkerSQS.ts b/backend/src/serverless/utils/nodeWorkerSQS.ts index 61727d89be..c525a059ed 100644 --- a/backend/src/serverless/utils/nodeWorkerSQS.ts +++ b/backend/src/serverless/utils/nodeWorkerSQS.ts @@ -152,3 +152,20 @@ export const sendBulkEnrichMessage = async ( } await sendNodeWorkerMessage(tenant, payload as NodeWorkerMessageBase) } + +export const sendOrgMergeMessage = async ( + tenantId: string, + primaryOrgId: string, + secondaryOrgId: string, + notifyFrontend: boolean = true, +): Promise => { + const payload = { + type: NodeWorkerMessageType.NODE_MICROSERVICE, + service: 'org-merge', + tenantId, + primaryOrgId, + secondaryOrgId, + notifyFrontend, + } + await sendNodeWorkerMessage(tenantId, payload as NodeWorkerMessageBase) +} diff --git a/backend/src/services/organizationService.ts b/backend/src/services/organizationService.ts index 5714f98c33..b965e5f84f 100644 --- a/backend/src/services/organizationService.ts +++ b/backend/src/services/organizationService.ts @@ -20,6 +20,12 @@ import { mergeUniqueStringArrayItems, } from './helpers/mergeFunctions' import SearchSyncService from './searchSyncService' +import { sendOrgMergeMessage } from '../serverless/utils/nodeWorkerSQS' +import { + MergeActionsRepository, + MergeActionType, + MergeActionState, +} from '../database/repositories/mergeActionsRepository' export default class OrganizationService extends LoggerBase { options: IServiceOptions @@ -37,7 +43,15 @@ export default class OrganizationService extends LoggerBase { return enrichP && (CLEARBIT_CONFIG.apiKey || IS_TEST_ENV) } - async merge(originalId, toMergeId) { + async mergeAsync(originalId, toMergeId) { + const tenantId = this.options.currentTenant.id + + await MergeActionsRepository.add(MergeActionType.ORG, originalId, toMergeId, this.options) + + await sendOrgMergeMessage(tenantId, originalId, toMergeId) + } + + async mergeSync(originalId, toMergeId) { this.options.log.info({ originalId, toMergeId }, 'Merging organizations!') const removeExtraFields = (organization: IOrganization): IOrganization => @@ -63,6 +77,23 @@ export default class OrganizationService extends LoggerBase { } } + const mergeStatusChanged = await MergeActionsRepository.setState( + MergeActionType.ORG, + originalId, + toMergeId, + MergeActionState.IN_PROGRESS, + // not using transaction here on purpose, + // so this change is visible until we finish + this.options, + ) + if (!mergeStatusChanged) { + this.log.info('[Merge Organizations] - Merging already in progress!') + return { + status: 203, + mergedId: originalId, + } + } + const repoOptions: IRepositoryOptions = await SequelizeRepository.createTransactionalRepositoryOptions(this.options) tx = repoOptions.transaction @@ -155,6 +186,14 @@ export default class OrganizationService extends LoggerBase { await SequelizeRepository.commitTransaction(tx) + await MergeActionsRepository.setState( + MergeActionType.ORG, + originalId, + toMergeId, + MergeActionState.DONE, + this.options, + ) + const searchSyncService = new SearchSyncService(this.options) await searchSyncService.triggerOrganizationSync(this.options.currentTenant.id, originalId) @@ -169,10 +208,23 @@ export default class OrganizationService extends LoggerBase { this.options.log.info({ originalId, toMergeId }, 'Organizations merged!') return { status: 200, mergedId: originalId } } catch (err) { - this.options.log.error(err, 'Error while merging organizations!') + this.options.log.error(err, 'Error while merging organizations!', { + originalId, + toMergeId, + }) + + await MergeActionsRepository.setState( + MergeActionType.ORG, + originalId, + toMergeId, + MergeActionState.ERROR, + this.options, + ) + if (tx) { await SequelizeRepository.rollbackTransaction(tx) } + throw err } }