
Answers


Below is the sample code I am using. The problem with it is that whenever the schema changes, the code has to change with it.

import Util.XmlFileUtil 
import org.apache.spark.{SparkConf,SparkContext} 
import org.apache.spark.sql.hive.HiveContext 

object xPathReader extends App {

    System.setProperty("hadoop.home.dir", "C:\\hadoop\\winutils")

    val sparkConf = new SparkConf().setAppName("MstarXmlIngestion").setMaster("local[5]")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)
    val fscXmlPath = "C:\\data\\xml"

    // Read each <book> element as one raw XML string per RDD row
    val xmlRddList = XmlFileUtil.withCharset(sc, fscXmlPath, "UTF-8", "book")

    import hiveContext.implicits._

    val xmlDf = xmlRddList.toDF("xml")
    xmlDf.registerTempTable("TEMPTABLE")

    // Extract fields with Hive's xpath UDFs; repeated nodes are exploded into rows
    hiveContext.sql("select xpath_string(xml, \"/book/@_id\") as BookId, " +
      "xpath_string(xml, \"/book/description\") as description, CurrID, Price from TEMPTABLE " +
      "LATERAL VIEW OUTER explode(xpath(xml, \"/book/market/price/text()\")) PRICE_LIST as Price " +
      "LATERAL VIEW OUTER explode(xpath(xml, \"/book/currency/price/text()\")) CurrID_LIST as CurrID").show()
}
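
For context, the XPath expressions above imply the source XML is shaped roughly like this. This is a hypothetical sample reconstructed from the query, not taken from the original data:

<book _id="bk101">
    <description>An example book</description>
    <market>
        <price>9.99</price>
        <price>14.50</price>
    </market>
    <currency>
        <price>USD</price>
    </currency>
</book>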


And here is the XmlFileUtil class it uses:

=================================================== 
import com.databricks.spark.xml.util.XmlFile; 
import org.apache.spark.SparkContext; 
import org.apache.spark.rdd.RDD; 

/**
 * Utility class to access the private `XmlFile` Scala object from the spark-xml package.
 */
public class XmlFileUtil {
    public static RDD<String> withCharset(SparkContext context, String location, String charset, String rowTag) {
        return XmlFile.withCharset(context, location, charset, rowTag);
    }
}
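
This compiles only with the spark-xml package on the classpath; a minimal sbt line might look like the one below. The version shown is an assumption, so pick a release that matches your Spark build:

// build.sbt -- the version is an assumption; align it with your Spark version
libraryDependencies += "com.databricks" %% "spark-xml" % "0.3.3"

The Java shim works because Scala's private[xml] access qualifier is not enforced in the generated bytecode, so a Java class can call XmlFile.withCharset even though Scala code outside the package cannot.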

Sadly, there is no generic solution that works for every XML file. If your XML schema changes, the code has to change as well.

In my code I used the XML data source for Spark SQL by Databricks; you can find it here


Agreed, we have to change the code for every schema change. But if we use DataFrames it can still work; of course, we have to do some twisting. I will share my alternative approach –


As promised in the comment above, here is the alternative approach:

import Util.XmlFileUtil
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import com.databricks.spark.xml.XmlReader

object XmlSecondApproach extends App {

    System.setProperty("hadoop.home.dir", "C:\\hadoop\\winutils")

    val sparkConf = new SparkConf().setAppName("Second Approach").setMaster("local[5]")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)
    val rawXmlPath = "C:\\data\\xml\\RawXML"

    val objXMLDR = new XmlDataReader

    // Let spark-xml infer the schema so the code no longer hard-codes XPath expressions
    val rawDf = objXMLDR.getRawXmlDataframe(sc, hiveContext, rawXmlPath, "book")

    rawDf.registerTempTable("tempBookTable")
    rawDf.printSchema()

    hiveContext.udf.register("customFunction", UDFWrappedArrayConverter.checkIfWrappedArray)

    // When a parent node contains more than one array we still need to
    // handle the repeated node with explode
    hiveContext.sql("SELECT book.currency.price as curPrice, " +
      "markET.price as mktPrice from tempBookTable " +
      "LATERAL VIEW OUTER explode(book.market) Mrkt_List as markET").show()
}
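
As a side note, the LATERAL VIEW OUTER explode pattern used in both answers simply turns each element of an array column into its own row. A minimal, self-contained sketch, using a hypothetical table t that is not from the original post:

import hiveContext.implicits._

// Hypothetical data: one row per book id, each with an array of prices
val demoDf = Seq((1, Seq(9.99, 14.50)), (2, Seq(5.00))).toDF("id", "prices")
demoDf.registerTempTable("t")

// OUTER keeps rows whose array is empty or null (they produce a single NULL)
hiveContext.sql("SELECT id, p FROM t LATERAL VIEW OUTER explode(prices) pl AS p").show()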

// Define another Scala class; hiveContext is passed in explicitly so the
// reader does not depend on any enclosing scope
class XmlDataReader {

    def getRawXmlDataframe(sc: SparkContext, hiveContext: HiveContext, xmlPath: String, rowTag: String): DataFrame = {
        val xmlRddList = XmlFileUtil.withCharset(sc, xmlPath, "UTF-8", rowTag)
        val xmlReader = new XmlReader()
        xmlReader
          .withAttributePrefix(Constant.ATTR_PREFIX)
          .withValueTag(Constant.VALUE_TAG)
          .withRowTag(rowTag.toLowerCase)
          .xmlRdd(hiveContext, xmlRddList)
    }
}
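
The post does not include the Constant object it references. For completeness, a minimal sketch might look like the following; the two values are spark-xml's documented defaults for the attribute prefix and the value tag, but treat them as assumptions:

object Constant {
    // spark-xml's default marker for XML attributes (e.g. _id)
    val ATTR_PREFIX = "_"
    // spark-xml's default column name for an element's text value
    val VALUE_TAG = "_VALUE"
}

UDFWrappedArrayConverter.checkIfWrappedArray is registered but never called in the query above, so its body cannot be reconstructed from the post.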