[Feature Request] Introduce common config for consumers and producers #279

@mszmidt

Describe the bug
TBMQ cannot connect to Kafka through a SASL_SSL listener. There is no clear statement on whether Kafka with only a SASL_SSL listener is supported, and there are no configuration examples for the TBMQ MQTT broker.

Your Server Environment

  • Deployment: 3-node cluster; Kafka, Redis, PostgreSQL; rootless Podman with quadlet files; slirp4netns rootless network implementation; host network mode; Kafka installed from the Confluent community repo, not in a container
  • TBMQ version: 2.2.0
  • OS name and version: AlmaLinux 9

Your Client Environment

N/A

To Reproduce

  • Prepare three AlmaLinux 9 hosts (any other EL distribution works too; release 10.x is also fine)
  • Install Kafka from the Confluent repo
  • Configure Kafka in KRaft mode: run the controller as service 1 on port x with mTLS, and a separate broker as service 2 on port y
  • Communication inside Kafka is then: controller <-> broker = mTLS, controller <-> controller = mTLS, client <-> broker = SASL_SSL, broker <-> broker = SASL_SSL
  • Install podman and slirp4netns on AlmaLinux
  • Configure podman to use the slirp4netns network implementation (or fix pasta to allow connections to the host's "external localhost")
  • Pull the tbmq, postgres, and redis containers
  • Prepare quadlets and environment files for the containers above, pointing TBMQ at Kafka on node{1,2,3}, port y (the broker port, not the KRaft controller port)
  • The important configuration parts are shown below

Expected behavior
A single parameter to set up TLS with SCRAM authentication for TBMQ <-> Kafka communication.

Actual behavior
Despite setting all of the following variables:

TB_KAFKA_MSG_ALL_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_MSG_ALL_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_DEVICE_PERSISTED_MSG_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_DEVICE_PERSISTED_MSG_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_RETAINED_MSG_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_RETAINED_MSG_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_CLIENT_SESSION_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_CLIENT_SESSION_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_CLIENT_SUBSCRIPTIONS_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_CLIENT_SUBSCRIPTIONS_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_CLIENT_SESSION_EVENT_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_CLIENT_SESSION_EVENT_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_CLIENT_SESSION_EVENT_RESPONSE_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_CLIENT_SESSION_EVENT_RESPONSE_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_DISCONNECT_CLIENT_COMMAND_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_DISCONNECT_CLIENT_COMMAND_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_BASIC_DOWNLINK_MSG_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_BASIC_DOWNLINK_MSG_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_PERSISTED_DOWNLINK_MSG_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_PERSISTED_DOWNLINK_MSG_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_APPLICATION_REMOVED_EVENT_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_APPLICATION_REMOVED_EVENT_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_HISTORICAL_DATA_TOTAL_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_HISTORICAL_DATA_TOTAL_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_INTERNODE_NOTIFICATIONS_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_INTERNODE_NOTIFICATIONS_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_BLOCKED_CLIENT_ADDITIONAL_CONSUMER_CONFIG
TB_KAFKA_BLOCKED_CLIENT_ADDITIONAL_PRODUCER_CONFIG

TB_KAFKA_ADMIN_CONFIG

all set to:

security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule required username="mykafkauser" password="censored1";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censored2

I also tried swapping the positions of security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512 and sasl.jaas.config:org.apache.kafka.common.security.scram..., along with other variants, with no luck, for example:

security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule\x20required\x20username\x3d\x22internode\x22\x20password\x3d\x22censoredpass1\x22\x3b;ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:'org.apache.kafka.common.security.scram.ScramLoginModule required username="internode" password="censoredpass1";';ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:"org.apache.kafka.common.security.scram.ScramLoginModule required username=internode password=censoredpass1";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:"org.apache.kafka.common.security.scram.ScramLoginModule required username=internode password=censoredpass1;";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule required username=internode password=censoredpass1;ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule required "username=\"internode\"" "password=\"censoredpass1\"";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule required username=\"internode\" password=\"censoredpass1\";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2
security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512;sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule required username="internode" password="censoredpass1";ssl.truststore.type:PKCS12;ssl.truststore.location:/certs/nca.p12;ssl.truststore.password:censoredpass2;ssl.endpoint.identification.algorithm:https
-Djava.security.auth.login.config=/conf/kafka_client_jaas.conf;security.protocol:SASL_SSL;sasl.mechanism:SCRAM-SHA-512

Every attempt fails with java.lang.IllegalArgumentException, with one of two messages: "JAAS config entry not terminated by semi-colon" or "Login module control flag not specified in JAAS config".
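
One possible explanation, offered as an assumption and not as a confirmed reading of the TBMQ source: if the `*_ADDITIONAL_*_CONFIG` value is split into pairs on `;` and each pair into key and value on the first `:`, then the `;` that terminates the JAAS entry is consumed as a pair separator and never reaches the Kafka client. The hypothetical `parse_additional_config` below is a minimal sketch of that behaviour; it is not TBMQ's actual parser.

```python
def parse_additional_config(raw: str) -> dict[str, str]:
    """Hypothetical parser: split pairs on ';', then key/value on the first ':'."""
    props = {}
    for pair in raw.split(";"):
        if not pair.strip():
            continue  # skip empty segments, e.g. after a trailing ';'
        key, _, value = pair.partition(":")
        props[key.strip()] = value.strip()
    return props


# Same shape as the values tried in this report.
raw = (
    "security.protocol:SASL_SSL;"
    "sasl.mechanism:SCRAM-SHA-512;"
    "sasl.jaas.config:org.apache.kafka.common.security.scram.ScramLoginModule "
    'required username="internode" password="censoredpass1";'
    "ssl.truststore.type:PKCS12"
)

props = parse_additional_config(raw)
jaas = props["sasl.jaas.config"]
print(repr(jaas))
print("terminated by ';':", jaas.endswith(";"))  # prints: terminated by ';': False
```

Under this assumption no amount of quoting in the environment file would help, because the terminator is removed before the Kafka client ever sees the value.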

Example stack trace:

Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.thingsboard.mqtt
        at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:223)
        at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantia
        at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.jav
        ... 84 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:561)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:512)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:137)
        at org.thingsboard.mqtt.broker.queue.kafka.TbKafkaAdmin.<init>(TbKafkaAdmin.java:92)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccesso
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructo
        at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
        at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:210)
        ... 86 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Failed to create new NetworkClient
        at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:255)
        at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:190)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:545)
        ... 95 common frames omitted
Caused by: java.lang.IllegalArgumentException: JAAS config entry not terminated by semi-colon
        at org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:121)
        at org.apache.kafka.common.security.JaasConfig.<init>(JaasConfig.java:63)
        at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:95)
        at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:89)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:168)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:82)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:120)
        at org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:224)
        ... 97 common frames omitted

I can confirm that the username, password, and network are working, because I successfully configured AKHQ as a test. Its config simply looks like this:

      properties:
        bootstrap.servers: "..."
        security.protocol: SASL_SSL
        sasl.mechanism: SCRAM-SHA-512
        sasl.jaas.config: >
          org.apache.kafka.common.security.scram.ScramLoginModule required
          username="internode" password="censoredpass1";
#        ssl.endpoint.identification.algorithm: ""
        ssl.truststore.type: PKCS12
        ssl.truststore.location: /opt/certs/nca.p12
        ssl.truststore.password: censoredpass2
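
The AKHQ value works with the Kafka client because the folded YAML scalar yields one line that contains a control flag (`required`) and ends with `;`. As a rough pre-flight check for the two conditions named in the error messages, a sketch like the following could be run against a candidate value. `check_jaas_entry` is a hypothetical helper and a deliberate simplification; it does not reproduce Kafka's own JAAS parser.

```python
# Control flags accepted for a JAAS login module entry.
CONTROL_FLAGS = {"required", "requisite", "sufficient", "optional"}


def check_jaas_entry(value: str) -> list[str]:
    """Return a list of problems found in a single-module JAAS entry."""
    problems = []
    stripped = value.strip()  # a folded YAML scalar ends with a newline
    if not stripped.endswith(";"):
        problems.append("entry is not terminated by a semicolon")
    # Expected layout: <LoginModule class> <control flag> <options...>;
    tokens = stripped.rstrip(";").split()
    if len(tokens) < 2 or tokens[1].lower() not in CONTROL_FLAGS:
        problems.append("login module control flag is missing")
    return problems


akhq_value = (
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
    'username="internode" password="censoredpass1";\n'
)
print(check_jaas_entry(akhq_value))               # prints: []
print(check_jaas_entry(akhq_value.strip()[:-1]))  # same value without the ';'
```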

Screenshots
N/A

Additional context
Although a quadlet deployment that uses EnvironmentFile= in the [Container] section of a Podman systemd unit may not be the most common scenario, I believe there may be an error in how the environment variable is parsed into the Spring YAML configuration.

A possible fix would be separate, global environment variables such as:

TB_KAFKA_SECURITY_PROTOCOL=SASL_SSL
TB_KAFKA_SASL_MECHANISM=SCRAM-SHA-512
TB_KAFKA_SASL_JAAS_CONFIG=... #(auto append ; in the end or smart parsing)
...
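
To illustrate the proposal, here is a minimal sketch of how such global variables could be mapped to Kafka client properties, including the "auto append ;" idea. The variable names are the ones proposed above and do not exist in TBMQ today; `kafka_security_props` is a hypothetical helper, not part of any TBMQ or Kafka API.

```python
# Proposed (not yet existing) environment variables -> Kafka client property names.
ENV_TO_PROPERTY = {
    "TB_KAFKA_SECURITY_PROTOCOL": "security.protocol",
    "TB_KAFKA_SASL_MECHANISM": "sasl.mechanism",
    "TB_KAFKA_SASL_JAAS_CONFIG": "sasl.jaas.config",
}


def kafka_security_props(env: dict[str, str]) -> dict[str, str]:
    """Build common client properties from the proposed global variables."""
    props = {
        prop: env[name]
        for name, prop in ENV_TO_PROPERTY.items()
        if env.get(name)  # ignore unset or empty variables
    }
    jaas = props.get("sasl.jaas.config")
    if jaas is not None and not jaas.rstrip().endswith(";"):
        # Auto-append the terminator that the JAAS entry requires.
        props["sasl.jaas.config"] = jaas.rstrip() + ";"
    return props


example_env = {
    "TB_KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
    "TB_KAFKA_SASL_MECHANISM": "SCRAM-SHA-512",
    "TB_KAFKA_SASL_JAAS_CONFIG": (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        'username="internode" password="censoredpass1"'
    ),
}
common = kafka_security_props(example_env)
print(common["sasl.jaas.config"])
```

Because each value arrives in its own variable, nothing has to be split on `;`, and the same properties could be applied to every consumer, producer, and the admin client.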

Metadata

Labels

Enhancement: New feature or request
