2017-02-24 67 views
0

我們使用foreachRDD將spark數據寫入AmazonDynamoDB,但速度非常慢,我們的消耗速率爲10,000/sec,寫入10,000需要35min ...是一段代碼..Spark:如何使用foreachPartition對數據庫進行調用

tempRequestsWithState is Dstream 

    tempRequestsWithState.foreachRDD { rdd => 

     if ((rdd != null) && (rdd.count() > 0) && (!rdd.isEmpty())) { 

      rdd.foreachPartition { 

      case (topicsTableName, hashKeyTemp, attributeValueUpdate) => { 

       val client = new AmazonDynamoDBClient() 
       val request = new UpdateItemRequest(topicsTableName, hashKeyTemp, attributeValueUpdate) 
       try client.updateItem(request) 


      catch { 

       case se: Exception => println("Error executing updateItem!\nTable ", se) 

      } 

      } 

      case null => 


      } 
     } 
     } 

從研究得知,使用foreachpartition和創建每個分區的連接將有助於..但不知道如何去寫代碼,它..will很大,如果有人可以欣賞幫助... ...此外任何其他建議,以加快寫作是非常感謝

回答

0

每個分區由執行程序(jvm進程)處理。所以在裏面你可以編寫代碼來初始化數據庫連接並寫入數據庫。在給出的代碼中,第一個case()的行是你編寫代碼的地方。因此,當你獲得更多的分區時,如果你有多個執行者寫入數據庫,這將並行完成。

rdd.foreachPartition { 
    //initialize database cnx 
    //write to db 
    //close connection 
} 
0

最好是使用一個單一的分區在db和單寫初始化CNX,爲了降低分貝連接的號碼,在foreachPartition功能使用寫入用 批次以增加插入線的數目。

rdd.repartition(1).foreachPartition { 
    //get singleton instance cnx 
    //write with batche to db 
    //close connection 
} 
相關問題