我試圖從Kinesis處理Json字符串。 Json字符串可以有幾種不同的形式。從室壁運動,我創建了一個DSTREAM:Spark Streaming Scala結合不同結構的json形成一個DataFrame
val kinesisStream = KinesisUtils.createStream(
ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com",
"region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
val lines = kinesisStream.map(x => new String(x))
lines.foreachRDD((rdd, time) =>{
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits.StringToColumn
if(rdd.count() > 0){
// Process jsons here
// Json strings here would have either one of the formats below
}
})
的RDD串會有這些JSON字符串中的任何一個。 收藏:
[
{
"data": {
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30024,
"TargetId": "4138",
"Timestamp": 0
},
"host": "host1"
},
{
"data": {
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30025,
"TargetId": "4139",
"Timestamp": 0
},
"host": "host1"
}
]
和一些Json的字符串,如單對象,以便:
{
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30026,
"TargetId": "4140",
"Timestamp": 0
}
我希望能夠從「數據」提取對象的關鍵,如果它是第一種類型的JSON字符串並結合第二種類型的Json並形成一個RDD/DataFrame,我該如何實現這一點?
最後,我想我的數據幀是這樣的:
+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
| 1.0.3 (65)| 30024| 4138| 0|
| 1.0.3 (65)| 30025| 4139| 0|
| 1.0.3 (65)| 30026| 4140| 0|
+------------------+---------+--------+---------+
對不起,新斯卡拉和火花。我一直在尋找現有的例子,但不幸的是沒有找到解決方案。
非常感謝提前。
感謝您的快速響應!對不起,我忘了提及我正在使用Spark Streaming DStreams,我已經更新了我的問題。你的迴應仍然有幫助! – j3tr1
如果你能夠從你的DStream中提取字符串,代碼應該或多或少的工作。 – philantrovert
謝謝!這通過使用json4s指出了我的正確方向。這允許我在轉換爲DF之前先處理json字符串 – j3tr1