2016-07-25 30 views
0

下面是我的代碼片段。我有DStream,我試圖將它保存到HDFS。只是想知道壓縮的有效方法。rdd.saveAsTextFile在foreachRDD內的驅動程序或工作程序上執行。

pairedDStream.foreachRDD { rdd =>   
    val time = Calendar.getInstance.getTimeInMillis; 
    val textOutputFolder = outputDir + "/output-" + time 
    if (args.length == 4) { 
     val compressionCodec = args(3) 
     rdd.saveAsTextFile(textOutputFolder, CommonUtils.getCompressionCodec(compressionCodec)) 
    } else { 
     rdd.saveAsTextFile(textOutputFolder, CommonUtils.getCompressionCodec(null)) 
    } 
    } 

回答

2

rdd.saveAsTextFile的工作器節點上執行,實際上所有RDD操作中dstream.foreachRDD並行地執行。 Spark文檔提到我們應該使用此dstream操作將每個RDD中的數據推送到外部系統。

foreachRDD(FUNC):到每個RDD從流生成施加 功能,FUNC,最通用的輸出操作。此功能 應將每個RDD中的數據推送到外部系統,例如將RDD保存到文件或將其通過網絡寫入數據庫。注意 函數func在運行 流應用程序的驅動程序進程中執行,並且通常會有RDD動作, 將強制計算流RDD。

Design Patterns for using foreachRDD部分還明確指出dstream.foreachRDD是一個強大的原語允許數據被髮送到外部系統。您可以進一步閱讀本節以瞭解如何在dstream中優化對RDD的操作。

希望這會有所幫助!

相關問題