From c221fcd9127d0ed1a917b9ea40c083f3432c3272 Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 30 Apr 2025 00:57:50 +0800 Subject: [PATCH] Fix user properties lost when enable authorized --- .../broker/impl/PulsarMessageConverter.java | 5 +- .../mqtt/common/utils/MqttMessageUtils.java | 2 + ...horizationProxyReasonCodeOnAllAckTest.java | 57 +++++++++++++++++++ 3 files changed, 63 insertions(+), 1 deletion(-) diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/PulsarMessageConverter.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/PulsarMessageConverter.java index 5f22800e6..565c2bde9 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/PulsarMessageConverter.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/impl/PulsarMessageConverter.java @@ -16,6 +16,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.streamnative.pulsar.handlers.mqtt.common.Constants.MQTT_PROPERTIES; import static io.streamnative.pulsar.handlers.mqtt.common.Constants.MQTT_PROPERTIES_PREFIX; +import static io.streamnative.pulsar.handlers.mqtt.common.utils.MqttMessageUtils.AUTHENTICATE_ROLE_KEY; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; @@ -100,8 +101,10 @@ public static MessageImpl toPulsarMsg(MQTTServerConfiguration configurat metadata.setPartitionKey(pair.value); metadata.setPartitionKeyB64Encoded(false); } - metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId()) + pair.key) + if (!pair.key.equalsIgnoreCase(AUTHENTICATE_ROLE_KEY)) { + metadata.addProperty().setKey(getPropertiesPrefix(prop.propertyId()) + pair.key) .setValue(pair.value); + } }); } else if (MqttProperties.MqttPropertyType.RESPONSE_TOPIC.value() == prop.propertyId()) { MqttProperties.StringProperty property = (MqttProperties.StringProperty) prop; diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/MqttMessageUtils.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/MqttMessageUtils.java index 6b8a92e99..8211588ab 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/MqttMessageUtils.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/utils/MqttMessageUtils.java @@ -203,6 +203,7 @@ public static MqttPublishMessage createMqttPublishMessage(MqttPublishMessage pub String authData) { final MqttPublishVariableHeader header = publishMessage.variableHeader(); MqttProperties properties = new MqttProperties(); + header.properties().listAll().forEach(properties::add); properties.add(new MqttProperties.UserProperty(AUTHENTICATE_ROLE_KEY, authData)); MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader( header.topicName(), header.packetId(), properties); @@ -251,6 +252,7 @@ public static MqttSubscribeMessage createMqttSubscribeMessage(MqttSubscribeMessa String authData) { final MqttMessageIdAndPropertiesVariableHeader header = subscribeMessage.idAndPropertiesVariableHeader(); MqttProperties properties = new MqttProperties(); + header.properties().listAll().forEach(properties::add); properties.add(new MqttProperties.UserProperty(AUTHENTICATE_ROLE_KEY, authData)); MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader( header.messageId(), properties); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5AuthorizationProxyReasonCodeOnAllAckTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5AuthorizationProxyReasonCodeOnAllAckTest.java index fa07843af..b9aa89ddd 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5AuthorizationProxyReasonCodeOnAllAckTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5AuthorizationProxyReasonCodeOnAllAckTest.java @@ -16,6 +16,8 @@ import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperties; +import com.hivemq.client.mqtt.mqtt5.datatypes.Mqtt5UserProperty; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException; @@ -33,12 +35,14 @@ import java.util.HashSet; import java.util.Random; import java.util.Set; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.AuthAction; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.Test; +@Slf4j public class MQTT5AuthorizationProxyReasonCodeOnAllAckTest extends AuthorizationConfig { private final Random random = new Random(); @@ -200,5 +204,58 @@ public void testAuthenticationSuccess() { .send(); Assert.assertEquals(connAck.getReasonCode(), Mqtt5ConnAckReasonCode.SUCCESS); } + + @Test + public void testPublishWithUserPropertiesAndEnableAuthorization() throws Exception { + Set userActions = new HashSet<>(); + userActions.add(AuthAction.produce); + userActions.add(AuthAction.consume); + admin.namespaces().grantPermissionOnNamespace("public/default", "user1", userActions); + + + final String topic = "testPublishWithUserProperties"; + Mqtt5BlockingClient client1 = MQTT5ClientUtils.createMqtt5ProxyClient( + getMqttProxyPortList().get(random.nextInt(mqttProxyPortList.size()))); + Mqtt5UserProperties userProperty = Mqtt5UserProperties.builder() + .add("user-1", "value-1") + .add("user-2", "value-2") + .build(); + Mqtt5UserProperty userProperty1 = Mqtt5UserProperty.of("user-1", "value-1"); + Mqtt5UserProperty userProperty2 = Mqtt5UserProperty.of("user-2", "value-2"); + client1.connectWith() + .simpleAuth() + .username("user1") + .password("pass1".getBytes(StandardCharsets.UTF_8)) + .applySimpleAuth() + .send(); + Mqtt5Publish publishMessage = Mqtt5Publish.builder().topic(topic).qos(MqttQos.AT_LEAST_ONCE) + .userProperties(userProperty) + .asWill().build(); + + Mqtt5BlockingClient client2 = MQTT5ClientUtils.createMqtt5ProxyClient( + getMqttProxyPortList().get(random.nextInt(mqttProxyPortList.size()))); + client2.connectWith() + .simpleAuth() + .username("user1") + .password("pass1".getBytes(StandardCharsets.UTF_8)) + .applySimpleAuth() + .send(); + client2.subscribeWith() + .topicFilter(topic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + Mqtt5BlockingClient.Mqtt5Publishes publishes = client2.publishes(MqttGlobalPublishFilter.ALL); + client1.publish(publishMessage); + Mqtt5Publish message = publishes.receive(); + Assert.assertNotNull(message); + log.info("Received message properties: {}", message.getUserProperties()); + // Validate the user properties order, must be the same with set order. + Assert.assertEquals(message.getUserProperties().asList().get(0).compareTo(userProperty1), 0); + Assert.assertEquals(message.getUserProperties().asList().get(1).compareTo(userProperty2), 0); + publishes.close(); + client2.unsubscribeWith().topicFilter(topic).send(); + client1.disconnect(); + client2.disconnect(); + } }