-
Notifications
You must be signed in to change notification settings - Fork 72
Description
Describe the bug
Can't use kafka with sasl_ssl listener. No clear statement whether kafka with only sasl_ssl listener is supported, no configuration examples for tbmq mqtt broker
Your Server Environment
- Deployment: 3node cluster, kafka, redis, postgres, podman rootless, quadlet file, slirp4netns rootless net implementation, net mode host, kafka from confluent community repo not in container
- TBMQ version: 2.2.0
- OS name and version: AlmaLinux 9
Your Client Environment
N/A
To Reproduce
- Prepare AlmaLinux 9 (or any other el distro tbh, rel 10.x will be also ok) x3
- Install kafka from confluent repo
- Configure kafka to use kraft on controller as service 1 on other port x with mtls, configure separate broker on port y as service 2
- So communication inside kafka is like:
controller <-> broker = mtls,controller <-> controller = mtls,client <-> broker = sasl_ssl,broker <-> broker = sasl_ssl - Install
podmanandslirp4netnson AlmaLinux - Configure podman to use slirp4netns network implementation (or fix pasta to allow connection to host "external localhost")
- Pull tbmq, postgres, redis container
- Prepare quadlets and environmentfiles for ^, so set tbmq to use kafka on node{1,2,3}, on port y (not krafts one)
- important configuration parts below
Expected behavior
Have one parameter to setup tls scram kafka for tbmq <-> kafka communication
Actual behavior
Despite vars set:
TB_KAFKA_MSG_ALL_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_MSG_ALL_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_DEVICE_PERSISTED_MSG_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_DEVICE_PERSISTED_MSG_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_RETAINED_MSG_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_RETAINED_MSG_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_CLIENT_SESSION_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_CLIENT_SESSION_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_CLIENT_SUBSCRIPTIONS_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_CLIENT_SUBSCRIPTIONS_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_CLIENT_SESSION_EVENT_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_CLIENT_SESSION_EVENT_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_CLIENT_SESSION_EVENT_RESPONSE_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_CLIENT_SESSION_EVENT_RESPONSE_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_DISCONNECT_CLIENT_COMMAND_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_DISCONNECT_CLIENT_COMMAND_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_BASIC_DOWNLINK_MSG_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_BASIC_DOWNLINK_MSG_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_PERSISTED_DOWNLINK_MSG_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_PERSISTED_DOWNLINK_MSG_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_APPLICATION_REMOVED_EVENT_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_APPLICATION_REMOVED_EVENT_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_HISTORICAL_DATA_TOTAL_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_HISTORICAL_DATA_TOTAL_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_INTERNODE_NOTIFICATIONS_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_INTERNODE_NOTIFICATIONS_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_BLOCKED_CLIENT_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_BLOCKED_CLIENT_ADDITIONAL_PRODUCER_CONFIG
TB_KAFKA_ADMIN_CONFIG
to security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule required username="mykafkauser" password="censored1";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censored2
tried also SASL_SSL;sasl.mechanism:SCRAM-SHA-512 swapping places with sasl.jaas.config:org.apache.kafka.common.security.scram... and other variants with no luck
like
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule\x20required\x20username\x3d\x22internode\x22\x20password\x3d\x22censoredpass1\x22\x3b;ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:'org.apache.kafka.common.security.scram.ScramLoginModule required username="internode" password="censoredpass1";';ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:"org.apache.kafka.common.security.scram.ScramLoginModule required username=internode password=censoredpass1";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:"org.apache.kafka.common.security.scram.ScramLoginModule required username=internode password=censoredpass1;";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule required username=internode password=censoredpass1;ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule required "username=\"internode\"" "password=\"censoredpass1\"";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule required username=\"internode\" password=\"censoredpass1\";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule required username="internode" password="censoredpass1";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2;ssl.endpoint.identification.algorithm:https
-Djava.security.auth.login.config=/conf/kafka_client_jaas.conf;security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512
Every attempt results in error: java.lang.IllegalArgumentException: JAAS config entry not terminated by semi-colon or Login module control flag not specified in JAAS config
example stacktrace below
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.thingsboard.mqtt
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:223)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantia
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.jav
... 84 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:561)
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:512)
at org.apache.kafka.clients.admin.Admin.create(Admin.java:137)
at org.thingsboard.mqtt.broker.queue.kafka.TbKafkaAdmin.<init>(TbKafkaAdmin.java:92)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccesso
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructo
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:210)
... 86 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Failed to create new NetworkClient
at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:255)
at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:190)
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:545)
... 95 common frames omitted
Caused by: java.lang.IllegalArgumentException: JAAS config entry not terminated by semi-colon
at org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:121)
at org.apache.kafka.common.security.JaasConfig.<init>(JaasConfig.java:63)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:95)
at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:89)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:168)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:82)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:120)
at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:224)
... 97 common frames omittedI can confirm user and pass and net are working because i had successfully configured akhq for test, where config looks simply like that:
properties:
bootstrap.servers: "..."
security.protocol: SASL_SSL
sasl.mechanism: SCRAM-SHA-512
sasl.jaas.config: >
org.apache.kafka.common.security.scram.ScramLoginModule required
username="internode" password="censoredpass1";
# ssl.endpoint.identification.algorithm: ""
ssl.truststore.type: PKCS12
ssl.truststore.location: /opt/certs/nca.p12
ssl.truststore.password: censoredpass2Screenshots
N/A
Additional context
although this may not be the most common scenario with quadlet file deployment using EnvironmentFile= in [Container] Scope of podman systemd unit, I believe there might be some error with parsing env variable into spring yaml file.
Possible fix would be probably some separate and global envs like:
TB_KAFKA_SECURITY_PROTOCOL=SASL_SSL
TB_KAFKA_SASL_MECHANISM=SCRAM-SHA-512
TB_KAFKA_SASL_JAAS_CONFIG=... #(auto append ; in the end or smart parsing)
...