0
我使用星火1.3.1,我已經寫了一個小程序,對Cassandra的過濾數據,如卡桑德拉星火連接器和過濾數據
val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
println(rdd2.count())
sc.stop()
這個程序很長一段時間運行,打印郵件
16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46)
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350)
如果我終止這個程序,我的代碼更改爲
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
它仍然運行了很LO NG時間的消息像
6/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350)
所以它看起來像程序將始終嘗試加載整個卡桑德拉表中存儲(或嘗試完全掃描的話),然後才應用濾鏡。這對我來說似乎極其低效。
如何以更好的方式編寫此代碼,以便spark不會嘗試將整個cassandra表(或完全掃描它)加載到RDD中,然後應用過濾器?
如何檢查日期列是否爲集羣密鑰?有一些我可以發出的命令嗎? –
我試過你的建議,但將過濾器的結果分配給rdd2,然後對此進行計數。但它仍然在說'完成的任務4.0在階段0.0(TID 4)在112031毫秒在本地主機(5/1350)' –
集羣密鑰是一個概念組織在Cassandra磁盤上的信息。這是你的Cassandra Schema的核心部分。既然你沒有發佈你的代碼,我無法回答爲什麼它會花費任何時間。你應該看到它通過許多任務。但幾乎沒有任何情況下,它會比沒有下推的全表掃描慢。 – RussS