Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Support for passing a function to terminateVisibilityTimeout #1

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Creates a new SQS consumer.
* `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10.
* `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
* `heartbeatInterval` - _Number_ - The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`.
* `terminateVisibilityTimeout` - _Boolean_ - If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`).
* `terminateVisibilityTimeout` - _Boolean_ or _Function_ - If a _Boolean_ and true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`). If a _Function_, the function receives the SQS message as its first argument, and should return the visibility timeout to apply for the message.
* `waitTimeSeconds` - _Number_ - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning.
* `authenticationErrorTimeout` - _Number_ - The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`).
* `pollingWaitTimeMs` - _Number_ - The duration (in milliseconds) to wait before repolling the queue (defaults to `0`).
Expand Down
27 changes: 18 additions & 9 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export interface ConsumerOptions {
waitTimeSeconds?: number;
authenticationErrorTimeout?: number;
pollingWaitTimeMs?: number;
terminateVisibilityTimeout?: boolean;
terminateVisibilityTimeout?: boolean | ((message: SQSMessage) => number);
heartbeatInterval?: number;
sqs?: SQS;
region?: string;
Expand Down Expand Up @@ -132,7 +132,7 @@ export class Consumer extends EventEmitter {
private waitTimeSeconds: number;
private authenticationErrorTimeout: number;
private pollingWaitTimeMs: number;
private terminateVisibilityTimeout: boolean;
private terminateVisibilityTimeout: boolean | ((message: SQSMessage) => number);
private heartbeatInterval: number;
private sqs: SQS;
private preReceiveMessageCallback?: () => Promise<void>;
Expand Down Expand Up @@ -225,7 +225,7 @@ export class Consumer extends EventEmitter {
try {
if (this.heartbeatInterval) {
heartbeat = this.startHeartbeat(async (elapsedSeconds) => {
return this.changeVisabilityTimeout(message, elapsedSeconds + this.visibilityTimeout);
return this.changeVisibilityTimeout(message, elapsedSeconds + this.visibilityTimeout);
});
}
await this.executeHandler(message);
Expand All @@ -235,7 +235,12 @@ export class Consumer extends EventEmitter {
this.emitError(err, message);

if (this.terminateVisibilityTimeout) {
await this.changeVisabilityTimeout(message, 0);
if (typeof this.terminateVisibilityTimeout === 'function') {
const visibilityTimeout = this.terminateVisibilityTimeout(message);
await this.changeVisibilityTimeout(message, visibilityTimeout);
} else {
await this.changeVisibilityTimeout(message, 0);
}
}
} finally {
clearInterval(heartbeat);
Expand Down Expand Up @@ -303,7 +308,7 @@ export class Consumer extends EventEmitter {
}
}

private async changeVisabilityTimeout(message: SQSMessage, timeout: number): Promise<PromiseResult<any, AWSError>> {
private async changeVisibilityTimeout(message: SQSMessage, timeout: number): Promise<PromiseResult<any, AWSError>> {
try {
return this.sqs
.changeMessageVisibility({
Expand Down Expand Up @@ -369,7 +374,7 @@ export class Consumer extends EventEmitter {
try {
if (this.heartbeatInterval) {
heartbeat = this.startHeartbeat(async (elapsedSeconds) => {
return this.changeVisabilityTimeoutBatch(messages, elapsedSeconds + this.visibilityTimeout);
return this.changeVisibilityTimeoutBatch(messages, () => elapsedSeconds + this.visibilityTimeout);
});
}
await this.executeBatchHandler(messages);
Expand All @@ -381,7 +386,11 @@ export class Consumer extends EventEmitter {
this.emit('error', err, messages);

if (this.terminateVisibilityTimeout) {
await this.changeVisabilityTimeoutBatch(messages, 0);
if (typeof this.terminateVisibilityTimeout === 'function') {
await this.changeVisibilityTimeoutBatch(messages, this.terminateVisibilityTimeout);
} else {
await this.changeVisibilityTimeoutBatch(messages, () => 0);
}
}
} finally {
clearInterval(heartbeat);
Expand Down Expand Up @@ -417,13 +426,13 @@ export class Consumer extends EventEmitter {
}
}

private async changeVisabilityTimeoutBatch(messages: SQSMessage[], timeout: number): Promise<PromiseResult<any, AWSError>> {
private async changeVisibilityTimeoutBatch(messages: SQSMessage[], getTimeout: (message: SQSMessage) => number): Promise<PromiseResult<any, AWSError>> {
const params = {
QueueUrl: this.queueUrl,
Entries: messages.map((message) => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout
VisibilityTimeout: getTimeout(message)
}))
};
try {
Expand Down
47 changes: 45 additions & 2 deletions test/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { assert } from 'chai';
import * as pEvent from 'p-event';

import * as sinon from 'sinon';
import { Consumer } from '../src/index';
import { Consumer, SQSMessage } from '../src/index';

const sandbox = sinon.createSandbox();

Expand Down Expand Up @@ -46,7 +46,10 @@ describe('Consumer', () => {
Messages: [{
ReceiptHandle: 'receipt-handle',
MessageId: '123',
Body: 'body'
Body: 'body',
Attributes: {

}
}]
};

Expand Down Expand Up @@ -577,6 +580,46 @@ describe('Consumer', () => {
});
});

it('terminate message visibility timeout with a function to calculate timeout on processing error', async () => {
const messageWithAttr = {
ReceiptHandle: 'receipt-handle-2',
MessageId: '1',
Body: 'body-2',
Attributes: {
ApproximateReceiveCount: 2
}
};

sqs.receiveMessage = stubResolve({
Messages: [messageWithAttr]
});

consumer = new Consumer({
queueUrl: 'some-queue-url',
attributeNames: ['ApproximateReceiveCount'],
region: 'some-region',
handleMessage,
sqs,
terminateVisibilityTimeout: (message: SQSMessage) => {
const receiveCount = Number.parseInt(message.Attributes?.ApproximateReceiveCount || '1') || 1;
// Add visibility timeout to (10 * receiveCount) seconds
return receiveCount * 10;
}
});

handleMessage.rejects(new Error('Processing error'));

consumer.start();
await pEvent(consumer, 'processing_error');
consumer.stop();

sandbox.assert.calledWith(sqs.changeMessageVisibility, {
QueueUrl: 'some-queue-url',
ReceiptHandle: 'receipt-handle-2',
VisibilityTimeout: 20
});
});

it('fires response_processed event for each batch', async () => {
sqs.receiveMessage = stubResolve({
Messages: [
Expand Down