Datastax DSE Cassandra,Spark,Shark,Standalone Programm

Datastax DSE Cassandra,Spark,Shark,Standalone Programm

问题描述:

我使用Datastax Enterprise 4.5。我希望我做了正确的配置,我在datastax网站上解释过。我可以使用Windows服务写入Cassandra数据库,这可行,但我无法使用where函数使用Spark进行查询。Datastax DSE Cassandra,Spark,Shark,Standalone Programm

我用“./dse cassandra -k -t”(位于/ bin文件夹中)启动Cassandra节点(只有一个用于测试目的),所以hadoop和spark都同时运行。我可以毫无问题地写入卡桑德拉。

所以当'where'不是RowKey时,你不能在Cassandra查询中使用'where'子句。所以我需要使用Spark/Shark。我可以启动并使用鲨鱼(./dse shark)所需的所有查询,但我需要用Scala或Java编写独立程序。

所以,我想这个链接:https://github.com/datastax/spark-cassandra-connector

我可以查询简单的语句,如:

val conf = new SparkConf(true) 
    .set("spark.cassandra.connection.host", "MY_IP") 
    .setMaster("spark://MY_IP:7077") 
    .setAppName("SparkTest") 

// Connect to the Spark cluster: 
lazy val sc = new SparkContext(conf) 

val rdd = sc.cassandraTable("keyspace", "tablename") 
println(rdd.first) 

,这工作得很好,但如果我要求更多的行或计数:

println(rdd.count) 
rdd.toArray.foreach(println) 

然后我得到这个例外:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up. 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234) 
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

当我在Java中尝试这个时,我遇到了同样的问题。有谁知道这个问题?我不知道数据库配置是否正确,或者如果scala/Java程序工作正确。也许有些港口被*,但7077和4040已经开放。

旁注:如果我的卡珊德拉DB开始的火花,我可以做查询:

sc.cassandraTable("test","words").select("word").toArray.foreach(println) 

但是,如果我用一个“where”语句,如:

sc.cassandraTable("test","words").select("word").where("word = ?","foo").toArray.foreach(println) 

我得到这个例外:

java.io.IOException: Exception during query execution: SELECT "word" FROM "test"."words" WHERE token("word") > 0 AND word = ? ALLOW FILTERING 

你有想法为什么?我以为我可以使用spark中的where子句?

谢谢!

到目前为止,这是我的解决方案。这不是我所有问题的答案,但它适用于我,我想分享给你。

我使用hive jdbc驱动程序使用Java访问SharkServer。它是如何工作的:

开始sharkserver:bin/dse shark --service sharkserver -p <port>

依赖Maven的:

<dependency> 
    <groupId>org.apache.hive</groupId> 
    <artifactId>hive-jdbc</artifactId> 
    <version>0.13.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-core</artifactId> 
    <version>0.20.2</version> 
</dependency> 

Java代码:

import java.sql.Connection; 
import java.sql.DriverManager; 
import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.sql.Statement; 

public class HiveJdbcClient { 
    private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver"; 

    public static void main(String[] args) throws SQLException { 
    try { 
     Class.forName(driverName); 
    } catch (ClassNotFoundException e) { 
     e.printStackTrace(); 
     System.exit(1); 
    } 
    Connection con = DriverManager.getConnection("jdbc:hive://YOUR_IP:YOUR_PORT/default", "", ""); 
    Statement stmt = con.createStatement(); 
    String sql; 
    ResultSet res; 



    sql = "SELECT * FROM keyspace.colFam WHERE name = 'John'"; 
    res = stmt.executeQuery(sql); 
    while (res.next()) { 
     System.out.println(res.getString("name")); 
    } 
} 
} 

All masters are unresponsive! 

暗示您尝试连接的IP实际上并未被spark所约束。所以这基本上是一个网络配置错误。扫描以查看哪些接口正在监听7077,并确保连接到正确的接口。

至于第二个问题,where算子意味着你要对该子句做一个谓词下推。目前,您无法使用主键执行此操作。如果您想在单个主键上使用where,则可以使用filter来完成此操作,但您不会看到优秀的性能,因为这样做会执行整个表扫描。

+1

版本的连接器1.1将在'其中主键支持'。这个补丁已经提交给主分支。 – 2014-09-01 19:18:33

+0

但是,当“所有主人都没有反应!”为什么它会像'.first'这样的单个查询工作。这意味着连接和端口工作正常,对吧?还是有谬误?如果使用'.first',它是否以不同的方式工作(使用其他端口等)? – richie676 2014-09-02 06:32:22

+0

这意味着连接和端口工作正常,对不对?还是有谬误?如果使用'.first',它是否以不同的方式工作(使用其他端口等)? 我已经看到'.filter'函数,但是这会将所有数据加载到程序中并在那里进行筛选,但这肯定会变慢。 我只想在我的独立程序中具有与鲨鱼相同的功能,我认为spark会这么做。如果不是,请告诉我用什么来代替。 – richie676 2014-09-02 06:48:23