我有一個Spark Streaming應用程序,它正在讀取來自Kafka中單一主題的數據,並根據元素的內容處理它,並將它插入到Cassandra中的兩個不同的鍵空間中。一些數據可能會去KEYSPACE A,其他一些以KEYSPACE B.如果其他在Spark Streaming
我做目前使用的過濾器操作:
Functions.insertToCassandra(rdd.filter(element => element.tenant=="A"), keyspace = A, table = "tableName")
Functions.insertToCassandra(rdd.filter(element => element.tenant=="B"), keyspace = B, table = "tableName")
所以過濾器在每個RDD應用,那些有租戶領域的元素去密鑰空間A和擁有租戶字段B的密鑰空間B轉到密鑰空間B.
有沒有更有效的方法來做到這一點,而不是使用2次過濾操作(特別是因爲以後可能會有2個以上的密鑰空間)?在過濾器操作之前緩存rdd是否會提高性能?
我再說一遍,我有來自Kafka的DStream,我處理它,然後在「foreachRDD」操作中,我從上面的代碼片段向Cassandra插入數據。
謝謝
感謝您的回答。 我應該在過濾器轉換之後添加「rdd.unpersist(true)」,將其從內存中釋放出來嗎? –
你可以,但是如果你在方法裏面有這段代碼的話。然後,一旦您超出該方法,它會自動將其從內存中移除。 –
如果我沒有記錯的話,它也是非持久性的,它會將結果保存在驅動程序的內存中,並將其從工作人員內存中刪除。另一方面摧毀,從各處去除它。 –