2016-12-19 32 views
2

我想從kafka主題使用scala應用程序流數據。我能夠從主題獲取數據,但如何創建一個數據框?不適當的輸出,同時創建一個數據幀

下面是數據(字符串,字符串格式)

{ 
    "action": "AppEvent", 
    "tenantid": 298, 
    "lat": 0.0, 
    "lon": 0.0, 
    "memberid": 16390, 
    "event_name": "CATEGORY_CLICK", 
    "productUpccd": 0, 
    "device_type": "iPhone", 
    "device_os_ver": "10.1", 
    "item_name": "CHICKEN" 
} 

我試過幾個方法可以做到,但它沒有產生令人滿意的結果。

+--------------------+ |     _1| 
+--------------------+ |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| 
|{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| 
|{"action":"AppEve...| |{"action":"AppEve...| 

任何人都可以告訴如何做映射,以便每個字段進入像表一樣的獨立列。數據採用avro格式。

這裏是從主題獲取數據的代碼。

val ssc = new StreamingContext(sc, Seconds(2)) 
val kafkaConf = Map[String, String]("metadata.broker.list" -> "####", 
    "zookeeper.connect" -> "########", 
    "group.id" -> "KafkaConsumer", 
    "zookeeper.connection.timeout.ms" -> "1000000") 
val topicMaps = Map("fishbowl" -> 1) 
val messages = KafkaUtils.createStream[String, String,DefaultDecoder, DefaultDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER).map(_._2) 

請指導我如何使用foreachRDD FUNC和map()來創建一個適當的數據幀

+0

你嘗試做搜索? [火花流+數據幀(http://stackoverflow.com/search?q=%5Bspark-streaming%5D+dataframe) – maasg

+0

是沒有幫助的,因爲進出口新的scala.I不可能弄清楚如何轉換avro [字符串,字符串]到dataframe http://stackoverflow.com/questions/41237929/value-toint-is-not-a-member-of-object –

+0

這是我的答案謝謝馬斯格找到答案 –

回答

2

要出RDD的創建數據幀不論其例類架構。 使用此邏輯之下

stream.foreachRDD(
    rdd => { 
    val dataFrame = sqlContext.read.json(rdd.map(_._2)) 
dataFrame.show() 
     }) 

這裏流是來自kafkaUtils.createStream創建了一個RDD()

+0

做得好。關於「不考慮其格式或案例類模式」的評論不完全正確=>這僅適用於JSON格式的記錄。 – maasg

+0

@maasg謝謝先生,編輯我的評論。當我用avro解決它(仍然它的模式在json中) –

相關問題