Skip to content

Commit 3058133

Browse files
author
Daniel Bustamante Ospina
committed
Fix controlled retries for events (fix message duplication on retry strategy for events)
1 parent 5f38a5e commit 3058133

File tree

3 files changed

+16
-11
lines changed

3 files changed

+16
-11
lines changed

async/async-commons-starter/src/main/java/org/reactivecommons/async/impl/config/EventListenersConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public ApplicationEventListener eventListener(HandlerResolver resolver, MessageC
3030
final ApplicationEventListener listener = new ApplicationEventListener(receiver,
3131
appName + ".subsEvents", resolver, asyncProps.getDomain().getEvents().getExchange(),
3232
messageConverter, asyncProps.getWithDLQRetry(), asyncProps.getMaxRetries(), asyncProps.getRetryDelay(),asyncProps.getDomain().getEvents().getMaxLengthBytes(),
33-
discardNotifier, errorReporter);
33+
discardNotifier, errorReporter, appName);
3434

3535
listener.startListener();
3636

async/async-commons/src/main/java/org/reactivecommons/async/impl/listeners/ApplicationEventListener.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,10 @@
2020
import reactor.rabbitmq.BindingSpecification;
2121
import reactor.rabbitmq.ExchangeSpecification;
2222

23-
import java.util.Objects;
2423
import java.util.Optional;
25-
import java.util.Set;
2624
import java.util.function.Function;
27-
import java.util.stream.Collectors;
2825

26+
import static java.lang.String.format;
2927
import static reactor.core.publisher.Flux.fromIterable;
3028

3129
@Log
@@ -38,6 +36,7 @@ public class ApplicationEventListener extends GenericMessageListener {
3836
private final int retryDelay;
3937
private final Optional<Integer> maxLengthBytes;
4038
private final Matcher keyMatcher;
39+
private final String appName;
4140

4241

4342
public ApplicationEventListener(ReactiveMessageListener receiver,
@@ -49,7 +48,8 @@ public ApplicationEventListener(ReactiveMessageListener receiver,
4948
long maxRetries, int retryDelay,
5049
Optional<Integer> maxLengthBytes,
5150
DiscardNotifier discardNotifier,
52-
CustomErrorReporter errorReporter) {
51+
CustomErrorReporter errorReporter,
52+
String appName) {
5353
super(queueName, receiver, withDLQRetry, maxRetries, discardNotifier, "event", errorReporter);
5454
this.retryDelay = retryDelay;
5555
this.withDLQRetry = withDLQRetry;
@@ -58,17 +58,22 @@ public ApplicationEventListener(ReactiveMessageListener receiver,
5858
this.messageConverter = messageConverter;
5959
this.maxLengthBytes = maxLengthBytes;
6060
this.keyMatcher = new KeyMatcher();
61+
this.appName = appName;
6162
}
6263

6364
protected Mono<Void> setUpBindings(TopologyCreator creator) {
6465
if (withDLQRetry) {
66+
final String eventsDLQExchangeName = format("%s.%s.DLQ", appName, eventsExchange);
67+
final String retryExchangeName = format("%s.%s", appName, eventsExchange);
6568
final Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange(eventsExchange).durable(true).type("topic"));
66-
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(eventsExchange + ".DLQ").durable(true).type("topic"));
67-
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, eventsExchange, retryDelay, maxLengthBytes);
68-
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, eventsExchange + ".DLQ", maxLengthBytes);
69+
final Mono<AMQP.Exchange.DeclareOk> retryExchange = creator.declare(ExchangeSpecification.exchange(retryExchangeName).durable(true).type("topic"));
70+
final Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange(eventsDLQExchangeName).durable(true).type("topic"));
71+
final Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(queueName, retryExchangeName, retryDelay, maxLengthBytes);
72+
final Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(queueName, eventsDLQExchangeName, maxLengthBytes);
6973
final Flux<AMQP.Queue.BindOk> bindings = fromIterable(resolver.getEventListeners()).flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange, listener.getPath(), queueName)));
70-
final Flux<AMQP.Queue.BindOk> bindingDLQ = fromIterable(resolver.getEventListeners()).flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange + ".DLQ", listener.getPath(), queueName + ".DLQ")));
71-
return declareExchange.then(declareExchangeDLQ).then(declareQueue).then(declareDLQ).thenMany(bindings).thenMany(bindingDLQ).then();
74+
final Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding(eventsDLQExchangeName, "#", queueName + ".DLQ"));
75+
final Mono<AMQP.Queue.BindOk> retryBinding = creator.bind(BindingSpecification.binding(retryExchangeName, "#", queueName));
76+
return declareExchange.then(retryExchange).then(declareExchangeDLQ).then(declareQueue).then(declareDLQ).thenMany(bindings).then(bindingDLQ).then(retryBinding).then();
7277
} else {
7378
final Flux<AMQP.Queue.BindOk> bindings = fromIterable(resolver.getEventListeners())
7479
.flatMap(listener -> creator.bind(BindingSpecification.binding(eventsExchange, listener.getPath(), queueName)));

async/async-commons/src/test/java/org/reactivecommons/async/impl/listeners/ApplicationEventListenerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ protected GenericMessageListener createMessageListener(HandlerResolver handlerRe
4444

4545
class StubGenericMessageListener extends ApplicationEventListener {
4646
public StubGenericMessageListener(HandlerResolver handlerResolver) {
47-
super(reactiveMessageListener, "queueName", handlerResolver, "", messageConverter, true, 10, 10, Optional.empty(), discardNotifier, errorReporter);
47+
super(reactiveMessageListener, "queueName", handlerResolver, "", messageConverter, true, 10, 10, Optional.empty(), discardNotifier, errorReporter, "");
4848
}
4949

5050
@Override

0 commit comments

Comments
 (0)