Skip to content

Commit b9f2392

Browse files
committed
feat: improve campaign retries
1 parent febd0b4 commit b9f2392

File tree

2 files changed

+48
-17
lines changed

2 files changed

+48
-17
lines changed

apps/api/src/chat/entities/campaign-message.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ export class CampaignMessage extends BaseEntity {
1111
@prop({ required: true })
1212
readonly address: string
1313

14-
@prop({ required: true })
14+
@prop()
1515
messageId: string
1616
}

apps/api/src/chat/services/broadcast.consumer.ts

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { JobNonRetriableError } from '@app/common/errors/job-non-retriable-error'
2+
import { wait } from '@app/common/utils/async.utils'
23
import { XmtpLib } from '@app/definitions/integration-definitions/xmtp/xmtp.lib'
34
import { getWalletName } from '@app/definitions/utils/address.utils'
45
import { sendXmtpMessage } from '@chainjet/tools/dist/messages'
@@ -62,16 +63,13 @@ export class BroadcastConsumer {
6263
})
6364
const client = await XmtpLib.getClient(accountCredential.credentials.keys)
6465

65-
// if this is a retry, filter out contacts that have already been sent a message
66-
if (job.attemptsMade > 0) {
67-
const campaignMessages = await this.campaignMessageService.find({
68-
campaign: campaign._id,
69-
})
70-
const campaignMessageAddresses = campaignMessages.map((campaignMessage) => campaignMessage.address)
71-
contacts = contacts.filter((contact) => !campaignMessageAddresses.includes(contact.address))
72-
}
73-
74-
this.logger.log(`Sending campaign ${campaign._id} to ${contacts.length} contacts`)
66+
// filter out contacts that have already been sent a message for this campaign
67+
const campaignMessages = await this.campaignMessageService.find({
68+
campaign: campaign._id,
69+
})
70+
const campaignMessageAddresses = campaignMessages.map((campaignMessage) => campaignMessage.address)
71+
const totalContacts = contacts.length
72+
contacts = contacts.filter((contact) => !campaignMessageAddresses.includes(contact.address))
7573

7674
if (campaign.state === CampaignState.Pending) {
7775
campaign.state = CampaignState.Running
@@ -87,13 +85,16 @@ export class BroadcastConsumer {
8785
)
8886
}
8987

90-
campaign.delivered = 0
91-
campaign.total = contacts.length
88+
campaign.delivered = campaignMessages.length
89+
campaign.total = totalContacts
9290
const uniqueAddresses = new Set<string>()
91+
let failed = 0
9392

9493
const walletName = (await getWalletName(user.address)) ?? user.address
9594
const unsubscribeMessage = `To unsubscribe from these messages: https://unsubscribe.chainjet.io/${walletName}`
9695

96+
this.logger.log(`Sending campaign ${campaign._id} to ${contacts.length} contacts`)
97+
9798
for (const contact of contacts) {
9899
const sendTo = contact.notificationAddress ?? contact.address
99100
if (uniqueAddresses.has(sendTo)) {
@@ -120,10 +121,21 @@ export class BroadcastConsumer {
120121
this.logger.log(
121122
`Sent broadcast message from ${user.address} to ${sendTo} (${campaign.processed}/${campaign.total})`,
122123
)
123-
} catch {}
124-
campaign.processed++
125-
job.progress(campaign.processed / campaign.total)
126-
124+
campaign.processed++
125+
job.progress(campaign.processed / campaign.total)
126+
} catch (e) {
127+
if (e.message.includes('is not on the XMTP network')) {
128+
await this.campaignMessageService.createOne({
129+
campaign: campaign._id,
130+
address: contact.address,
131+
})
132+
campaign.processed++
133+
job.progress(campaign.processed / campaign.total)
134+
} else {
135+
this.logger.error(`Failed to send broadcast message from ${user.address} to ${sendTo}: ${e.message}`)
136+
failed++
137+
}
138+
}
127139
// update the campaign status every 100 contacts
128140
if (campaign.processed > 0 && campaign.processed % 100 === 0) {
129141
await this.campaignService.updateOneNative(
@@ -141,6 +153,25 @@ export class BroadcastConsumer {
141153
}
142154
}
143155

156+
// if any messages failed to send with unexpected reasons, retry the job
157+
if (failed > 0) {
158+
await this.campaignService.updateOneNative(
159+
{
160+
_id: campaign._id,
161+
},
162+
{
163+
$set: {
164+
delivered: campaign.delivered,
165+
processed: campaign.processed,
166+
total: campaign.total,
167+
},
168+
},
169+
)
170+
this.logger.error(`Failed to send ${failed}/${campaign.total} messages for campaign ${campaign._id}. Retrying...`)
171+
await wait(10000)
172+
return await this.send(job)
173+
}
174+
144175
await this.campaignService.updateOneNative(
145176
{
146177
_id: campaign._id,

0 commit comments

Comments
 (0)