0

我正在使用來自kafka主題的XML文件。任何人都可以告訴我如何將XML解析爲數據框。如何解析流XML到數據框?

val df = sqlContext.read 
    .format("com.databricks.spark.xml") 
    //.option("rowTag","ns:header") 
    // .options(Map("rowTag"->"ntfyTrns:payloadHeader","rowTag"->"ns:header")) 
     .option("rowTag","ntfyTrnsDt:notifyTransactionDetailsReq") 
    .load("/home/ubuntu/SourceXML.xml") 
    df.show 
    df.printSchema() 
    df.select(col("ns:header.ns:captureSystem")).show() 

我能夠精確的XML。我的信息不知道如何傳遞或轉換或卡夫卡的主題加載RDD [字符串]以SQL READ API。

謝謝!

回答

1

我面臨同樣的情況,做了一些研究,我發現,一些人使用這種方法的RDD轉換爲使用下面的代碼數據幀,如圖here

val wrapped = rdd.map(xml => s"""<a>$xml</a>""") 
val df = new XmlReader().xmlRdd(sqlContext, wrapped) 

你只需要獲得在RDD從DSTREAM,我做這個使用pyspark

streamElement = ssc.textFileStream("s3n://your_path") 
streamElement.foreachRDD(process) 

其中的工藝方法具有以下的結構,所以你可以用你RDDS盡一切

def process(time, rdd): 
    return value