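I'm reading data from a Kafka topic in a Spark Streaming job, and I need to build a key-value RDD from that data. How can I create a key-value RDD from the Kafka topic data?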
val messages = KafkaUtils.createStream(streamingContext, "localhost:2181","abc",topics, StorageLevel.MEMORY_ONLY)
messages.print()
// create a key-value RDD out of CustomerId and Tokens
val xactionByCustomer = messages.map(_._2).map {
  transaction =>
    val key = transaction.customerId
    var tokens = transaction.tokens
    (key, tokens)
}
Error:
[error] /home/ec2-user/alok/marseille/src/main/scala/com/jcalc/feed/MarkovPredictor.scala:115: value customerId is not a member of String
[error] val key = transaction.customerId
[error] ^
[error] /home/ec2-user/alok/marseille/src/main/scala/com/jcalc/feed/MarkovPredictor.scala:116: value tokens is not a member of String
[error] var tokens = transaction.tokens
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
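Both errors come from the element type. KafkaUtils.createStream returns a stream of (key, message) pairs of type (String, String), so after map(_._2) each transaction is a plain String, and String has no customerId or tokens member. The code therefore needs to parse each line into a record before it can read those fields. Below is a minimal sketch under stated assumptions: the Transaction case class and its parse method are hypothetical, and the field order (customer ID, a second ID, the token string) is guessed from the sample data, not taken from the marseille repository. A plain Seq stands in for one micro-batch so the snippet runs without Spark.

```scala
// Hypothetical record type: field names and order are ASSUMED from the sample
// data (customer ID, a second ID, token string), not from the marseille repo.
final case class Transaction(customerId: String, transactionId: String, tokens: String)

object Transaction {
  // Parses one Kafka message value such as "W3Q6TF3CCI,X84N230CIH,NNN".
  // Returns None when the line does not have exactly three comma-separated fields.
  def parse(line: String): Option[Transaction] =
    line.split(",").map(_.trim) match {
      case Array(customerId, transactionId, tokens) =>
        Some(Transaction(customerId, transactionId, tokens))
      case _ => None
    }
}

object ParseDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for one micro-batch of the (key, value) pairs from createStream;
    // the Kafka message key is null, as in the sample output.
    val batch: Seq[(String, String)] = Seq(
      (null, "W3Q6TF3CCI,X84N230CIH,NNN"),
      (null, "O8IV7KEXT0,G1D590G05V,NNS")
    )

    // Same chain as the streaming version would use on `messages`:
    // take the value, parse it (dropping malformed lines), then key by customer.
    val xactionByCustomer: Seq[(String, String)] =
      batch
        .map(_._2)
        .flatMap(line => Transaction.parse(line).toList)
        .map(t => (t.customerId, t.tokens))

    xactionByCustomer.foreach(println) // prints (W3Q6TF3CCI,NNN) then (O8IV7KEXT0,NNS)
  }
}
```

In the streaming job the same map/flatMap/map chain would be applied to messages instead of batch; returning None for malformed lines keeps a single bad message from failing the whole batch.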
Sample data:
(null,W3Q6TF3CCI,X84N230CIH,NNN)
(null,O8IV7KEXT0,G1D590G05V,NNS)
(null,LBQKYNE081,MYU0O7JC5H,NHN)
(null,SRB4P501SW,E0FTI4RN7X,LHL)
(null,HELRFMAXVS,W6F704TN21,LHN)
(null,FS4PLQLI63,TK1O9YHS15,NNN)
(null,KI70UDVJLC,4ANBDAW7SU,LNN)
(null,IP6IVPGCWQ,MD93GGGBKA,NNN)
(null,976N9RPXSP,JKU0SV7UMH,LNL)
(null,J4V3AB1YVT,J9WXC1BRAY,LHN)
I'm only interested in the 2nd and 4th values for the RDD. Any help?
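The printed lines look like four-element tuples, but each element of messages is a Tuple2: Scala prints a (String, String) pair whose key is null as (null,W3Q6TF3CCI,X84N230CIH,NNN) because the value itself contains commas. So the "2nd and 4th values" are the first and third comma-separated fields of the message value, and a member such as row._4 does not exist. A small sketch of that mapping; secondAndFourth is a hypothetical helper name, and it assumes every value has at least three fields (otherwise fields(2) is out of bounds and throws).

```scala
object TupleShapeDemo {
  // Hypothetical helper: returns the 1st and 3rd comma-separated fields of a
  // message value, i.e. the 2nd and 4th items of the printed "(key,value)" line.
  // Assumes at least three fields; fewer will throw ArrayIndexOutOfBoundsException.
  def secondAndFourth(value: String): (String, String) = {
    val fields = value.split(",")
    (fields(0), fields(2))
  }

  def main(args: Array[String]): Unit = {
    // One (key, value) element as delivered by createStream; the key is null.
    val record: (String, String) = (null, "W3Q6TF3CCI,X84N230CIH,NNN")

    // Tuple2.toString puts the null key and the raw value side by side,
    // which is why messages.print() looks like a four-element tuple.
    println(record) // prints (null,W3Q6TF3CCI,X84N230CIH,NNN)

    // DStream equivalent (not run here): messages.map(r => secondAndFourth(r._2))
    println(secondAndFourth(record._2)) // prints (W3Q6TF3CCI,NNN)
  }
}
```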
[error] /home/ec2-user/alok/sample/KafkaWordCount.scala:37: value _2 is not a member of String [error] val xactionByCustomer = messages.map(row =>(row._2, row._4)) [error] ^ [error] /home/ec2-user/alok/sample/KafkaWordCount.scala:37: value _4 is not a member of String [error] val xactionByCustomer = messages.map( row =>(row._2,row._4)) [error] ^ [error] two errors found [error] (compile:compileIncremental) Compilation failed – user3460401
That's strange, because 'messages.map(_._2)' succeeds in your code and produces Strings. Could you paste the whole code with the update I suggested? – MirMasej
I'm trying to implement a POC from GitHub; with the file at the link below, I get the error at line no. 105 when submitting the Spark Streaming job. https://github.com/bhomass/marseille/blob/master/src/main/scala/com/jcalc/feed/MarkovPredictor.scala – user3460401