2015-11-24 24 views
1

我從cassandra獲取一些數據,當數據足夠大並且無法在內存中緩存一次時,我必須使用spark.cassandra.input.split.size_in_mb來設置機器可以獲得的數據量有多大。但我也想用緩存更多的時間數據,該代碼喜歡這樣的:當數據很大時,如何緩存來自cassandra的數據以啓動?

val conf = new SparkConf().setAppName("CassandraLogAnalyse") 
    .set("spark.cassandra.connection.host", "xxx") 
    .set("spark.cassandra.auth.username", "xxx") 
    .set("spark.cassandra.auth.password", "xxx") 
    .set("spark.cassandra.input.split.size_in_mb",'512') 
//Select Data from cassandra 
val sc = new SparkContext(conf) 

val loggly_http_in = sc.cassandraTable("loggly", "http_in").select("uid", "cjj_id", "request_uri", "request_body").where("app_context = ? and log_time > ?", "news", batch_time) 

loggly_http_in.cache() 

val rdd1 = loggly_http_in.map(...).filter(...)...... 
val rdd2 = loggly_http_in.map(...).filter(...)...... 

它是正確的嗎? 如果它是正確的,它是如何工作的? 當它錯了,什麼是正確的方法?

回答

1

spark.cassandra.input.split.size_in_mb設置與緩存無關。這個設置決定了每個Spark分區的大小。如果將其設置得太大,則可能會得到太少任務,並且一些節點可能仍未使用。如果將其設置得太低,您將從任務計劃中獲得更多開銷。

Spark可以緩存RDD(和多個RDD)的多個分區。因此,當您撥打cache()時,它會嘗試緩存儘可能多的RDD分區,因爲它可以找到可用的內存。如果您需要高速緩存而不是實際高速緩存,唯一的方法是將更多的Spark羣集內存分配給您的應用程序。

您使用cache看起來不錯。

不要忘記,你也可以緩存任何轉換後的RDD。例如。在過濾後緩存RDD可能需要比緩存從Cassandra獲取的原始RDD更少的內存。

+1

謝謝,我也有一個問題,這個頁面[鏈接](https://github.com/datastax/spark-cassandra-connector/blob/master/doc/FAQ.md)說:「這種方法確保單一C *分區請求將始終創建一個單獨的Spark任務。其中帶in的子句也會生成一個Spark分區。「如果我使用「where」並且只返回一個分區,那麼spark.cassandra.input.split.size_in_mb可以做什麼?或者我必須更改cassandra的結構並返回更多分區,只有這樣spark.cassandra.input.split.size_in_mb才能工作? –

+1

如果你返回一個Cassandra分區,那麼它不能被分割,因爲它只「佔據」環上的一個標記。連接器通過令牌分割表格。根據聚類列的範圍選擇理論上可以分割單個分區,但是我們還沒有找到一種方法可以對所有數據模型進行可靠和普遍的操作。在這裏也沒有多少好處,因爲您可以從單個分區獲得的最高數據本地並行性將僅等於複製因子。 –

相關問題