
I have a Spark Streaming application reading off Kafka, and I was wondering if there is a way to do ranged queries from inside a map function? That is, query Cassandra from the Spark executors.

I group the messages coming from Kafka by time range and key, and then, based on those time ranges and keys, I want to pull data from Cassandra into that DStream.

Something like this:

lookups 
    .map(lookup => ((lookup.key, lookup.startTime, lookup.endTime), lookup)) 
    .groupByKey() 
    .transform(rdd => { 
      val cassandraSQLContext = new CassandraSQLContext(rdd.context) 
      rdd.map(lookupPair => { 
        val tableName = /* variable based on the lookup */ 
        val startTime = lookupPair._1._2 
        val endTime = lookupPair._1._3 

        cassandraSQLContext 
          .cassandraSql(s"SELECT * FROM ${CASSANDRA_KEYSPACE}.${tableName} WHERE key=${...} AND start_time >= ${startTime} AND start_time < ${endTime};") 
          .map(row => { 
            row match { 
              case /* case 1 */ => new object1(row) 
              case /* case 2 */ => new object2(row) 
            } 
          }) 
          .collect() 
      }) 
    })

This gives me a null pointer exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 59.0 failed 1 times, most recent failure: Lost task 0.0 in stage 59.0 (TID 63, localhost): java.lang.NullPointerException 
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231) 
at org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:70) 
at RollupFineGrainIngestionService$$anonfun$11$$anonfun$apply$2.apply(MyFile.scala:130) 
at RollupFineGrainIngestionService$$anonfun$11$$anonfun$apply$2.apply(MyFile.scala:123) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) 
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285) 
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) 
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 

I also tried ssc.cassandraTable(CASSANDRA_KEYSPACE, tableName).where("key = ?", ...)..., but Spark crashes when it tries to access the StreamingContext inside a map.

If anyone has any suggestions, I would appreciate it. Thanks!

Answer


If your query is based on the partition key, you probably want to use joinWithCassandraTable.
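
For reference, a minimal sketch of that approach; the key type, table name, and window bounds below are illustrative placeholders, not from the original post:

    import com.datastax.spark.connector._ 

    // Hypothetical key type; its field name must match the table's partition key column. 
    case class LookupKey(key: String) 

    val keysRdd = sc.parallelize(Seq(LookupKey("a"), LookupKey("b"))) 

    // Placeholder bounds; use values whose types match the start_time column. 
    val windowStart = 0L 
    val windowEnd = 100L 

    val joined = keysRdd 
      .joinWithCassandraTable(CASSANDRA_KEYSPACE, "my_table") // "my_table" is a placeholder 
      // .where accepts the same clauses a CassandraTableRDD accepts, 
      // so clustering-column range restrictions are possible: 
      .where("start_time >= ? AND start_time < ?", windowStart, windowEnd)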

But if you need more flexibility,

CassandraConnector(sc.getConf).withSessionDo(session => ...) 

will give you access to the session pool on the executors and let you execute whatever you want without having to manage connections yourself. The code is all serializable and can be placed inside a map.
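
A minimal sketch of that pattern applied to the grouped pairs from the question; the keyspace, table, and column types are assumptions, not from the original post:

    import com.datastax.spark.connector.cql.CassandraConnector 
    import scala.collection.JavaConverters._ 

    // CassandraConnector is serializable, so it can be built on the driver 
    // and captured by closures that run on the executors. 
    val connector = CassandraConnector(sc.getConf) 

    // groupedRdd stands for the ((key, startTime, endTime), lookups) pairs 
    // produced by the groupByKey above. 
    val enriched = groupedRdd.mapPartitions { iter => 
      connector.withSessionDo { session => 
        // Prepared once per partition; keyspace, table, and columns are placeholders. 
        val stmt = session.prepare( 
          "SELECT * FROM my_keyspace.my_table WHERE key = ? AND start_time >= ? AND start_time < ?") 
        iter.map { case ((key, startTime, endTime), lookups) => 
          // One range query per group, run on the executor's pooled session; 
          // the bound values must match the column types the driver expects. 
          val rows = session.execute(stmt.bind(key, startTime, endTime)).all().asScala.toList 
          // In practice, convert the driver Rows to your own serializable types here. 
          ((key, startTime, endTime), rows) 
        }.toList.iterator // materialize before the session is returned to the pool 
      } 
    }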


I don't think 'SparkConf' is serializable, and I'm running into serialization issues. Also, I was trying to avoid joinWithCassandraTable because I can't do range queries with it. – nickn


You can do range queries with JoinWithCassandraTable; it accepts all the clauses a CassandraTableRDD accepts. And CassandraConnector is serializable: val cc = CassandraConnector(sc.getConf), then use cc wherever you like. – RussS


CassandraConnector worked, thanks! – nickn