2014-09-01 117 views
2

我使用Datastax Enterprise 4.5。我希望我做了正確的配置,我在datastax網站上解釋過。我可以使用Windows服務寫入Cassandra數據庫,這可行,但我無法使用where函數使用Spark進行查詢。Datastax DSE Cassandra,Spark,Shark,Standalone Programm

我用「./dse cassandra -k -t」(位於/ bin文件夾中)啓動Cassandra節點(只有一個用於測試目的),所以hadoop和spark都同時運行。我可以毫無問題地寫入卡桑德拉。

所以當'where'不是RowKey時,你不能在Cassandra查詢中使用'where'子句。所以我需要使用Spark/Shark。我可以啓動並使用鯊魚(./dse shark)所需的所有查詢,但我需要用Scala或Java編寫獨立程序。

所以,我想這個鏈接:https://github.com/datastax/spark-cassandra-connector

我可以查詢簡單的語句,如:

val conf = new SparkConf(true) 
    .set("spark.cassandra.connection.host", "MY_IP") 
    .setMaster("spark://MY_IP:7077") 
    .setAppName("SparkTest") 

// Connect to the Spark cluster: 
lazy val sc = new SparkContext(conf) 

val rdd = sc.cassandraTable("keyspace", "tablename") 
println(rdd.first) 

,這工作得很好,但如果我要求更多的行或計數:

println(rdd.count) 
rdd.toArray.foreach(println) 

然後我得到這個例外:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up. 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234) 
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

當我在Java中嘗試這個時,我遇到了同樣的問題。有誰知道這個問題?我不知道數據庫配置是否正確,或者如果scala/Java程序工作正確。也許有些港口被封鎖,但7077和4040已經開放。

旁註:如果我的卡珊德拉DB開始的火花,我可以做查詢:

sc.cassandraTable("test","words").select("word").toArray.foreach(println) 

但是,如果我用一個「where」語句,如:

sc.cassandraTable("test","words").select("word").where("word = ?","foo").toArray.foreach(println) 

我得到這個例外:

java.io.IOException: Exception during query execution: SELECT "word" FROM "test"."words" WHERE token("word") > 0 AND word = ? ALLOW FILTERING 

你有想法爲什麼?我以爲我可以使用spark中的where子句?

謝謝!

回答

0

到目前爲止,這是我的解決方案。這不是我所有問題的答案,但它適用於我,我想分享給你。

我使用hive jdbc驅動程序使用Java訪問SharkServer。它是如何工作的:

開始sharkserver:bin/dse shark --service sharkserver -p <port>

依賴Maven的:

<dependency> 
    <groupId>org.apache.hive</groupId> 
    <artifactId>hive-jdbc</artifactId> 
    <version>0.13.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-core</artifactId> 
    <version>0.20.2</version> 
</dependency> 

Java代碼:

import java.sql.Connection; 
import java.sql.DriverManager; 
import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.sql.Statement; 

public class HiveJdbcClient { 
    private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver"; 

    public static void main(String[] args) throws SQLException { 
    try { 
     Class.forName(driverName); 
    } catch (ClassNotFoundException e) { 
     e.printStackTrace(); 
     System.exit(1); 
    } 
    Connection con = DriverManager.getConnection("jdbc:hive://YOUR_IP:YOUR_PORT/default", "", ""); 
    Statement stmt = con.createStatement(); 
    String sql; 
    ResultSet res; 



    sql = "SELECT * FROM keyspace.colFam WHERE name = 'John'"; 
    res = stmt.executeQuery(sql); 
    while (res.next()) { 
     System.out.println(res.getString("name")); 
    } 
} 
} 
2
All masters are unresponsive! 

暗示您嘗試連接的IP實際上並未被spark所約束。所以這基本上是一個網絡配置錯誤。掃描以查看哪些接口正在監聽7077,並確保連接到正確的接口。

至於第二個問題,where算子意味着你要對該子句做一個謂詞下推。目前,您無法使用主鍵執行此操作。如果您想在單個主鍵上使用where,則可以使用filter來完成此操作,但您不會看到優秀的性能,因爲這樣做會執行整個表掃描。

+1

版本的連接器1.1將在'其中主鍵支持'。這個補丁已經提交給主分支。 – 2014-09-01 19:18:33

+0

但是,當「所有主人都沒有反應!」爲什麼它會像'.first'這樣的單個查詢工作。這意味着連接和端口工作正常,對吧?還是有謬誤?如果使用'.first',它是否以不同的方式工作(使用其他端口等)? – richie676 2014-09-02 06:32:22

+0

這意味着連接和端口工作正常,對不對?還是有謬誤?如果使用'.first',它是否以不同的方式工作(使用其他端口等)? 我已經看到'.filter'函數,但是這會將所有數據加載到程序中並在那裏進行篩選,但這肯定會變慢。 我只想在我的獨立程序中具有與鯊魚相同的功能,我認爲spark會這麼做。如果不是,請告訴我用什麼來代替。 – richie676 2014-09-02 06:48:23

相關問題