Window下kafka 单机SASL_PLAINTEXT加密及身份认证
一:前情提要
SASL_PLAINTEXT是一种简单的用户名和密码认证机制,是一种kafka加密协议,PLAINTEXT是传输层协议
二:配置准备
1:JAVA_HOME
有JAVA_HOME的环境变量,且java版为1.8及以上,jdk目录无中文和空格;
2:KAFKA项目部署
下载解压KAFKA项目到一个无中文无空格的目录下。配置非加密下的KAFKA环境,参考Windows环境下kafka搭建;
三:JAAS配置
KAFKA使用JAVA认证和授权服务(JAAS)进行SASL配置
1:为zookeeper配置JAAS
在config目录下创建文件kafka_zoo_jaas.conf,文件内容具体如下:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin";
};
创建了一个Server节点,其中
- org.apache.kafka.common.security.plain.PlainLoginModule required是加密方式,用plain, 连接时必须身份验证。
- username,password是zookeeper之间通讯的用户名和密码,
- user_admin="admin"的结构是user_username="password",用户名是admin, 密码是admin,客户端连接到zookeeper时,使用这个用户名和密码。
2:为broker配置JAAS
在config目录下创建文件kafka_server_jaas.conf,具体内容如下:
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_alice="alice";
};
- Client节点,是broker连接zookeeper时的认证信息
- KafkaServer节点:集群中,broker之间用节点中的username,password进行通讯
- KafkaServer节点:客户端(producer,consumer)连接broker时用user_username="password"结构中的账号密码登录
3:为客户端(producer,consumer等)配置JAAS
在config目录下创建文件kafka_client_jaas.conf,文件内容具体如下:客户端用client节点信息,连接认证zookeeper后broker
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
四、SASL配置
4.1:zookeeper的sasl配置
4.1.1:修改zookeeper.properties 修改后的文件内容为:主要就是新增后面三行
#数据文件目录
dataDir=./data/zookeeper
# 客户端连接时的端口
clientPort=2181
maxClientCnxns=0
#加密认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
4.1.2:修改zookeeper-server-start.bat
在文件中加入:这个目的是将kafka_zoo_jaas.conf将入zookeeper的jvm参数中
set KAFKA_OPTS=-Djava.security.auth.login.config=E:/demo/kafka/SASL_PLAINTEXT/broker/2181/kafka_2.12-1.1.1/config/kafka_zoo_jaas.conf
加入后,完整的zookeeper-server-start.bat信息如下
@echo off
IF [%1] EQU [] (
echo USAGE: %0 zookeeper.properties
EXIT /B 1
)
SetLocal
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
)
set KAFKA_OPTS=-Djava.security.auth.login.config=E:/demo/kafka/SASL_PLAINTEXT/broker/2181/kafka_2.12-1.1.1/config/kafka_zoo_jaas.conf
"%~dp0kafka-run-class.bat" org.apache.zookeeper.server.quorum.QuorumPeerMain %*
EndLocal
4.1.3:启动zookeeper服务
在根目录下执行命令
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
出现到下面内容则为启动成功:
4.2:broker的sasl配置
4.2.1修改文件:server.properties 主要修改内容为:
listeners=SASL_PLAINTEXT://192.168.40.150:9091
#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 完成身份验证的类
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
allow.everyone.if.no.acl.found=false
#超级管理员权限用户
super.users=User:admin
advertised.listeners=SASL_PLAINTEXT://192.168.40.150:9091
修改后,完整的server.properties文件为:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
port=9091
host.name=192.168.40.150
advertised.port=9091
advertised.host.name=192.168.40.150
############################# Socket Server Settings #############################
listeners=SASL_PLAINTEXT://192.168.40.150:9091
advertised.listeners=SASL_PLAINTEXT://192.168.40.150:9091
#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 完成身份验证的类
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
allow.everyone.if.no.acl.found=false
super.users=User:admin
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=./tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=192.168.40.150:2181
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
4.2.2:修改kafka-server-start.bat 在bat文件中加入:
set KAFKA_OPTS=-Djava.security.auth.login.config=E:/demo/kafka/SASL_PLAINTEXT/broker/2181/kafka_2.12-1.1.1/config/kafka_server_jaas.conf
修改完后的 kafka-server-start.bat完整信息为:
@echo off
IF [%1] EQU [] (
echo USAGE: %0 server.properties
EXIT /B 1
)
SetLocal
set KAFKA_OPTS=-Djava.security.auth.login.config=E:/demo/kafka/SASL_PLAINTEXT/broker/2181/kafka_2.12-1.1.1/config/kafka_server_jaas.conf
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
)
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
rem detect OS architecture
wmic os get osarchitecture | find /i "32-bit" >nul 2>&1
IF NOT ERRORLEVEL 1 (
rem 32-bit OS
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
) ELSE (
rem 64-bit OS
set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
)
)
"%~dp0kafka-run-class.bat" kafka.Kafka %*
EndLocal
4.2.3 启动 broker
.\bin\windows\kafka-server-start.bat .\config\server.properties
出现这样的信息则为成功:
4.3:producer的sasl配置
4.3.1:修改producer.properties文件,在文件末尾新增一下内容:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
4.3.2:启动kafka-console-producer.bat
.\bin\windows\kafka-console-producer.bat --broker-list 192.168.40.150:9091 --topic testTopic --producer.config .\config\producer.properties
这里使用的已经创建好的topic(topicTest)。和以往的启用,多了--producer.config。下面这样则为成功
4.4 consumer的sasl配置
4.4.1:修改consumer.properties文件,在文件的末尾新增以下内容:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
4.4.2:启动 kafka-console-consumer.bat
.\bin\windows\kafka-console-consumer.bat --bootstrap-server 192.168.40.150:9091 --topic testTopic --consumer.config .\config\consumer.properties
读取testTopic内容的信息,和以往的启用相比,多了--consumer.config。出现下面界面则为成功,打印刚刚producer新增的数据
注意:这里因为在producer.properties,consumer.properties文件中配置了sasl.jaas.config这个其实就是配置认证信息,除了这样配置以外,还可以像broker一样,在kafka-console-producer.bat, kafka-console-consumer.bat文件中加入:
set KAFKA_OPTS= -Djava.security.auth.login.config=file:%~dp0../../config/kafka_server_jaas.conf