2017-02-03 20 views
1

我正在使用Spark Streaming從Kafka中讀取DStream的JSON字符串。我需要以JSON格式將輸入數據保存在S3上。這是我這樣做的方式,但是當批量數據量大約爲5Mb時,執行此代碼需要很長時間。可以優化嗎? 我需要保存在JSON文件中的數據(*.json),因爲使用這些文件進行讀取如下另一方案:將RDD另存爲JSON文件的問題,考慮到每個RDD的大小不會超過10 Mb

var df = sqlContext.read.json("s3n://" + bucketNameData + "/" + directoryS3 + "/*.json")

因此,爲了RDD另存爲的單層JSON文件,我嘗試了rdd.map(lambda x :json.loads(x)) .coalesce(1, shuffle=True).saveAsTextFile('examples/src/main/resources/demo.json'),但是它以一種不錯的方式保存數據,並且據我所知,我希望能夠像以上所示的那樣讀取它們(從json文件獲取df)。因此,我切換到amazonS3Client,但我覺得它可能會優化。也許我應該將rdd轉換爲DataFrame左右,然後以某種方式將它保存爲JSON?

val mySet = ssc.sparkContext.broadcast(Map("metadataBrokerList"->metadataBrokerList, 
              "bucketNameData"->bucketNameData, 
              "bucketNameCode"->bucketNameCode)) 

dstreamdata.foreachRDD(rdd => { 
     if (!rdd.isEmpty()) { 
     rdd.foreachPartition { iter => 
     val producer = UtilsTest.createProducer(mySet.value("metadataBrokerList")) 
     val amazonS3Client = UtilsTest.createS3() 
     iter.foreach { msg => 
      if (msg.nonEmpty) { 
      // Save messages to S3 
      val CONTENT_TYPE = "application/json" 
      val fileContentBytes = msg.getBytes(StandardCharsets.UTF_8) 
      val fileInputStream = new ByteArrayInputStream(fileContentBytes) 
      val metadata = new ObjectMetadata() 
      metadata.setContentType(CONTENT_TYPE) 
      metadata.setContentLength(fileContentBytes.length) 
      val datetime = Calendar.getInstance.getTime 
      val formatter = new SimpleDateFormat("yyyy-MMM-dd-HH-mm-ss") 
      val setID = formatter.format(datetime) 
      val filePath = mySet.value("bucketNameData") + "/file_" + setID + ".json" 
      val putObjectRequest = new PutObjectRequest(mySet.value("bucketNameData"), filePath, fileInputStream, metadata) 
      amazonS3Client.putObject(putObjectRequest) 

//--- 

回答

0

您的直接putObject()會更有效,因爲通常用於提交工作的rename()中沒有副本。在處理失敗和重播工作時,你可能會遇到問題。

如果你使用newHadooopRDD,你可以顯著減少開銷提交算法2:

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

+0

謝謝您的回答。如果你使用newHadooopRDD,你是什麼意思?我可以在我的'pom.xml'中集成一些庫嗎? – Dinosaurius

+0

不,它意味着你用'SparkContext.newAPIHadoopRDD()創建RDD' –

相關問題