2016-09-02 128 views
0

我使用星火1.3.1,我已經寫了一個小程序,對Cassandra的過濾數據,如卡桑德拉星火連接器和過濾數據

val sc = new SparkContext(conf) 
val rdd = sc.cassandraTable("foo", "bar") 
val date = DateTime.now().minusHours(1) 
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate)) 
println(rdd2.count()) 
sc.stop() 

這個程序很長一段時間運行,打印郵件

16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46) 
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350) 

如果我終止這個程序,我的代碼更改爲

val date = DateTime.now().minusHours(1) 
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate)) 

它仍然運行了很LO NG時間的消息像

6/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8) 
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350) 

所以它看起來像程序將始終嘗試加載整個卡桑德拉表中存儲(或嘗試完全掃描的話),然後才應用濾鏡。這對我來說似乎極其低效。

如何以更好的方式編寫此代碼,以便spark不會嘗試將整個cassandra表(或完全掃描它)加載到RDD中,然後應用過濾器?

回答

1

你的代碼

val rdd = sc.cassandraTable("foo", "bar") 
val date = DateTime.now().minusDays(30) 
rdd.filter(r => r.getDate("date").after(date.toDate)).count // Count Filtered RDD 

第一塊所以,要小心。 RDD是不可變的,因此當您應用過濾器時,您需要使用返回的RDD,而不是您應用該函數的RDD。


val rdd = sc.cassandraTable("foo", "bar") 
val date = DateTime.now().minusDays(30) 
rdd.filter(r => r.getDate("date").after(date.toDate)) // Filters RDD 
println(rdd.cassandraCount()) // Ignores filtered rdd and counts everything 

更多efficency從卡桑德拉閱讀:

如果您的日期欄是一個聚集鍵可以使用.where功能謂詞下推卡桑德拉。除此之外,你可以做的修剪數據服務器端的事情不多。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#filtering-rows---where

+0

如何檢查日期列是否爲集羣密鑰?有一些我可以發出的命令嗎? –

+0

我試過你的建議,但將過濾器的結果分配給rdd2,然後對此進行計數。但它仍然在說'完成的任務4.0在階段0.0(TID 4)在112031毫秒在本地主機(5/1350)' –

+1

集羣密鑰是一個概念組織在Cassandra磁盤上的信息。這是你的Cassandra Schema的核心部分。既然你沒有發佈你的代碼,我無法回答爲什麼它會花費任何時間。你應該看到它通過許多任務。但幾乎沒有任何情況下,它會比沒有下推的全表掃描慢。 – RussS