2016-04-28 122 views
0

我正在運行一個spark工作,其中一些數據從cassandra表中加載。從這些數據中,我做了一些插入和刪除語句。 並執行它們。 (使用的forEach)session.execute()沒有反映在cassandra上完成火花集羣

boolean deleteStatus= connector.openSession().execute(delete).wasApplied(); 
boolean insertStatus = connector.openSession().execute(insert).wasApplied(); 
System.out.println(delete+":"+deleteStatus); 
System.out.println(insert+":"+insertStatus); 

當我在本地運行它,我看到表中的相應的結果。

但是,當我在羣集上運行它時,有時會顯示結果並且有時候不會發生更改。 我看到了來自web-ui的spark的標準輸出,並且這兩個查詢都打印了查詢以及true。 (。數據被正確加載,但有時,只能插入被反射,有時只刪除,有時兩者,並且大多數時候都不)

規格:

  1. 上同樣的機器作爲火花從站cassandra節點(每個節點有兩個從機實例)
  2. 在另一臺機器上運行spark master。
  3. 修復在所有節點上完成。
  4. 卡桑德拉重啓

回答

0

布爾deleteStatus = connector.openSession()執行(刪除).wasApplied();

boolean insertStatus = connector.openSession()。execute(insert).wasApplied();

這是一個已知的反模式,您創建的每個查詢,這是非常昂貴的一個新的Session對象。

只需創建一次會話並將其重新用於所有查詢。

要查看正在執行並送往卡桑德拉查詢,使用慢速查詢記錄器功能作爲一個黑客:http://datastax.github.io/java-driver/manual/logging/#logging-query-latencies

的想法是把閾值設置到一個低得可笑的值,使得每一個查詢將被視爲緩慢並顯示在日誌中。

你應該使用這個技巧只爲當然

+0

測試它僅用於測試目的而進行的。 實際上,每個分區打開一個會話(forEachPartition),然後(forEachRemaining)打開Iterator