Skip to content

Commit

Permalink
Merge organizations asynchronously (#1825)
Browse files Browse the repository at this point in the history
  • Loading branch information
sausage-todd committed Nov 9, 2023
1 parent 5e3bec5 commit bdca46b
Show file tree
Hide file tree
Showing 14 changed files with 277 additions and 11 deletions.
17 changes: 10 additions & 7 deletions backend/src/api/organization/organizationMerge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ import PermissionChecker from '../../services/user/permissionChecker'
export default async (req, res) => {
new PermissionChecker(req).validateHas(Permissions.values.organizationEdit)

const payload = await new OrganizationService(req).merge(
req.params.organizationId,
req.body.organizationToMerge,
)
const primaryOrgId = req.params.organizationId
const secondaryOrgId = req.body.organizationToMerge

track('Merge organizations', { ...payload }, { ...req })
const requestPayload = {
primary: primaryOrgId,
secondary: secondaryOrgId,
}

const status = payload.status || 200
await new OrganizationService(req).mergeAsync(primaryOrgId, secondaryOrgId)

await req.responseHandler.success(req, res, payload, status)
track('Merge organizations', requestPayload, { ...req })

await req.responseHandler.success(req, res, requestPayload)
}
2 changes: 1 addition & 1 deletion backend/src/bin/scripts/merge-duplicated-organizations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async function mergeOrganizationsWithSameWebsite(): Promise<void> {
const primaryOrganizationId = orgInfo.organizationIds.shift()
for (const orgId of orgInfo.organizationIds) {
log.info(`Merging organization ${orgId} into ${primaryOrganizationId}!`)
await service.merge(primaryOrganizationId, orgId)
await service.mergeSync(primaryOrganizationId, orgId)
}
}
} while (mergeableOrganizations.length > 0)
Expand Down
2 changes: 1 addition & 1 deletion backend/src/bin/scripts/merge-organizations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ if (parameters.help || !parameters.originalId || !parameters.toMergeId || !param
log.info(`Merging ${toMergeId} into ${originalId}...`)
const service = new OrganizationService(options)
try {
await service.merge(originalId, toMergeId)
await service.mergeSync(originalId, toMergeId)
} catch (err) {
log.error(`Error merging organizations: ${err.message}`)
process.exit(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE "organizationToMerge" DROP COLUMN status;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE "mergeActions";
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE "organizationToMerge" ADD COLUMN status VARCHAR(16) NOT NULL DEFAULT 'ready';
13 changes: 13 additions & 0 deletions backend/src/database/migrations/V1699459698__merge-actions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE TABLE "mergeActions" (
id UUID PRIMARY KEY NOT NULL DEFAULT uuid_generate_v4(),
"tenantId" UUID NOT NULL REFERENCES tenants (id) ON DELETE CASCADE,
type VARCHAR(16) NOT NULL, -- org or member
"primaryId" UUID NOT NULL,
"secondaryId" UUID NOT NULL,
"createdAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
"updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
"state" VARCHAR(16) NOT NULL, -- pending, in-progress, done
UNIQUE ("tenantId", type, "primaryId", "secondaryId")
);

CREATE INDEX "mergeActions_main_idx" ON "mergeActions" (type, "primaryId", "secondaryId");
86 changes: 86 additions & 0 deletions backend/src/database/repositories/mergeActionsRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { QueryTypes } from 'sequelize'
import { IRepositoryOptions } from './IRepositoryOptions'
import SequelizeRepository from './sequelizeRepository'

enum MergeActionType {
ORG = 'org',
MEMBER = 'member',
}

enum MergeActionState {
PENDING = 'pending',
IN_PROGRESS = 'in-progress',
DONE = 'done',
ERROR = 'error',
}

class MergeActionsRepository {
static async add(
type: MergeActionType,
primaryId: string,
secondaryId: string,
options: IRepositoryOptions,
) {
const transaction = SequelizeRepository.getTransaction(options)
const tenantId = options.currentTenant.id

await options.database.sequelize.query(
`
INSERT INTO "mergeActions" ("tenantId", "type", "primaryId", "secondaryId", state)
VALUES (:tenantId, :type, :primaryId, :secondaryId, :state)
ON CONFLICT ("tenantId", "type", "primaryId", "secondaryId")
DO UPDATE SET state = :state
`,
{
replacements: {
tenantId,
type,
primaryId,
secondaryId,
state: MergeActionState.PENDING,
},
type: QueryTypes.INSERT,
transaction,
},
)
}

static async setState(
type: MergeActionType,
primaryId: string,
secondaryId: string,
state: MergeActionState,
options: IRepositoryOptions,
) {
const transaction = SequelizeRepository.getTransaction(options)
const tenantId = options.currentTenant.id

// eslint-disable-next-line @typescript-eslint/no-unused-vars
const [_, rowCount] = await options.database.sequelize.query(
`
UPDATE "mergeActions"
SET state = :state
WHERE "tenantId" = :tenantId
AND type = :type
AND "primaryId" = :primaryId
AND "secondaryId" = :secondaryId
AND state != :state
`,
{
replacements: {
tenantId,
type,
primaryId,
secondaryId,
state,
},
type: QueryTypes.UPDATE,
transaction,
},
)

return rowCount > 0
}
}

export { MergeActionsRepository, MergeActionType, MergeActionState }
11 changes: 11 additions & 0 deletions backend/src/database/repositories/organizationRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import OrganizationSyncRemoteRepository from './organizationSyncRemoteRepository
import isFeatureEnabled from '@/feature-flags/isFeatureEnabled'
import { SegmentData } from '@/types/segmentTypes'
import SegmentRepository from './segmentRepository'
import { MergeActionType, MergeActionState } from './mergeActionsRepository'

const { Op } = Sequelize

Expand Down Expand Up @@ -1567,9 +1568,17 @@ class OrganizationRepository {
JOIN "organizationToMerge" otm ON org.id = otm."organizationId"
JOIN "organizationSegments" os ON os."organizationId" = org.id
JOIN "organizationSegments" to_merge_segments on to_merge_segments."organizationId" = otm."toMergeId"
LEFT JOIN "mergeActions" ma
ON ma.type = :mergeActionType
AND ma."tenantId" = :tenantId
AND (
(ma."primaryId" = org.id AND ma."secondaryId" = otm."toMergeId")
OR (ma."primaryId" = otm."toMergeId" AND ma."secondaryId" = org.id)
)
WHERE org."tenantId" = :tenantId
AND os."segmentId" IN (:segmentIds)
AND to_merge_segments."segmentId" IN (:segmentIds)
AND (ma.id IS NULL OR ma.state = :mergeActionStatus)
),
count_cte AS (
Expand Down Expand Up @@ -1605,6 +1614,8 @@ class OrganizationRepository {
segmentIds,
limit,
offset,
mergeActionType: MergeActionType.ORG,
mergeActionStatus: MergeActionState.ERROR,
},
type: QueryTypes.SELECT,
},
Expand Down
8 changes: 8 additions & 0 deletions backend/src/serverless/microservices/nodejs/messageTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,11 @@ export type OrganizationBulkEnrichMessage = {
tenantId: string
maxEnrichLimit: number
}

export type OrganizationMergeMessage = {
service: string
tenantId: string
primaryOrgId: string
secondaryOrgId: string
notifyFrontend?: boolean
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { getRedisClient, RedisPubSubEmitter } from '@crowd/redis'
import { ApiWebsocketMessage } from '@crowd/types'
import { REDIS_CONFIG } from '../../../../conf'
import getUserContext from '../../../../database/utils/getUserContext'
import OrganizationService from '../../../../services/organizationService'

async function doNotifyFrontend({ log, success, tenantId, primaryOrgId, secondaryOrgId }) {
const redis = await getRedisClient(REDIS_CONFIG, true)
const apiPubSubEmitter = new RedisPubSubEmitter(
'api-pubsub',
redis,
(err) => {
log.error({ err }, 'Error in api-ws emitter!')
},
log,
)

apiPubSubEmitter.emit(
'user',
new ApiWebsocketMessage(
'org-merge',
JSON.stringify({
success,
tenantId,
primaryOrgId,
secondaryOrgId,
}),
undefined,
tenantId,
),
)
}

async function orgMergeWorker(
tenantId: string,
primaryOrgId: string,
secondaryOrgId: string,
notifyFrontend: boolean,
) {
const userContext = await getUserContext(tenantId)

const organizationService = new OrganizationService(userContext)

let success = true
try {
await organizationService.mergeSync(primaryOrgId, secondaryOrgId)
} catch (err) {
userContext.log.error(err, 'Error merging orgs')
success = false
}

if (notifyFrontend) {
await doNotifyFrontend({
log: userContext.log,
success,
tenantId,
primaryOrgId,
secondaryOrgId,
})
}
}
export { orgMergeWorker }
11 changes: 11 additions & 0 deletions backend/src/serverless/microservices/nodejs/workerFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
EagleEyeEmailDigestMessage,
IntegrationDataCheckerMessage,
OrganizationBulkEnrichMessage,
OrganizationMergeMessage,
} from './messageTypes'
import newActivityWorker from './automation/workers/newActivityWorker'
import newMemberWorker from './automation/workers/newMemberWorker'
Expand All @@ -26,6 +27,7 @@ import { eagleEyeEmailDigestWorker } from './eagle-eye-email-digest/eagleEyeEmai
import { integrationDataCheckerWorker } from './integration-data-checker/integrationDataCheckerWorker'
import { refreshSampleDataWorker } from './integration-data-checker/refreshSampleDataWorker'
import { mergeSuggestionsWorker } from './merge-suggestions/mergeSuggestionsWorker'
import { orgMergeWorker } from './org-merge/orgMergeWorker'
import { BulkorganizationEnrichmentWorker } from './bulk-enrichment/bulkOrganizationEnrichmentWorker'
import { API_CONFIG } from '../../../conf'

Expand Down Expand Up @@ -138,6 +140,15 @@ async function workerFactory(event: NodeMicroserviceMessage): Promise<any> {
default:
throw new Error(`Invalid automation trigger ${automationRequest.trigger}!`)
}
case 'org-merge':
const orgMergeMessage = event as OrganizationMergeMessage
return orgMergeWorker(
orgMergeMessage.tenantId,
orgMergeMessage.primaryOrgId,
orgMergeMessage.secondaryOrgId,
orgMergeMessage.notifyFrontend,
)

default:
throw new Error(`Invalid microservice ${service}`)
}
Expand Down
17 changes: 17 additions & 0 deletions backend/src/serverless/utils/nodeWorkerSQS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,20 @@ export const sendBulkEnrichMessage = async (
}
await sendNodeWorkerMessage(tenant, payload as NodeWorkerMessageBase)
}

export const sendOrgMergeMessage = async (
tenantId: string,
primaryOrgId: string,
secondaryOrgId: string,
notifyFrontend: boolean = true,
): Promise<void> => {
const payload = {
type: NodeWorkerMessageType.NODE_MICROSERVICE,
service: 'org-merge',
tenantId,
primaryOrgId,
secondaryOrgId,
notifyFrontend,
}
await sendNodeWorkerMessage(tenantId, payload as NodeWorkerMessageBase)
}
Loading

0 comments on commit bdca46b

Please # to comment.