-
Notifications
You must be signed in to change notification settings - Fork 45
Open
Labels
Description
Describe the bug
aop/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java
Lines 287 to 303 in 06bf657
| public void removeQueue(AmqpQueue queue) { | |
| queues.remove(queue); | |
| if (exchangeType == Type.Direct) { | |
| for (Map.Entry<String, Set<AmqpQueue>> entry : bindingKeyQueueMap.entrySet()) { | |
| bindingKeyQueueMap.computeIfPresent(entry.getKey(), (k, v) -> { | |
| v.remove(queue); | |
| if (v.isEmpty()) { | |
| return null; | |
| } | |
| return v; | |
| }); | |
| } | |
| } | |
| updateExchangeProperties(); | |
| deleteCursor(queue.getName()); | |
| } | |
in removeQueue method, we remove value from a map when iterating it, which cause potential ConcurrentModificationException.
To Reproduce
bind queue with Direct exchange, then remove the queue
Expected behavior
queue removed successfully
Screenshots
none
Additional context
none