由于 AutoMQ [1] 对 Kafka [2] 的全面兼容,所以对于 SASL 安全认证配置 AutoMQ 与 Kafka 的实现是相同的,通过本文你可以学会如何通过配置 SASL 安全的使用 AutoMQ。
前置知识
每个服务器都必须定义一组监听器,用于接收来自客户端和其他服务器的请求,所有服务都是通过监听器暴露在外的,本篇关于如何构建安全的 Kafka 环境也是围绕监听器展开叙述,所以在学习后边的内容之前,需要先对监听器有个大概的了解。
基础概念
监听器的配置
我们可以对每个监听器进行配置,以便使用各种机制对客户端进行身份验证,并确保对服务器和客户端之间的流量进行加密。
Kafka 服务器支持监听多个端口上的连接。这是通过服务器配置中的 listeners 属性来配置的, 该属性接受一个以逗号分隔的监听器列表 。每个服务器上 必须至少定义一个监听器 。listeners 中定义的每个监听器的格式如下:
{ LISTENER_NAME } ://{hostname}:{port}
举例
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093 # 注意此处的 hostname 是可省的,hostname缺省代表绑定 0.0.0.0 即所有接口 # 此处的 LISTENER_NAME 就为 SASL_PLAINTEXT、CONTROLLER 两个
LISTENER_NAME 通常是一个描述性名称 ,用于定义监听器的用途。例如,许多配置使用单独的监听器来处理客户端流量,因此它们可能会在配置中将相应的监听器称为 CLIENT :
listeners=CLIENT://localhost:9092
这里之所以说 LISTENER_NAME 通常是一个描述性名称的原因,是因为我们 可以直接将 LISTENER_NAME 指定为协议名称,从而跳过协议配置的步骤。
使用上面的示例,我们可以使用以下定义跳过 CLIENT监听器的定义:
listeners=PLAINTEXT://localhost:9092
这个例子指明监听 localhost:9092 的监听器使用的是 PLAINTEXT 协议。
不过 Kafka 官网并不推荐使用这种命名方式:“we recommend users to provide explicit names for the listeners since it makes the intended usage of each listener clearer.”
如果你使用 别名命名监听器 ,那么这些监听器的 安全协议需要在单独的配置中定义 :
listener.security.protocol.map
该值是一个以逗号分隔的列表,其中列出了每个监听器与其安全协议的映射关系。例如,以下值配置指定 CLIENT 监听器使用SSL,而 BROKER 监听器使用 PLAINTEXT(明文)。
listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT
Kafka 安全协议的可选项( 不区分大小写 )如下:
-
PLAINTEXT
-
SSL
-
SASL_PLAINTEXT
-
SASL_SSL
PLAINTEXT 协议不提供安全性,且除了明文协议外,其他的协议都需要额外进行配置(这里的额外配置指的不是上面的 map 映射配置,而是协议本身需要的额外配置,注意甄别)。
配置项
这里要对 Listener 相关配置项做详细解释,方便读者理解后面内容。
在解释这个配置项前,我们先明确几个概念。
-
VPC 指基于云计算平台的虚拟私人网络,同一 VPC 下的 Broker 可以通过私网 IP 进行相互通信。
-
同一 VPC 下部署的不同服务器之间可以同时通过公网 IP 及私网 IP 通信。
-
对于某一 VPC 来说,在其下启动的 Producer 或者 Consumer 称为 Internal Client,否则称为 External Client,下图是以 VPC 1 作为主视角。
-
VPC 中的 Broker 具有内网 IP,也可以同时具有公网 IP。

常见的Kafka集群
上面的是一个很常见的 Kafka 集群场景,而图中那些通信虚线箭头就是是通过 Kafka 中不同的 Listener 建立的,这些 Listener 依旧以 VPC 1 做为主视角分为 Internal Listener 和 External Listener。作用如下所示:

那么这些 Listener 的创建以及内外部如何通信都是由 Lisener 的配置项决定的,下面就开始详细介绍这些配置项。
- liseners
作用:用于指定 Kafka broker 监听 TCP 连接的地址和端口。
在上图可代表红蓝所有的 Listener。
配置格式如下:
liseners={listenreName}:{hostname}//{port},{listenreName2}:{hostname2}//{port2}
例如
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093 # hostname缺省代表绑定 0.0.0.0 即所有接口
它有这几个特点
-
可以同时配置多个, 并且用逗号隔开。
-
监听器的名称和端口 必须都是唯一的 ,不能有两个名称相同的监听器,即使它们的端口不同。
-
如果 hostname 为空,例如(listeners = ://:port),代表绑定 0.0.0.0 即所有接口
-
将 hostname 设置为 0.0.0.0 则会绑定所有的接口,也就是所有接口的请求都会被接受处理,但是注意,当设置为 0.0.0.0 的时候 advertised.listeners 必须要设置,详细原因下面会说。
-
listenerName 是监听器的名称,是一个唯一值,它并不是安全协议,只是说默认配置中已经做了安全协议名称对应的映射,默认映射如下:
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
关于 hostname 缺省绑定的接口,网上很多教程,包括官网对此的描述都是会绑定默认接口,中文网站教程很多都认为默认接口它绑定的是 localhost,实际在源码中缺省后绑定的是 0.0.0.0,监听的是所有接口。




- adveratised.liseners
listeners是用来监听网络请求的,那么其他 Broker 或者客户端想要与之通信是需要具体的 IP:PORT 的,这个属性就是来设置这个值的。
IaaS 环境中,这可能需要与 Broker 绑定的接口不同。如果未设置,将使用 listeners 的值。与 listeners 不同,公布 0.0.0.0 地址是无效的。此外,与 listeners 不同的是,该属性中可以有重复的端口,因此可以将其中一个监听器配置为另一个监听器的地址。这在某些使用外部负载平衡器的情况下非常有用。
它的配置格式如下:
advertised.listeners={listenreName}:{hostname}//{port},{listenreName2}:{hostname2}//{port2}
它有如下特点
默认情况下,不设置advertised.listeners会自动使用listeners属性。
不支持设置为 0.0.0.0 的形式,源码中对此也有要求,如果listeners 设置为 0.0.0.0 则必须设置advertised.listeners 属性,因为其他的 Broker 和 Client 需要知道你的具体 IP + 端口。

可以同时配置多个,并用逗号隔开。
- listener.security.protocol.map
监听器名称和安全协议之间的映射关系集合,如果监听器名称不是安全协议,则必须设置 listener.security.protocol.map 。
配置格式如下:
listener.security.protocol.map={监听器名称}:{安全协议名称},{监听器名称2}:{安全协议名称},{监听器名称2}:{安全协议名称2}
KafkaConfig.scala 中该属性描述如下:

它的默认值是在 Defaults.java 中配置了



所以说,一旦你的监听器有自定义名称,你就需要配置此映射,且配置后,默认映射会被覆盖掉,你需要做的就是把 所有的监听器名称配置它所需要的映射 。实际上,Kafka 官网也推荐自己把所有需要的映射写清楚,而不是使用它的默认值。
- inter.broker.lisener.name
用于 Broker 之间通信的 Listener 名称。如果未设置,则 Listener 名称由security.inter.broker.protocol定义(security.inter.broker.protocol 具有默认值是 PLAINTEXT)。
security.inter.broker.protocol 不可同时 与inter.broker.listener.name设置
配置格式
inter.broker.listener.name=监听器名称
值得一提的是,如果使用了这个属性,就必须要设置advertised.listeners属性,且设置的名称必须在advertised.listeners中包含,道理也很简单,设置这个属性的目的是在 Broker 之间进行通信,通信的过程就是通过本地配置的监听器名称,去查找其他 Broker 的监听器的 EndPoint,所以一般集群里面所有的 Broker 监听器名称都必须一致,否则找不到对应的 Endpoint 无法发起正常的请求,而设置advertised.listeners就是保证自己也能与其他的 Broker 进行通信,这里在 Kafka 源码 KafkaConfig.scala 中也有强制要求,并且可以看到 advertised.listeners的名称要在 listeners 中有配置:
val listenerNames = listeners.map(_.listenerName). toSet
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole) ) {
// validations
for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located) validateAdvertisedListenersNonEmptyForBroker() require(advertisedListenerNames.contains(interBrokerListenerName), s "${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " + s "The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(" , ")}" ) require(advertisedListenerNames.subsetOf(listenerNames), s "${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " + s "Found ${advertisedListenerNames.map(_.value).mkString(" , ")}. The valid options based on the current configuration " + s "are ${listenerNames.map(_.value).mkString(" , ")}" )
}
- security.inter.broker.protocol
用于在 Broker 之间进行通信的安全协议,有效值只有以下几个:PLAINTEXT
SSL
SASL_PLAINTEXT
SASL_SSL
它和inter.broker.listener.name的区别是,这个配置只有四个选项,都是安全协议,而inter.broker.listener.name是监听名称,需要通过这个监听名称找到它的安全协议以及 IP:PORT。
如果inter.broker.listener.name没有配置,就会默认使用security.inter.broker.protocol的配置。
一般自定义了监听器的名称,inter.broker.listener.name就是必须要设置的,不能用security.inter.broker.protocol来代替。
注意
在 KRaft 集群中,Broker 是指在 process.roles 中启用了 broker 角色的任何服务器,Controller 是指启用了 controller 角色的任何服务器。监听器配置取决于角色。inter.broker.listener.name 定义的监听器专门用于处理 Broker 之间的请求。另一方面,Controller 必须使用由 controller.listener.names 配置定义的单独监听器名称。该监听器名称不能设置为与inter.broker.listener.name相同的值。
Controller 既接收来自其他 Controller 的请求,也接收来自 Broker 的请求。因此,即使服务器没有启用 controller 角色(即它只是一个 Broker 角色),它仍必须定义 Controller 监听器以及配置它所需的任何安全属性,以便根据配置寻找其它的 Controller 发送请求。例如:
process.roles=broker
listeners=BROKER:
//localhost:9092 inter.broker.listener.name=BROKER
controller.quorum.voters= 0 @localhost : 9093 controller.listener.names=CONTROLLER listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
在此示例中,Controller 监听器被配置为使用 SASL_SSL 安全协议,但它不包括在 listeners 中,因为 Broker 本身并不公开 Controller 监听器。本例中使用的端口来自 controller.quorum.voters 配置,该配置定义了 Controller 的完整列表。
对于同时启用了 Broker 和 Controller 角色的 KRaft 服务器,配置方法类似。唯一的区别是,Controller 监听器必须包含在 listeners 中:
process.roles=broker,controller
listeners=BROKER:
//localhost:9092,CONTROLLER://localhost:9093 inter.broker.listener.name=BROKER
controller.quorum.voters= 0 @localhost : 9093 controller.listener.names=CONTROLLER listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
controller.quorum.voters 中定义的端口必须与暴露的 Controller 监听器之一完全匹配。例如,这里的 CONTROLLER 监听器绑定到了 9093 端口。这样,由 controller.quorum.voters 定义的连接字符串也必须使用 9093 端口,此处就是如此。
Controller 将接受 controller.listener.names 所定义的所有监听器的请求。通常只有一个 Controller 监听器,但也可以有更多监听器。例如,可通过群集滚动将活跃的监听器从一个端口或安全协议更改为另一个端口或安全协议(滚动一次暴露新监听器,滚动一次移除旧监听器)。当定义了多个控制器监听器时,将用列表中的第一个于向外请求。
Kafka 的传统做法是为客户端配置独立的监听器,以实现网络层面的隔离。这种方法确保了集群间的通信与客户端的通信是独立分开的。在 KRaft 模式中,Controller 监听器同样需要实现隔离,因为它们仅用于内部集群管理,客户端不会使用这些监听器。客户端应连接到 Broker 上配置的其他监听器。任何绑定到 Controller 的请求都会按以下方式转发:
在 KRaft 集群中,客户端会将 CreateTopics 和 DeleteTopics 等管理请求发送给 Broker 监听器。然后,Broker 根据 controller.listener.
names 中配置的第一个监听器将请求转发给对应活跃的 Controller。
使用 SASL 进行身份验证
Simple Authentication and Security Layer(简单认证安全层)。Kafka 使用 Java Authentication and Authorization Service 即 JAAS[3] 进行 SASL 配置。
对Kafka Broker
进行 JAAS 配置
Kafka 对 JAAS 的配置方式有两种,一种是单独的将 JAAS 配置文件作为 JVM 参数的方式配置,另一种则是在 Kafka 的配置文件中通过 sasl.jaas.config 属性配置,本文最后使用的配置方式为第二种。
通过 JAAS 配置文件配置 JAAS
KafkaServer 是每个 Kafka Server/Broker 使用的 JAAS 文件内容的部分。该部分为 Broker 提供 SASL 配置选项,包括 Broker 为进行 Broker 间通信而建立的任何 SASL 客户端连接。如果多个监听器被配置为使用 SASL,该部分可以用小写的监听器名称作为前缀,后面跟一个句点,例如:sasl_ssl.KafkaServer,只有一个监听器配置 SASL 的话则无需配置监听器名称。
Client 的部分是用于验证与 zookeeper 的 SASL 连接。由于 AutoMQ 使用的是 Kraft 模式,舍弃了 zookeeper,所以这部分不做描述。
具体 kafka_server_jaas.conf 文件内容如下:
yyy.KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username= "_automq" password= "automq-secret" user__automq= "automq-secret" ; # 注意这里的分号必须要有 }; # usernaem和password是用来给Broker之间通信的配置 # 除此之外还定义了一个用户_automq密码为automq-secret # 后面可以接着配其他监听器名开头的配置,如下 xxx.KafkaServer{ ... };
可以通过将 JAAS 配置文件位置作为 JVM 参数使其生效。
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
通过sasl.jaas.config属性配置 JAAS
这里要对 Listener
Broker 可以直接在配置文件中使用sasl.jaas.config属性来配置 JAAS。该属性名称必须以监听器名称 + SASL 机制为前缀,即 listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config, 配置值中只能指定一个登录模块 。如果在一个监听器上配置了多个机制,则必须使用监听器和机制前缀为每个机制提供配置。例如:
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username= "admin" \ password= "admin-secret" ; listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username= "admin" \ password= "admin-secret" \ user_admin= "admin-secret" \ user_alice= "alice-secret" ;
如果使用了多种方式进行配置,则它们的生效次序如下:
-
Broker 配置属性 listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
-
静态 JAAS 配置的{listenerName}.KafkaServer
-
静态 JAAS 配置的 KafkaServer
SASL配置
SASL 可以使用 PLAINTEXT 或 SSL 作为传输层,所以安全协议可以设置为 SASL_PLAINTEXT 或 SASL_SSL。如果使用 SASL_SSL,则还必须配置 SSL。
SASL机制
Kafka 支持以下 SASL 机制
-
GSSAPI(Kerberos)[4]
-
PLAIN [5]
-
SCRAM-SHA-256 [6]
-
SCRAM-SHA-512 [7]
-
OAUTHBEARER [8]
本文只对 PLAIN 机制做配置介绍,其他的机制请前往官网继续学习。
Broker的SASL配置
在 server.properties 中配置 SASL 端口,在 listeners 参数中至少添加 SASL_PLAINTEXT 或 SASL_SSL 其中之一,该参数包含一个或多个逗号分隔的值:
listeners=SASL_PLAINTEXT://hostName:port,SASL_SSL://hostName2:prot2
如果只配置 SASL 端口(或希望 Kafka Broker 使用 SASL 相互验证),请确保为 Broker 间通信设置相同的 SASL 协议:
security.inter.broker.protocol=SASL_PLAINTEXT
总的 SASL 所需配置如下:
listeners=BROKER_SASL://:9092,CONTROLLER_SASL://:9093
inter.broker.listener.name=BROKER_SASL
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.mechanism.controller.protocol=PLAIN
listener.name.broker_sasl.plain.connections.max.reauth.ms=10000
controller.listener.names=CONTROLLER_SASL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER_SASL:SASL_PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER_SASL:SASL_PLAINTEXT
listener.name.broker_sasl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="_automq" \
password="automq-secret" \
user__automq="automq-secret";
listener.name.controller_sasl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="_automq" \
password="automq-secret" \
user__automq="automq-secret";
通过以上配置启动服务器,我们就得到了一个有 SASL 认证的一个 AutoMQ 服务器。
Client的SASL配置
客户端的配置文件内容如下:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="_automq" password="automq-secret";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
配置文件启动 AutoMQ
关于如何快速部署启动一个单节点的 AutoMQ 实例请参考官方文档 Direct S3 Cluster 部署 | AutoMQ [9]

export KAFKA_S3_ACCESS_KEY=<your-ak>
export KAFKA_S3_SECRET_KEY=<your-sk>
bin/kafka-server-start.sh /root/automq/config/kraft/server.properties
测试连接
创建 Topic
bin/kafka-topics.sh \
--bootstrap-server 47.253.200.218:9092 \
--command-config /root/automq/bin/sasl-client.properties \
--create \
--topic test2
创建成功。
生产者与消费者测试

bin/kafka-console-producer.sh \
--bootstrap-server 47.253.200.218:9092 \
--topic test2 \
--producer.config /root/automq/bin/sasl-client.properties
bin/kafka-console-consumer.sh \
--bootstrap-server 47.253.200.218:9092 \
--topic test2 \
--consumer.config /root/automq/bin/sasl-client.properties
收发消息成功。
总结
想要配置明白 SASL 需要对 Kafka 监听器有一定的了解,监听器的名称影响会影响到其他配置的设置,所以给监听器命名时要简明扼要,方便以后的问题排查。
参考资料
[1] AutoMQ: https://www.automq.com/zh
[2] Kafka: https://kafka.apache.org/
[3] JAAS: https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.html
[4] GSSAPI: https://kafka.apache.org/documentation/#security_sasl_kerberos
[5] PLAIN: https://kafka.apache.org/documentation/#security_sasl_plain
[6] SCRAM-SHA-256: https://kafka.apache.org/documentation/#security_sasl_scram
[7] SCRAM-SHA-512: https://kafka.apache.org/documentation/#security_sasl_scram
[8] OAUTHBEARER: https://kafka.apache.org/documentation/#security_sasl_oauthbearer
[9] AutoMQ 多节点集群部署:https://docs.automq.com/zh/automq/deployment/deploy-multi-nodes-cluster-on-linux
