我有幾個XML文件&我已經把這些放在卡夫卡主題&我已經創建了卡夫卡主題的Dstream對象。由於我想解析主題中的xml數據,因此我無法進一步處理。如果任何在Spark流中處理過xml處理的人都可以給我提供他們的幫助。我在過去的兩天裏一直堅持這一點。Kafka Spark流XML解析/處理
我採取的方法是XML文件 - >卡夫卡主題 - >在Spark流中處理 - >再次將它放回卡夫卡。
我能夠將數據放回卡夫卡話題,但無法處理或做火花流中主題的數據。
我有幾個XML文件&我已經把這些放在卡夫卡主題&我已經創建了卡夫卡主題的Dstream對象。由於我想解析主題中的xml數據,因此我無法進一步處理。如果任何在Spark流中處理過xml處理的人都可以給我提供他們的幫助。我在過去的兩天裏一直堅持這一點。Kafka Spark流XML解析/處理
我採取的方法是XML文件 - >卡夫卡主題 - >在Spark流中處理 - >再次將它放回卡夫卡。
我能夠將數據放回卡夫卡話題,但無法處理或做火花流中主題的數據。
您期待什麼樣的處理?如果您期待任何一種數據提取,您可以做的是,foreach消息,將它們轉換爲json(xml to json非常簡單),並將jsonRDD和JsonRDD轉換爲DF直接轉換。因此,您將能夠在數據框上進行任何選擇或進行其他操作。
我需要你幾個輸入端,以提供準確的解決方案
1)你想出來的數據是什麼? 2)Dataframe中的數據是否足夠。?
如果你能夠解釋輸入,這將是非常有益的。
我已經添加了一個示例代碼來獲取數據框的XML數據。
val jsonStream = kafkaStream.transform(
y => {
y.filter(x => x._1 != null && x._2 != null).map(x => {
XML.toJSONObject(x).toString(4);
}
)
})
jsonStream.foreachRDD(x => {
val sqlContext = SQLContextSingleton.getInstance(x.sparkContext)
if (x != null) {
val df = sqlContext.read.json(x)
// Your DF Operations
}
}
}
)
object SQLContextSingleton {
@transient private var instance: HiveContext = _
def getInstance(sparkContext: SparkContext): HiveContext = {
if (instance == null) {
sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
sparkContext.hadoopConfiguration.set("spark.sql.parquet.mergeSchema", "true");
sparkContext.hadoopConfiguration.set("spark.sql.parquet.cacheMetadata","false");
instance = new HiveContext(sparkContext)
}
instance
}
}
嗨Srini,感謝您的快速回復。這個問題已經解決,這是一個非常複雜的用例,我們想要使用火花流連接3種類型的xml。終於完成了。我們使用JAXB來驗證xml的各自的模式。正如我所說的,這個用例非常複雜,有很多編碼,因此我沒有分享任何適合我的代碼。再一次感謝你。 – Harsha
你可以添加你的代碼,並具體問題是什麼以及你得到什麼錯誤或異常? – maasg
@Harsha在閱讀來自kafka的消息時遇到了同樣的問題,即將消息作爲每個標記作爲消息來獲取。你能否讓我知道你是如何解決這個問題的。 –
@ankush reddy使用JAXB來驗證XML的各自模式 – Harsha