Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"version": "0.2.0",
"configurations": [
{
"address": "127.0.0.1",
"localRoot": "${workspaceFolder}",
"name": "Debug",
"port": 9229,
"remoteRoot": "/app",
"request": "attach",
"restart": true,
"skipFiles": [
"<node_internals>/**"
],
"type": "pwa-node"
}
]
}
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
}
],
"editor.codeActionsOnSave": {
"source.fixAll.eslint": true
"source.fixAll.eslint": "explicit"
}
}
17 changes: 17 additions & 0 deletions docker-compose.override.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
version: "3"

services:
rabbitmq:
image: rabbitmq:3-management
environment:
RABBITMQ_DEFAULT_VHOST: test
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:15672"]
interval: 30s
timeout: 10s
retries: 5
ports:
- "15672:15672"
- "5672:5672"
volumes:
- ./docker/rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
14 changes: 14 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: '3'
services:
worker: &base
image: rabbit-base-image
build:
context: .
dockerfile: docker/dev/Dockerfile
volumes:
- ./:/app
# env_file: .env
ports:
- 9229:9229
depends_on:
- rabbitmq
5 changes: 5 additions & 0 deletions docker/dev/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM node:20-alpine

WORKDIR /app

CMD yarn test
1 change: 1 addition & 0 deletions docker/rabbitmq/enabled_plugins
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[rabbitmq_management,rabbitmq_prometheus,rabbitmq_shovel,rabbitmq_shovel_management].
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
"format": "prettier-eslint src/**/*.ts --write",
"lint": "eslint --ext .ts src",
"build": "tsc",
"prepare": "tsc"
"prepare": "tsc",
"test": "tsnd --inspect=0.0.0.0:9229 --respawn --no-notify --poll --no-deps --ignore-watch node_modules --transpile-only src/test.ts"

},
"devDependencies": {
"@eduzz/eslint-config-houston": "^1.0.14",
"@types/amqplib": "^0.10.1",
"@types/amqplib": "^0.10.4",
"@types/node": "^18.15.11",
"@types/winston": "^2.4.4",
"@typescript-eslint/eslint-plugin": "^5.58.0",
Expand Down
18 changes: 13 additions & 5 deletions src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class Queue {
}

public async listen<T = unknown>(callback: (data: T, message?: amqp.ConsumeMessage) => Promise<boolean>) {
if (this.options.retryTimeout > 0 && this.options.deadLetterAfter <= 0) {
if (this.hasTimeout() && !this.hasDeadLetter()) {
throw new Error('If you use retryTimeout, you need to specify a deadLetterAfter');
}

Expand Down Expand Up @@ -197,7 +197,7 @@ export class Queue {
}

private async configureDLQQueue(ch: amqp.Channel) {
if (this.options.deadLetterAfter < 1) {
if (!this.hasDeadLetter()) {
return;
}

Expand All @@ -219,7 +219,7 @@ export class Queue {

let args: Record<string, any> = {};

if (this.options.retryTimeout > 0) {
if (this.hasTimeout()) {
args = {
'x-dead-letter-exchange': exchange,
'x-dead-letter-routing-key': this.options.names.retryTopic,
Expand Down Expand Up @@ -255,13 +255,13 @@ export class Queue {
await ch.bindQueue(this.options.names.queueName, exchange, topic);
}

if (this.options.enableNack && this.options.retryTimeout) {
if (this.options.enableNack && this.hasTimeout()) {
await ch.bindQueue(this.options.names.queueName, exchange, this.options.names.retryTopic);
}
}

private async handleFailedMessage(channel: amqp.Channel, msg: amqp.ConsumeMessage) {
if (!msg.properties?.headers['x-death']) {
if (!this.hasDeadLetter() || !msg.properties?.headers['x-death']) {
channel.nack(msg, false, false);
return;
}
Expand All @@ -276,4 +276,12 @@ export class Queue {

channel.nack(msg, false, false);
}

private hasTimeout() {
return this.options.retryTimeout > 0;
}

private hasDeadLetter() {
return this.options.deadLetterAfter > 0;
}
}
61 changes: 27 additions & 34 deletions src/test.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,32 @@
import { Connection } from './Connection';
import { sleep } from './fn';

(async () => {
const connection = new Connection({
dsn: 'amqps://doehrbmi:1sfqvtXmdi8MCz0xOJ80r-6utLBjfj24@moose.rmq.cloudamqp.com/doehrbmi',
exchange: 'xpto',
connectionName: 'yay',
logLevel: 'debug',
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

const connection = new Connection({
dsn: 'amqp://guest:guest@rabbitmq/test',
exchange: 'test',
connectionName: 'test',
});

let count = 0;
connection
.queue('events')
.topic('event.sent')
.durable(true)
.prefetch(100)
.retryTimeout(1)
.deadLetterAfter(5000)
.listen(async (message) => {
count++;
console.log('count:', count);
throw new Error('error');
});

try {
await connection.connect();

await connection
.queue('adasdasdqewwq')
.topic('xpto')
.listen(async (payload) => {
console.log('RECEIVED', payload);
await sleep(1000);

return true;
});

await connection.delayQueue('myNiceDelayQueue').timeout(30000).from('xpto.from').to('xpto').create();

const publisher = connection.topic('xpto').persistent();

let id = 0;

setInterval(async () => {
await publisher.send({
payload: { x: ++id },
});
}, 1000);
} catch (err) {
console.log(err);
}
(async () => {
const publisher = connection.topic('event.sent').persistent();
console.log('sending message');
await sleep(3000);
await publisher.send({ payload: 'message' });
})();

console.log('started');