Skip to content

Commit 600aa84

Browse files
committed
feat: improve campaign retries
1 parent febd0b4 commit 600aa84

File tree

2 files changed

+45
-15
lines changed

2 files changed

+45
-15
lines changed

apps/api/src/chat/entities/campaign-message.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ export class CampaignMessage extends BaseEntity {
1111
@prop({ required: true })
1212
readonly address: string
1313

14-
@prop({ required: true })
14+
@prop()
1515
messageId: string
1616
}

apps/api/src/chat/services/broadcast.consumer.ts

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { JobNonRetriableError } from '@app/common/errors/job-non-retriable-error'
2+
import { wait } from '@app/common/utils/async.utils'
23
import { XmtpLib } from '@app/definitions/integration-definitions/xmtp/xmtp.lib'
34
import { getWalletName } from '@app/definitions/utils/address.utils'
45
import { sendXmtpMessage } from '@chainjet/tools/dist/messages'
@@ -62,15 +63,6 @@ export class BroadcastConsumer {
6263
})
6364
const client = await XmtpLib.getClient(accountCredential.credentials.keys)
6465

65-
// if this is a retry, filter out contacts that have already been sent a message
66-
if (job.attemptsMade > 0) {
67-
const campaignMessages = await this.campaignMessageService.find({
68-
campaign: campaign._id,
69-
})
70-
const campaignMessageAddresses = campaignMessages.map((campaignMessage) => campaignMessage.address)
71-
contacts = contacts.filter((contact) => !campaignMessageAddresses.includes(contact.address))
72-
}
73-
7466
this.logger.log(`Sending campaign ${campaign._id} to ${contacts.length} contacts`)
7567

7668
if (campaign.state === CampaignState.Pending) {
@@ -87,9 +79,17 @@ export class BroadcastConsumer {
8779
)
8880
}
8981

90-
campaign.delivered = 0
82+
campaign.delivered = campaign.delivered ?? 0
9183
campaign.total = contacts.length
9284
const uniqueAddresses = new Set<string>()
85+
let failed = 0
86+
87+
// filter out contacts that have already been sent a message for this campaign
88+
const campaignMessages = await this.campaignMessageService.find({
89+
campaign: campaign._id,
90+
})
91+
const campaignMessageAddresses = campaignMessages.map((campaignMessage) => campaignMessage.address)
92+
contacts = contacts.filter((contact) => !campaignMessageAddresses.includes(contact.address))
9393

9494
const walletName = (await getWalletName(user.address)) ?? user.address
9595
const unsubscribeMessage = `To unsubscribe from these messages: https://unsubscribe.chainjet.io/${walletName}`
@@ -120,10 +120,21 @@ export class BroadcastConsumer {
120120
this.logger.log(
121121
`Sent broadcast message from ${user.address} to ${sendTo} (${campaign.processed}/${campaign.total})`,
122122
)
123-
} catch {}
124-
campaign.processed++
125-
job.progress(campaign.processed / campaign.total)
126-
123+
campaign.processed++
124+
job.progress(campaign.processed / campaign.total)
125+
} catch (e) {
126+
if (e.message.includes('is not on the XMTP network')) {
127+
await this.campaignMessageService.createOne({
128+
campaign: campaign._id,
129+
address: contact.address,
130+
})
131+
campaign.processed++
132+
job.progress(campaign.processed / campaign.total)
133+
} else {
134+
this.logger.error(`Failed to send broadcast message from ${user.address} to ${sendTo}: ${e.message}`)
135+
failed++
136+
}
137+
}
127138
// update the campaign status every 100 contacts
128139
if (campaign.processed > 0 && campaign.processed % 100 === 0) {
129140
await this.campaignService.updateOneNative(
@@ -141,6 +152,25 @@ export class BroadcastConsumer {
141152
}
142153
}
143154

155+
// if any messages failed to send with unexpected reasons, retry the job
156+
if (failed > 0) {
157+
await this.campaignService.updateOneNative(
158+
{
159+
_id: campaign._id,
160+
},
161+
{
162+
$set: {
163+
delivered: campaign.delivered,
164+
processed: campaign.processed,
165+
total: campaign.total,
166+
},
167+
},
168+
)
169+
this.logger.error(`Failed to send ${failed}/${campaign.total} messages for campaign ${campaign._id}. Retrying...`)
170+
await wait(10000)
171+
return await this.send(job)
172+
}
173+
144174
await this.campaignService.updateOneNative(
145175
{
146176
_id: campaign._id,

0 commit comments

Comments
 (0)