Using Sentry for Kafka Authorization in a Kerberized CDH Cluster
1. Purpose of this document
In the CDH platform Sentry is positioned as the unified authorization framework: every component is meant to be governed by it, which also gives users a single entry point for authorizing everything data-related. Fayson has covered the integration of Sentry with Hive/Impala at length in earlier articles; besides granting privileges on Hive/Impala tables, Sentry can also manage HDFS ACLs, Kafka, Solr, Kudu and more. A couple of days ago Fayson also showed how to use Sentry to grant privileges on Solr collections, see "How to Grant Privileges on Solr with Sentry". This article describes how to use Sentry to grant privileges on Kafka topics.
- Contents:
1. Overview of Kafka and Sentry integration and authorization
2. Enabling Sentry authorization for Kafka
3. Testing Kafka authorization
4. Summary
- Test environment:
1. CM 5.14.3 / CDH 5.14.2
2. CDK 2.2.0 (Apache Kafka 0.10.2)
3. Operating system: Redhat 7.3
4. All operations performed as the root user
5. Kerberos is enabled on the cluster
2. Overview of Kafka and Sentry integration and authorization
Starting with CDK 2.1.x (Apache Kafka 0.10.0) on CDH 5.9.x, CDH supports authorizing Kafka through Sentry. Kafka can be deployed as a standalone cluster, but because installing Sentry requires HDFS, you must install the HDFS service when deploying Sentry together with Kafka; once Sentry is installed, you may stop the HDFS service.
2.1. Authorizable resources
In a Kafka cluster, the things that can be granted are called resources or entities. Once Sentry is enabled for Kafka, any operation on these resources requires the user's group to have been granted the corresponding privilege. Kafka has four kinds of authorizable resource:
1. Cluster: controls who may perform cluster-level operations such as creating or deleting topics. It has only one value, kafka-cluster, because a Kafka cluster cannot have more than one cluster resource.
2. Topic: controls who may perform topic-level operations such as producing to and consuming from a topic. When granting, the value must match the Kafka topic name exactly. Starting with CDK 3.1.0 + CDH 5.14.2, the wildcard (*) stands for all topics.
3. Consumergroup: controls who may perform consumer-group-level operations such as joining or describing a consumer group. The value must match the consumer group's group.id exactly. Starting with CDK 3.1.0 + CDH 5.14.2, the wildcard (*) stands for all consumer groups. This matters when used with Spark Streaming, because there the group.id may be generated by your program.
4. Host: controls the hosts from which operations may be performed. This provides IP filtering for the Kafka service; the wildcard (*) allows all hosts.
2.2. Authorizable operations
Each resource can be granted several operations. The operations supported by Kafka are listed below; not every resource supports every operation.
1. ALL: every operation on the resource
2. read
3. write
4. create
5. delete
6. alter
7. describe
8. clusteraction
3. Enabling Sentry authorization for Kafka
Some background is needed before continuing with this chapter. First, the CDH Parcels do not include Kafka by default; you need to download and install it separately, see Fayson's earlier article
Enabling Sentry authorization for Kafka requires Kerberos to be installed on the cluster; for installing Kerberos see Fayson's earlier articles
"How to Enable Kerberos in CDH 5.14 on Redhat 7.3"
"How to Enable Kerberos in CDH 6.0.0-beta1"
Enabling and using Kerberos with Kafka differs somewhat from the other components, mainly because a jaas file has to be supplied; see Fayson's earlier article
"How to Enable and Use Kerberos for Kafka through Cloudera Manager"
Now enable the Sentry authorization integration for Kafka:
1. Modify the Kafka configuration from Cloudera Manager
If you enabled Kerberos for Kafka correctly, the three settings above are already enabled by default.
2. Enable Sentry for Kafka by modifying the Kafka service configuration through Cloudera Manager
Ticking the Sentry privilege cache option improves performance.
3. On a test system it is advisable to shorten the Sentry cache interval; the default is 30s and it can be changed to 1ms. Otherwise, after every change to a Kafka-related privilege, it takes 30s by default before the change takes effect.
4. Save the changes above and restart the cluster services; the restart is omitted here.
4. Creating the users and principals needed for the test
1. Create the fayson user on all nodes, and create a principal for fayson in Kerberos.
[[email protected] shell]# sh ssh_do_all.sh node.list "useradd fayson"
[[email protected] shell]# kadmin.local
Authenticating as principal root/[email protected] with password.
kadmin.local: addprinc fayson
WARNING: no policy specified for [email protected]; defaulting to no policy
Enter password for principal "[email protected]":
Re-enter password for principal "[email protected]":
Principal "[email protected]" created.
kadmin.local:
2. For convenience later, we also create a kafka principal here; alternatively you can take the kafka user's principal from the /var/run/cloudera-scm-agent/process directory.
[[email protected] shell]# kadmin.local
Authenticating as principal root/[email protected] with password.
kadmin.local: addprinc kafka/admin
WARNING: no policy specified for kafka/[email protected]; defaulting to no policy
Enter password for principal "kafka/[email protected]":
Re-enter password for principal "kafka/[email protected]":
Principal "kafka/[email protected]" created.
kadmin.local:
5. Testing Kafka authorization
1. Create the client.properties and jaas.conf files used by the test
[[email protected] kafka]# cat jaas.conf
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true;
};
[[email protected] kafka]# cat client.properties
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
group.id=testgroup
Fayson's jaas file does not reference a keytab here, so you must kinit before running the commands. For why Kafka's Kerberos setup needs these two files, see Fayson's earlier article "How to Enable and Use Kerberos for Kafka through Cloudera Manager".
2. First, create a topic named testTopic as the fayson user.
[[email protected] kafka]# kinit fayson
Password for [email protected]:
[[email protected] kafka]# kafka-topics --create --zookeeper cdh01.fayson.com:2181 --replication-factor 3 --partitions 3 --topic testTopic
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Created topic "testTopic".
Note: Fayson did not use the admin user kafka to grant the fayson user any privilege on the Cluster resource, yet fayson can still create topics (and delete them). This is a bug in the Kafka/Sentry integration in CDH; it will be fixed in a later release, the target being C6.
3. Start the console producer script as the fayson user.
[[email protected] kafka]# export KAFKA_OPTS="-Djava.security.auth.login.config=/data/disk1/_fayson/kafka/jaas.conf"
[[email protected] kafka]# kafka-console-producer --broker-list cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092 --topic testTopic --producer.config client.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/06/13 00:39:20 WARN producer.ProducerConfig: The configuration 'group.id' was supplied but isn't a known config.
2
18/06/13 00:39:22 WARN clients.NetworkClient: Error while fetching metadata with correlation id 2 : {testTopic=UNKNOWN_TOPIC_OR_PARTITION}
18/06/13 00:39:22 WARN clients.NetworkClient: Error while fetching metadata with correlation id 6 : {testTopic=UNKNOWN_TOPIC_OR_PARTITION}
18/06/13 00:39:22 WARN clients.NetworkClient: Error while fetching metadata with correlation id 8 : {testTopic=UNKNOWN_TOPIC_OR_PARTITION}
18/06/13 00:39:22 WARN clients.NetworkClient: Error while fetching metadata with correlation id 9 : {testTopic=UNKNOWN_TOPIC_OR_PARTITION}
18/06/13 00:39:22 WARN clients.NetworkClient: Error while fetching metadata with correlation id 10 : {testTopic=UNKNOWN_TOPIC_OR_PARTITION}
18/06/13 00:39:22 WARN clients.NetworkClient: Error while fetching metadata with correlation id 11 : {testTopic=UNKNOWN_TOPIC_OR_PARTITION}
18/06/13 00:39:22 WARN clients.NetworkClient: Error while fetching metadata with correlation id 12 : {testTopic=UNKNOWN_TOPIC_OR_PARTITION}
^C18/06/13 00:39:22 WARN kerberos.KerberosLogin: [Principal=null]: TGT renewal thread has been interrupted and will exit.
It fails: the broker keeps reporting UNKNOWN_TOPIC_OR_PARTITION for testTopic, so nothing can be produced to the topic.
4. Grant the fayson group permission to write data to testTopic. Note that you must log in to Kerberos as the admin user kafka to perform these operations.
[[email protected] kafka]# kinit kafka/admin
Password for kafka/[email protected]:
[[email protected] kafka]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka/[email protected]
Valid starting       Expires              Service principal
06/13/2018 00:41:45  06/14/2018 00:41:45  krbtgt/[email protected]
        renew until 06/20/2018 00:41:45
Create the role kafka_role
[[email protected] kafka]# kafka-sentry -cr -r kafka_role
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/06/13 00:43:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
List the roles in Sentry
[[email protected] kafka]# kafka-sentry -lr
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/06/13 00:44:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
fayson
hive_admin
admin
kafka_role
kafka_role1
Grant the kafka_role role the write privilege on testTopic
[[email protected] kafka]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=write"
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/06/13 00:45:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Grant the kafka_role role the describe privilege on testTopic
[[email protected] kafka]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=describe"
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/06/13 01:34:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Add kafka_role to the group fayson
[[email protected] kafka]# kafka-sentry -arg -r kafka_role -g fayson
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
18/06/13 00:47:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Log in to Kerberos as the fayson user again and start the producer
[[email protected] kafka]# export KAFKA_OPTS="-Djava.security.auth.login.config=/data/disk1/_fayson/kafka/jaas.conf"
[[email protected] kafka]# kafka-console-producer --broker-list cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092 --topic testTopic --producer.config client.properties
This succeeds, showing that the write privilege on testTopic was granted successfully.
5. Start the console consumer as the fayson user
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/disk1/_fayson/kafka/jaas.conf"
kafka-console-consumer --topic testTopic --from-beginning --bootstrap-server cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092 --consumer.config client.properties
It fails with a permission error.
6. Grant the kafka_role role the consumer-related privileges
kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=read"
kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=describe"
kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=read"
7. Log in to Kerberos as the fayson user again and start the producer and the consumer
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/disk1/_fayson/kafka/jaas.conf"
kafka-console-producer --broker-list cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092 --topic testTopic --producer.config client.properties
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/disk1/_fayson/kafka/jaas.conf"
kafka-console-consumer --topic testTopic --from-beginning --bootstrap-server cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092 --consumer.config client.properties
Consumption succeeds, showing that once the consumer-related privileges have been granted, consuming works.
6. Summary
1. Sentry can be used to manage privileges on Kafka topics, chiefly writing data to a topic and reading data from it.
2. To enable Sentry authorization for Kafka, Kerberos must be enabled on the CDH cluster, and it must also be enabled for the Kafka service itself.
3. Kafka authorization for creating and deleting topics is currently incomplete; this has to wait for a later release.
4. On a test system only, it is advisable to lower the refresh interval of the Sentry privilege cache; for this test Fayson changed it from the default 30s to 1ms. With the default, a new privilege takes 30s to become effective.
5. When granting read or write on a Topic, always grant describe at the same time, otherwise the privilege does not take effect. Alternatively, you can grant ALL.
6. When granting read on a Topic, that is, for consuming a topic, client.properties must contain the group.id parameter, and that group.id value must be granted as well, for example:
kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=ALL"
Otherwise the topic still cannot be consumed.
7. Once Sentry authorization is enabled for Kafka, the kafka user is the administrator and every administrative operation must be performed as kafka. This is the same principle as with Sentry for Hive/Impala, where the hive user is the administrator, and as in Fayson's earlier article on Sentry for Solr, where the solr user is the default administrator.
8. Fayson will cover Host resource authorization for Kafka in a later article.
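As an added example (not code from the original article or from Cloudera), the following Python script models the Sentry-for-Kafka privilege model described in sections 2 and 5 and in summary points 5 and 6: it parses the privilege strings passed to kafka-sentry -p, resolves a user's groups to roles, and reports which privileges a producer or consumer is still missing, replaying the exact sequence of grants from section 5. The produce/consume requirements, the ALL action and the '*' wildcard come from this article; the class and function names are the example's own, and the cluster bug and Host resource are deliberately left out.

```python
def parse_privilege(spec: str) -> tuple:
    """Parse a kafka-sentry -p string such as "Topic=testTopic->action=write"."""
    fields = {k.strip().lower(): v.strip() for k, _, v in
              (seg.partition("=") for seg in spec.split("->"))}
    action = fields.pop("action").lower()
    (resource, value), = fields.items()  # exactly one resource per privilege string
    return resource, value, action       # e.g. ("topic", "testTopic", "write")

class SentryKafkaPolicy:
    """In-memory stand-in for the roles, groups and privileges kafka-sentry manages."""
    def __init__(self) -> None:
        self.role_privs, self.group_roles = {}, {}  # role -> privs (-gpr); group -> roles (-arg)

    def grant(self, role: str, spec: str) -> None:
        self.role_privs.setdefault(role, set()).add(parse_privilege(spec))

    def add_role_to_group(self, role: str, group: str) -> None:
        self.group_roles.setdefault(group, set()).add(role)

    def allowed(self, groups: set, resource: str, value: str, action: str) -> bool:
        """ALL covers every action; a '*' value covers every topic/consumer group (CDK 3.1.0+)."""
        roles = {r for g in groups for r in self.group_roles.get(g, ())}
        return any(res == resource and val in (value, "*") and act in (action, "all")
                   for r in roles for res, val, act in self.role_privs.get(r, ()))

    def missing_for_produce(self, groups: set, topic: str) -> list:
        """Producing needs write AND describe on the topic (summary point 5)."""
        return [a for a in ("write", "describe") if not self.allowed(groups, "topic", topic, a)]

    def missing_for_consume(self, groups: set, topic: str, group_id: str) -> list:
        """Consuming also needs read and describe on the client's group.id (summary point 6)."""
        need = [("topic", topic, "read"), ("topic", topic, "describe"),
                ("consumergroup", group_id, "read"), ("consumergroup", group_id, "describe")]
        return [f"{r}={v}:{a}" for r, v, a in need if not self.allowed(groups, r, v, a)]

def replay_page_walkthrough() -> dict:
    """Replay the section 5 grants for group fayson; each value lists what is still missing."""
    p, g, out = SentryKafkaPolicy(), {"fayson"}, {}
    p.add_role_to_group("kafka_role", "fayson")
    p.grant("kafka_role", "Topic=testTopic->action=write")
    out["produce_write_only"] = p.missing_for_produce(g, "testTopic")
    p.grant("kafka_role", "Topic=testTopic->action=describe")
    out["produce_with_describe"] = p.missing_for_produce(g, "testTopic")
    out["consume_before"] = p.missing_for_consume(g, "testTopic", "testgroup")
    for spec in ("CONSUMERGROUP=testgroup->action=read",
                 "CONSUMERGROUP=testgroup->action=describe", "Topic=testTopic->action=read"):
        p.grant("kafka_role", spec)
    out["consume_after"] = p.missing_for_consume(g, "testTopic", "testgroup")
    return out
```

Running replay_page_walkthrough() shows the write-only grant still missing describe for the producer, and the consumer missing topic read plus both consumer-group privileges until the three grants from step 6 are applied.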
References:
https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html#using_kafka_with_sentry