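Apache Kafka secure and non-secure connections with Spark 1.6.3
Problem description:
An error occurs when I try to use Kerberos-enabled Apache Kafka (0.9) with Apache Spark 1.6.3. The ZooKeeper version is 3.4.5. I have to connect to two Kafka clusters: one is Kerberos-enabled and the other is not, so I did not set the java.security.auth.login.config property in the Spark executor's extra Java options.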
Kafka Initialization failed: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:648)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:542)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:524)
at com.spark.receiver.helper.KafkaChannelHelper.initializeConnection(KafkaChannelHelper.java:277)
at com.spark.receiver.helper.KafkaChannelHelper$2.run(KafkaChannelHelper.java:240)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in `/home/user/kafka_client.conf`.
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:74)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:577)
... 4 more
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in `/home/user/kafka_client.conf`.
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:294)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44)
at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55)
... 7 more
java.security.auth.login.config is set in the consumer code itself, right where it creates the KafkaConsumer. The code is:
public void initializeConnection() {
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    System.setProperty("java.security.auth.login.config", jassFilePath);
    try {
        this.consumer = new KafkaConsumer<String, byte[]>(props);
    } catch (Exception e) {
        LOGGER.error("Kafka Initialization failed: ", e);
    }
}
kafka_client.conf contains only the following section:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    debug=true
    useKeyTab=true
    keyTab="/etc/security/keytabs/user.keytab"
    storeKey=true
    principal="[email protected]"
    serviceName="kafka";
};
Answer
Two things need to be in place before publishing data to, or consuming data from, a secured environment:
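- Configure security.protocol
  Properties props = new Properties();
  props.put("security.protocol", "PLAINTEXTSASL");
  (PLAINTEXTSASL is the protocol name used by HDP's Kafka build; Apache Kafka 0.9 calls the same protocol SASL_PLAINTEXT, which is what the code in the question uses.)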
- Pass the JAAS configuration through JVM options
  java -Djava.security.auth.login.config=/home/kafka-user/kafka-jaas.conf \
       -Djava.security.krb5.conf=/etc/krb5.conf \
       -Djavax.security.auth.useSubjectCredsOnly=false \
       -cp hdp-kafka-sample-1.0-SNAPSHOT.jar:/usr/hdp/current/kafka-broker/libs/* \
       hdp.sample.KafkaProducer one.hdp:6667 test
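These two settings also seem to fit the mixed setup from the question: security.protocol is chosen per client, while the JAAS file is a JVM-wide setting that, as far as I know, is only consulted when a client uses a SASL protocol. A minimal sketch of the per-cluster properties follows. The class name, method names, broker addresses, and group id are all made up for illustration, and the protocol names are the Apache Kafka ones (use PLAINTEXTSASL instead if your distribution requires it).

```java
import java.util.Properties;

// Hypothetical helper: builds one Properties object per Kafka cluster.
final class ConsumerProps {

    // Settings shared by both consumers (String keys, byte[] values, as in the question).
    static Properties base(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    // Consumer for the Kerberos-enabled cluster: SASL over a plaintext transport.
    static Properties kerberized(String bootstrapServers, String groupId) {
        Properties props = base(bootstrapServers, groupId);
        props.put("security.protocol", "SASL_PLAINTEXT");
        return props;
    }

    // Consumer for the cluster without Kerberos: no SASL, so no JAAS entry is needed.
    static Properties plain(String bootstrapServers, String groupId) {
        Properties props = base(bootstrapServers, groupId);
        props.put("security.protocol", "PLAINTEXT");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses; replace with the real brokers.
        Properties secure = kerberized("secure-broker.example.com:6667", "demo-group");
        Properties open = plain("plain-broker.example.com:9092", "demo-group");
        System.out.println("secure cluster: " + secure.getProperty("security.protocol"));
        System.out.println("plain cluster:  " + open.getProperty("security.protocol"));
    }
}
```

Each Properties object would then be passed to its own new KafkaConsumer<String, byte[]>(props). With this split, setting -Djava.security.auth.login.config for the whole executor JVM should not affect the PLAINTEXT consumer.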
See secure-kafka-java-producer-with-kerberos for the full walkthrough.
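The exception in the question appears to be raised when the JAAS configuration that the JVM has already loaded contains no KafkaClient entry. That can happen if the file is missing on that machine, or if another component loaded a JAAS configuration before System.setProperty ran. One way to narrow this down is to ask the JDK which entries it sees, from the same JVM, before constructing the consumer. The sketch below uses only javax.security.auth.login.Configuration from the JDK. The class name, method name, and the temporary file are illustrative, and calling setConfiguration(null) to drop the cached configuration is an assumption about what is acceptable in your application, since it affects every JAAS user in the JVM.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical diagnostic: does the JVM-wide JAAS configuration contain a given entry?
final class JaasEntryCheck {

    static boolean hasEntry(String jaasFilePath, String entryName) {
        // Same property the question sets before creating the consumer.
        System.setProperty("java.security.auth.login.config", jaasFilePath);
        // Drop any cached Configuration so the file is read again on the next call.
        Configuration.setConfiguration(null);
        // A SecurityException here typically means the file could not be read or parsed.
        AppConfigurationEntry[] entries =
                Configuration.getConfiguration().getAppConfigurationEntry(entryName);
        return entries != null && entries.length > 0;
    }

    public static void main(String[] args) throws IOException {
        // Throwaway JAAS file so the sketch runs without a real keytab or principal.
        Path jaas = Files.createTempFile("kafka_client", ".conf");
        String content = "KafkaClient {\n"
                + "  com.sun.security.auth.module.Krb5LoginModule required\n"
                + "  useKeyTab=true;\n"
                + "};\n";
        Files.write(jaas, content.getBytes(StandardCharsets.UTF_8));

        System.out.println("KafkaClient present: " + hasEntry(jaas.toString(), "KafkaClient"));
        System.out.println("Client present: " + hasEntry(jaas.toString(), "Client"));
    }
}
```

Running the same check inside the Spark executor with the real kafka_client.conf path would show whether the entry is visible there. If it only becomes visible after the reset, something loaded a different JAAS configuration earlier in that JVM.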