0

我有一個Spark Streaming應用程序,它正在讀取來自Kafka中單一主題的數據,並根據元素的內容處理它,並將它插入到Cassandra中的兩個不同的鍵空間中。一些數據可能會去KEYSPACE A,其他一些以KEYSPACE B.如果其他在Spark Streaming

我做目前使用的過濾器操作:

Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName") 
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName") 

所以過濾器在每個RDD應用,那些有租戶領域的元素去密鑰空間A和擁有租戶字段B的密鑰空間B轉到密鑰空間B.

有沒有更有效的方法來做到這一點,而不是使用2次過濾操作(特別是因爲以後可能會有2個以上的密鑰空間)?在過濾器操作之前緩存rdd是否會提高性能?

我再說一遍,我有來自Kafka的DStream,我處理它,然後在「foreachRDD」操作中,我從上面的代碼片段向Cassandra插入數據。

謝謝

回答

0

你做

Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, "tableName") 
Functions.insertToCassandra(rdd.filter(element=> element.tenant=="B"), keyspace = B, "tableName") 

之前,請務必做rdd.cache()

當你在做類似上面,你的火花正試圖兩次讀取數據RDD。 除非您緩存或廣播它,否則Spark永遠不會保留內存中的任何rdd。

如果數據集不是很大,另一種方法是一次讀取所有數據並緩存它。然後使用groupByKey,在這種情況下,key將是您的keyspace(element)。

+0

感謝您的回答。 我應該在過濾器轉換之後添加「rdd.unpersist(true)」,將其從內存中釋放出來嗎? –

+0

你可以,但是如果你在方法裏面有這段代碼的話。然後,一旦您超出該方法,它會自動將其從內存中移除。 –

+0

如果我沒有記錯的話,它也是非持久性的,它會將結果保存在驅動程序的內存中,並將其從工作人員內存中刪除。另一方面摧毀,從各處去除它。 –