2017-04-26 28 views
0

我是Spark的新手。這是我想要做的事情。使用Spark和Scala將數據插入Hive Table的問題

我創建了兩個數據流;第一個從文本文件讀取數據並使用hivecontext將其註冊爲臨時表。另一個持續從Kafka獲得RDD,對於每個RDD,它創建數據流並將內容註冊爲可臨時的。最後,我將這兩個臨時表連接在一個鍵上以獲得最終結果集。我想將結果集插入配置單元表中。但我沒有想法。試圖遵循一些實例,但只能創建一個列中有一列的表格,並且不可讀。你能告訴我如何將結果插入到特定的數據庫和配置表中。請注意,我可以看到使用show函數進行連接的結果,因此真正的挑戰在於插入到hive表中。

以下是我正在使用的代碼。

imports..... 

    object MSCCDRFilter {   
    def main(args: Array[String]) { 
     val sparkConf = new SparkConf().setAppName("Flume, Kafka and Spark MSC CDRs Manipulation") 

     val sc = new SparkContext(sparkConf) 
     val sqlContext = new HiveContext(sc) 
     import sqlContext.implicits._ 
     val cgiDF = sc.textFile("file:///tmp/omer-learning/spark/dim_cells.txt").map(_.split(",")).map(p => CGIList(p(0).trim, p(1).trim, p(2).trim,p(3).trim)).toDF() 
     cgiDF.registerTempTable("my_cgi_list") 
     val CGITable=sqlContext.sql("select *"+ 
      " from my_cgi_list") 
     CGITable.show() // this CGITable is a structure I defined in the project 
       val streamingContext = new StreamingContext(sc, Seconds(10) 
     val zkQuorum="hadoopserver:2181" 
     val topics=Map[String, Int]("FlumeToKafka"->1) 

     val messages: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(streamingContext,zkQuorum,"myGroup",topics) 

     val logLinesDStream = messages.map(_._2) //獲取數據 
     logLinesDStream.print() 
     val MSCCDRDStream = logLinesDStream.map(MSC_KPI.parseLogLine) // change MSC_KPI to MCSCDR_GO if you wanna change the class 
// MSCCDR_GO and MSC_KPI are structures defined in the project  
     MSCCDRDStream.foreachRDD(MSCCDR => { 
      println("+++++++++++++++++++++NEW RDD ="+ MSCCDR.count()) 

      if (MSCCDR.count() == 0) { 
      println("==================No logs received in this time interval=================") 
      } else { 

      val dataf=sqlContext.createDataFrame(MSCCDR) 
      dataf.registerTempTable("hive_msc") 
      cgiDF.registerTempTable("my_cgi_list") 
      val sqlquery=sqlContext.sql("select a.cdr_type,a.CGI,a.cdr_time, a.mins_int, b.Lat, b.Long,b.SiteID from hive_msc a left join my_cgi_list b" 
      +" on a.CGI=b.CGI") 
      sqlquery.show() 
      sqlContext.sql("SET hive.exec.dynamic.partition = true;") 
      sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict;") 
      sqlquery.write.mode("append").partitionBy("CGI").saveAsTable("omeralvi.msc_data")  

      val FilteredCDR = sqlContext.sql("select p.*, q.* " + 
       " from MSCCDRFiltered p left join my_cgi_list q " + 
       "on p.CGI=q.CGI ") 

      println("======================print result =================") 
      FilteredCDR.show() 
     streamingContext.start() 
     streamingContext.awaitTermination() 
     } 
    } 
+0

如果您可以將示例中的代碼減少到最小,可重現的示例,那麼任何人都可以運行代碼並自行觀察問題將會很有幫助。此外,非關鍵線路混淆了問題可能出現的地方。我建議你編輯你的問題,使代碼片段更有用。 –

回答

1

我已經取得了一些成功寫入配置單元,使用以下:

dataFrame 
    .coalesce(n) 
    .write 
    .format("orc") 
    .options(Map("path" -> savePath)) 
    .mode(SaveMode.Append) 
    .saveAsTable(fullTableName) 

我們嘗試使用分區不是通過注視着,因爲我覺得有一些問題,我們需要的分區柱。

唯一的限制是併發寫入,其中表不存在,那麼任何任務嘗試創建表(因爲它在第一次嘗試寫入表時不存在)將例外。

請注意,在流式應用程序中寫入Hive通常是不好的設計,因爲您經常會寫很多小文件,而這些文件的讀取和存儲效率非常低。因此,如果您對Hive的寫入次數多於每隔一小時左右,則應確保包含壓縮邏輯,或添加更適合事務數據的中間存儲層。