我想從kafka主題使用scala應用程序流數據。我能夠從主題獲取數據,但如何創建一個數據框?不適當的輸出,同時創建一個數據幀
下面是數據(字符串,字符串格式)
{
"action": "AppEvent",
"tenantid": 298,
"lat": 0.0,
"lon": 0.0,
"memberid": 16390,
"event_name": "CATEGORY_CLICK",
"productUpccd": 0,
"device_type": "iPhone",
"device_os_ver": "10.1",
"item_name": "CHICKEN"
}
我試過幾個方法可以做到,但它沒有產生令人滿意的結果。
+--------------------+ | _1|
+--------------------+ |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...|
|{"action":"AppEve...| |{"action":"AppEve...| |{"action":"AppEve...|
|{"action":"AppEve...| |{"action":"AppEve...|
任何人都可以告訴如何做映射,以便每個字段進入像表一樣的獨立列。數據採用avro格式。
這裏是從主題獲取數據的代碼。
val ssc = new StreamingContext(sc, Seconds(2))
val kafkaConf = Map[String, String]("metadata.broker.list" -> "####",
"zookeeper.connect" -> "########",
"group.id" -> "KafkaConsumer",
"zookeeper.connection.timeout.ms" -> "1000000")
val topicMaps = Map("fishbowl" -> 1)
val messages = KafkaUtils.createStream[String, String,DefaultDecoder, DefaultDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER).map(_._2)
請指導我如何使用foreachRDD FUNC和map()來創建一個適當的數據幀
你嘗試做搜索? [火花流+數據幀(http://stackoverflow.com/search?q=%5Bspark-streaming%5D+dataframe) – maasg
是沒有幫助的,因爲進出口新的scala.I不可能弄清楚如何轉換avro [字符串,字符串]到dataframe http://stackoverflow.com/questions/41237929/value-toint-is-not-a-member-of-object –
這是我的答案謝謝馬斯格找到答案 –