我是Spark的新手。這是我想要做的事情。使用Spark和Scala將數據插入Hive Table的問題
我創建了兩個數據流;第一個從文本文件讀取數據並使用hivecontext將其註冊爲臨時表。另一個持續從Kafka獲得RDD,對於每個RDD,它創建數據流並將內容註冊爲可臨時的。最後,我將這兩個臨時表連接在一個鍵上以獲得最終結果集。我想將結果集插入配置單元表中。但我沒有想法。試圖遵循一些實例,但只能創建一個列中有一列的表格,並且不可讀。你能告訴我如何將結果插入到特定的數據庫和配置表中。請注意,我可以看到使用show函數進行連接的結果,因此真正的挑戰在於插入到hive表中。
以下是我正在使用的代碼。
imports.....
object MSCCDRFilter {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Flume, Kafka and Spark MSC CDRs Manipulation")
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val cgiDF = sc.textFile("file:///tmp/omer-learning/spark/dim_cells.txt").map(_.split(",")).map(p => CGIList(p(0).trim, p(1).trim, p(2).trim,p(3).trim)).toDF()
cgiDF.registerTempTable("my_cgi_list")
val CGITable=sqlContext.sql("select *"+
" from my_cgi_list")
CGITable.show() // this CGITable is a structure I defined in the project
val streamingContext = new StreamingContext(sc, Seconds(10)
val zkQuorum="hadoopserver:2181"
val topics=Map[String, Int]("FlumeToKafka"->1)
val messages: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(streamingContext,zkQuorum,"myGroup",topics)
val logLinesDStream = messages.map(_._2) //獲取數據
logLinesDStream.print()
val MSCCDRDStream = logLinesDStream.map(MSC_KPI.parseLogLine) // change MSC_KPI to MCSCDR_GO if you wanna change the class
// MSCCDR_GO and MSC_KPI are structures defined in the project
MSCCDRDStream.foreachRDD(MSCCDR => {
println("+++++++++++++++++++++NEW RDD ="+ MSCCDR.count())
if (MSCCDR.count() == 0) {
println("==================No logs received in this time interval=================")
} else {
val dataf=sqlContext.createDataFrame(MSCCDR)
dataf.registerTempTable("hive_msc")
cgiDF.registerTempTable("my_cgi_list")
val sqlquery=sqlContext.sql("select a.cdr_type,a.CGI,a.cdr_time, a.mins_int, b.Lat, b.Long,b.SiteID from hive_msc a left join my_cgi_list b"
+" on a.CGI=b.CGI")
sqlquery.show()
sqlContext.sql("SET hive.exec.dynamic.partition = true;")
sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict;")
sqlquery.write.mode("append").partitionBy("CGI").saveAsTable("omeralvi.msc_data")
val FilteredCDR = sqlContext.sql("select p.*, q.* " +
" from MSCCDRFiltered p left join my_cgi_list q " +
"on p.CGI=q.CGI ")
println("======================print result =================")
FilteredCDR.show()
streamingContext.start()
streamingContext.awaitTermination()
}
}
如果您可以將示例中的代碼減少到最小,可重現的示例,那麼任何人都可以運行代碼並自行觀察問題將會很有幫助。此外,非關鍵線路混淆了問題可能出現的地方。我建議你編輯你的問題,使代碼片段更有用。 –