diff --git a/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts b/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts index b831123098..9e08f9ce26 100644 --- a/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts +++ b/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts @@ -6,7 +6,6 @@ import { IntegrationStreamDataState, IntegrationStreamState, WebhookState, - WebhookType, } from '@crowd/types' import { IInsertableWebhookStream, @@ -31,26 +30,25 @@ export default class IntegrationStreamRepository extends RepositoryBase $(discourseType) - and iw.state = $(pendingState) - and iw."createdAt" < now() - interval '1 hour' - limit ${limit} - for update skip locked; + iw."createdAt" AS "createdAt", + i.platform AS "platform" + FROM "incomingWebhooks" iw + INNER JOIN integrations i ON iw."integrationId" = i.id + WHERE NOT EXISTS ( + SELECT 1 FROM integration.streams s WHERE iw.id = s."webhookId" + ) + AND iw.state = $(pendingState) + AND iw."createdAt" < NOW() - INTERVAL '1 hour' + LIMIT ${limit} + FOR UPDATE SKIP LOCKED; `, { - discourseType: WebhookType.DISCOURSE, pendingState: WebhookState.PENDING, }, )