Atlas Kafka Data Import Failure and ZooKeeper Connection-String Parsing in ZkUtils
Atlas Kafka data import failure
Atlas version: apache-atlas-1.0.0
After installing Atlas, the Hive and Kafka metadata need to be imported.
Running Atlas's import-kafka.sh to import the Kafka data,
the script reports Kafka Data Model imported successfully!!!
but the Atlas web UI shows that no data was actually imported,
and import-kafka.log contains no error messages either.
Log output:
2018-09-27 09:31:00,812 INFO - [main:] ~ Looking for atlas-application.properties in classpath (ApplicationProperties:85)
2018-09-27 09:31:00,816 INFO - [main:] ~ Loading atlas-application.properties from jar:file:/opt/apache-atlas/apache-atlas-1.0.0/hook/kafka/atlas-kafka-plugin-impl/atlas-intg-1.0.0.jar!/atlas-application.properties (ApplicationProperties:98)
2018-09-27 09:31:00,848 INFO - [main:] ~ Property (set to default) atlas.graph.cache.db-cache = true (ApplicationProperties:242)
2018-09-27 09:31:00,848 INFO - [main:] ~ Property (set to default) atlas.graph.cache.db-cache-clean-wait = 20 (ApplicationProperties:242)
2018-09-27 09:31:00,850 INFO - [main:] ~ Property (set to default) atlas.graph.cache.db-cache-size = 0.5 (ApplicationProperties:242)
2018-09-27 09:31:00,850 INFO - [main:] ~ Property (set to default) atlas.graph.cache.tx-cache.size = 15000 (ApplicationProperties:242)
2018-09-27 09:31:00,850 INFO - [main:] ~ Property (set to default) atlas.graph.cache.tx-dirty-size = 120 (ApplicationProperties:242)
2018-09-27 09:31:04,472 INFO - [main:] ~ Client has only one service URL, will use that for all actions: http://cdh005:21000 (AtlasBaseClient:291)
2018-09-27 09:31:04,486 INFO - [ZkClient-EventThread-11-cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka:] ~ Starting ZkClient event thread. (ZkEventThread:65)
2018-09-27 09:31:04,492 INFO - [main:] ~ Client environment:zookeeper.version=3.4.5-cdh5.9.0--1, built on 10/21/2016 08:08 GMT (Environment:100)
2018-09-27 09:31:04,492 INFO - [main:] ~ Client environment:host.name=cdh005 (Environment:100)
2018-09-27 09:31:04,493 INFO - [main:] ~ Client environment:java.version=1.8.0_144 (Environment:100)
2018-09-27 09:31:04,493 INFO - [main:] ~ Client environment:java.vendor=Oracle Corporation (Environment:100)
2018-09-27 09:31:04,493 INFO - [main:] ~ Client environment:java.home=/usr/java/jdk1.8.0_144/jre (Environment:100)
2018-09-27 09:31:04,494 INFO - [main:] ~ Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (Environment:100)
2018-09-27 09:31:04,495 INFO - [main:] ~ Client environment:java.io.tmpdir=/tmp (Environment:100)
2018-09-27 09:31:04,495 INFO - [main:] ~ Client environment:java.compiler=<NA> (Environment:100)
2018-09-27 09:31:04,495 INFO - [main:] ~ Client environment:os.name=Linux (Environment:100)
2018-09-27 09:31:04,495 INFO - [main:] ~ Client environment:os.arch=amd64 (Environment:100)
2018-09-27 09:31:04,496 INFO - [main:] ~ Client environment:os.version=3.10.0-327.el7.x86_64 (Environment:100)
2018-09-27 09:31:04,496 INFO - [main:] ~ Client environment:user.name=cloudera-scm (Environment:100)
2018-09-27 09:31:04,496 INFO - [main:] ~ Client environment:user.home=/var/lib/cloudera-scm-server (Environment:100)
2018-09-27 09:31:04,496 INFO - [main:] ~ Client environment:user.dir=/opt/apache-atlas/apache-atlas-1.0.0/hook/kafka/atlas-kafka-plugin-impl (Environment:100)
2018-09-27 09:31:04,496 INFO - [main:] ~ Initiating client connection, connectString=cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka sessionTimeout=60000 [email protected] (ZooKeeper:438)
2018-09-27 09:31:04,508 INFO - [main:] ~ Waiting for keeper state SyncConnected (ZkClient:936)
2018-09-27 09:31:04,510 INFO - [main-SendThread(cdh003:2181):] ~ Opening socket connection to server cdh003/172.18.70.41:2181. Will not attempt to authenticate using SASL (unknown error) (ClientCnxn$SendThread:975)
2018-09-27 09:31:04,563 INFO - [main-SendThread(cdh003:2181):] ~ Socket connection established, initiating session, client: /172.18.80.81:51920, server: cdh003/172.18.70.41:2181 (ClientCnxn$SendThread:852)
2018-09-27 09:31:04,580 INFO - [main-SendThread(cdh003:2181):] ~ Session establishment complete on server cdh003/172.18.70.41:2181, sessionid = 0x26602e478142f04, negotiated timeout = 60000 (ClientCnxn$SendThread:1235)
2018-09-27 09:31:04,582 INFO - [main-EventThread:] ~ zookeeper state changed (SyncConnected) (ZkClient:713)
The log shows that at startup the script looks for atlas-application.properties on the classpath and loads it from the jar, so the configuration file (which had already been added to the jar) is being picked up.
The next guess was a ZooKeeper connection problem, yet the log shows the connection succeeding without errors, and the run still ends with Kafka Data Model imported successfully!!!
So the likely explanation is that no Kafka topic data was fetched at all: the import "succeeds", but Atlas ends up with no data.
Since the log reveals nothing, the only option left is to pull down the Atlas source (imported into IDEA) and see how the Kafka data import actually works. The import is handled by the KafkaBridge class.
Reading the source shows that KafkaBridge fetches the topic list as follows:
this.zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
i.e., via the zkUtils.getAllTopics() method.
To make testing easier, pull this logic out into a standalone snippet and see whether it can fetch the topic info:
import java.util.List;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import scala.collection.JavaConversions;

public static void main(String[] args) {
    // same connection string the bridge was given
    String connection = "cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka";
    ZkClient client = new ZkClient(connection, 10 * 1000);
    ZkUtils zkUtils = new ZkUtils(client, new ZkConnection(connection), false);
    // list all Kafka topics known to ZooKeeper
    List<String> list = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
    System.out.println(list);
}
Running this, the returned list indeed has size 0. Suspecting the connection path, I dug further into the source and found that when a ZkConnection object's connect method is called, it creates a ZooKeeper object:
public void connect(Watcher watcher) {
this._zookeeperLock.lock();
try {
if (this._zk != null) {
throw new IllegalStateException("zk client has already been started");
}
try {
LOG.debug("Creating new ZookKeeper instance to connect to " + this._servers + ".");
//the ZooKeeper object is created here
this._zk = new ZooKeeper(this._servers, this._sessionTimeOut, watcher);
} catch (IOException var6) {
throw new ZkException("Unable to connect to " + this._servers, var6);
}
} finally {
this._zookeeperLock.unlock();
}
}
The ZooKeeper constructor looks like this:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
watchManager.defaultWatcher = watcher;
//note: this is where the connection string is parsed
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
It uses ConnectStringParser to parse the connection string; the parser's constructor is:
public ConnectStringParser(String connectString) {
// parse out chroot, if any
//first, find the index of the first '/' character
int off = connectString.indexOf('/');
if (off >= 0) {
//split chrootPath and connectString at that index; at this point the cause is clear
//the connection string we used was connect = "cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka"
//so the extracted chrootPath is /kafka,cdh005:2181/kafka,cdh006:2181/kafka
//but the zk root path holding the topic info is /kafka, so no topics can be found
String chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
//extract the host-list part of connectString
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
String hostsList[] = connectString.split(",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
int pidx = host.lastIndexOf(':');
if (pidx >= 0) {
// otherwise : is at the end of the string, ignore
if (pidx < host.length() - 1) {
port = Integer.parseInt(host.substring(pidx + 1));
}
host = host.substring(0, pidx);
}
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
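The effect of this parsing on the two connection strings can be reproduced with plain string operations. This is a minimal sketch that mimics only the indexOf/substring step of ConnectStringParser, not the actual class:

```java
public class ChrootParseDemo {
    // Mimic ConnectStringParser: everything from the first '/' is the chroot,
    // everything before it is the host list.
    static String[] split(String connectString) {
        int off = connectString.indexOf('/');
        if (off < 0) {
            return new String[] { connectString, null };
        }
        return new String[] { connectString.substring(0, off), connectString.substring(off) };
    }

    public static void main(String[] args) {
        // Broken string: '/kafka' after the FIRST host swallows the rest as chroot.
        String bad = "cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka";
        String[] badParts = split(bad);
        System.out.println("hosts=" + badParts[0]);   // cdh003:2181
        System.out.println("chroot=" + badParts[1]);  // /kafka,cdh005:2181/kafka,cdh006:2181/kafka

        // Fixed string: the chroot appears once, after the last host.
        String good = "cdh003:2181,cdh005:2181,cdh006:2181/kafka";
        String[] goodParts = split(good);
        System.out.println("hosts=" + goodParts[0]);  // cdh003:2181,cdh005:2181,cdh006:2181
        System.out.println("chroot=" + goodParts[1]); // /kafka
    }
}
```

With the broken string, only cdh003 is used as a server and the chroot is a nonsense path, which is why getAllTopics() returns an empty list instead of failing loudly.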
Changing the connection string to "cdh003:2181,cdh005:2181,cdh006:2181/kafka" yields a chroot of /kafka and a connectString of cdh003:2181,cdh005:2181,cdh006:2181, which is then split on ',' and ':' into the individual serverAddresses.
With this string, the test fetches the topic info successfully. Update the setting in atlas-application.properties on the server,
then repack the config file into the jar:
zip -u kafka-bridge-1.0.0.jar atlas-application.properties
Run import-kafka.sh again: the data is imported successfully!
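For reference, the property in atlas-application.properties that carries this connection string is assumed here to be atlas.kafka.zookeeper.connect (the Kafka-related ZooKeeper setting in Atlas's configuration); the fix is only in where /kafka appears:

```properties
# before (broken): /kafka repeated after every host, so everything
# after the first '/' is treated as one chroot path
# atlas.kafka.zookeeper.connect=cdh003:2181/kafka,cdh005:2181/kafka,cdh006:2181/kafka

# after (working): all hosts first, a single /kafka chroot at the end
atlas.kafka.zookeeper.connect=cdh003:2181,cdh005:2181,cdh006:2181/kafka
```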