2015-05-30 39 views
3

我使用Spark插入到HBase,但速度很慢。對於60,000條記錄,需要2-3分鐘。我有大約1000萬條記錄要保存。Spark插入到HBase慢

object WriteToHbase extends Serializable { 
    def main(args: Array[String]) { 
     val csvRows: RDD[Array[String] = ... 
     val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss") 
     val usersRDD = csvRows.map(row => { 
      new UserTable(row(0), row(1), row(2), row(9), row(10), row(11)) 
     }) 
     processUsers(sc: SparkContext, usersRDD, dateFormatter) 
    }) 
} 

def processUsers(sc: SparkContext, usersRDD: RDD[UserTable], dateFormatter: DateTimeFormatter): Unit = { 

    usersRDD.foreachPartition(part => { 
     val conf = HBaseConfiguration.create() 
     val table = new HTable(conf, tablename) 

     part.foreach(userRow => { 
      val id = userRow.id 
      val name = userRow.name 
      val date1 = dateFormatter.parseDateTime(userRow.date1) 
      val hRow = new Put(Bytes.toBytes(id)) 
      hRow.add(cf, q, Bytes.toBytes(date1)) 
      hRow.add(cf, q, Bytes.toBytes(name)) 
      ... 
      table.put(hRow) 
     }) 
     table.flushCommits() 
     table.close() 
    }) 
} 

我在使用這種火花提交:

--num-executors 2 --driver-memory 2G --executor-memory 2G --executor-cores 2 

回答

3

這是緩慢的,因爲實現不充分利用數據的接近;服務器中的一部分Spark RDD可能會傳輸到另一臺服務器上運行的HBase RegionServer。

當前沒有Spark的RRD操作以有效的方式使用HBase數據存儲。

+0

如果您正在閱讀HBase,Spark實際上可以使用數據局部性。 –

0

你不得不看的方式,你可以星火工作分配您輸入的數據。在您目前使用的方法foreachPartition而不是您必須查看像地圖,mapToPair等轉換。您需要評估整個DAG生命週期以及哪些地方可以節省更多時間。

之後,基於並行性實現您可以調用saveAsNewAPIHadoopDataset Spark在HBase內寫入的動作更加快速和平行。像:

JavaPairRDD<ImmutableBytesWritable, Put> yourFinalRDD = yourRDD.<SparkTransformation>{()};  
yourFinalRDD.saveAsNewAPIHadoopDataset(yourHBaseConfiguration); 

注:凡yourHBaseConfiguration將是一個獨立的,並會單個對象中的執行人節點上的任務

請讓我知道,如果這個僞代碼不起作用之間共享爲你或找到任何困難。