2017-03-22 57 views

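I am reading data from a Kafka topic in a Spark Streaming job, and I need to build a key-value RDD from that data. How do I create a key-value RDD from Kafka topic data?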

val messages = KafkaUtils.createStream(streamingContext, "localhost:2181", "abc", topics, StorageLevel.MEMORY_ONLY)
messages.print()

// create key-value RDD out of CustomerId and Tokens
val xactionByCustomer = messages.map(_._2).map { transaction =>
  val key = transaction.customerId
  var tokens = transaction.tokens
  (key, tokens)
}

Error:

[error] /home/ec2-user/alok/marseille/src/main/scala/com/jcalc/feed/MarkovPredictor.scala:115: value customerId is not a member of String 
[error]  val key = transaction.customerId 
[error]       ^
[error] /home/ec2-user/alok/marseille/src/main/scala/com/jcalc/feed/MarkovPredictor.scala:116: value tokens is not a member of String 
[error]  var tokens = transaction.tokens 
[error]        ^
[error] two errors found 
[error] (compile:compileIncremental) Compilation failed 

Sample data:

(null,W3Q6TF3CCI,X84N230CIH,NNN) 
(null,O8IV7KEXT0,G1D590G05V,NNS) 
(null,LBQKYNE081,MYU0O7JC5H,NHN) 
(null,SRB4P501SW,E0FTI4RN7X,LHL) 
(null,HELRFMAXVS,W6F704TN21,LHN) 
(null,FS4PLQLI63,TK1O9YHS15,NNN) 
(null,KI70UDVJLC,4ANBDAW7SU,LNN) 
(null,IP6IVPGCWQ,MD93GGGBKA,NNN) 
(null,976N9RPXSP,JKU0SV7UMH,LNL) 
(null,J4V3AB1YVT,J9WXC1BRAY,LHN) 

I am only interested in the 2nd and 4th values for the RDD. Any help?
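The compile errors above come from the fact that `messages.map(_._2)` yields the Kafka message value as a plain `String`, so `transaction` has no `customerId` or `tokens` members. One possible approach, sketched under assumptions: each value is a comma-separated line such as `W3Q6TF3CCI,X84N230CIH,NNN` (the printed `null` being the Kafka message key), with the customer ID in the first field and the tokens in the third. The line is parsed into a small case class first, so the original `transaction.customerId` style of code compiles. `Transaction`, `parse` and `TransactionDemo` are hypothetical names for illustration, not taken from the marseille project, and a plain `Seq` stands in for the DStream so the snippet runs without Spark.

```scala
// Hypothetical record type; the marseille project may define its own.
case class Transaction(customerId: String, tokens: String)

object Transaction {
  // Assumes a value such as "W3Q6TF3CCI,X84N230CIH,NNN":
  // field 0 = customer ID, field 1 = ignored, field 2 = tokens.
  def parse(line: String): Option[Transaction] =
    line.split(",").map(_.trim) match {
      case Array(customerId, _, tokens) => Some(Transaction(customerId, tokens))
      case _                            => None // skip malformed lines
    }
}

object TransactionDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for messages.map(_._2): the message values only.
    val values = Seq("W3Q6TF3CCI,X84N230CIH,NNN", "O8IV7KEXT0,G1D590G05V,NNS")

    // Same shape as the question's code: String -> Transaction -> (key, tokens).
    val xactionByCustomer = values
      .flatMap(v => Transaction.parse(v).toList)
      .map { transaction => (transaction.customerId, transaction.tokens) }

    xactionByCustomer.foreach(println)
  }
}
```

In the streaming job the same idea would read `messages.map(_._2).flatMap(line => Transaction.parse(line).toList).map(t => (t.customerId, t.tokens))`; using `flatMap` with an `Option` drops lines that do not have exactly three fields instead of failing the batch.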

Answer


Your data looks like a tuple: (String, String, String, String). Since you are interested in the 2nd and 4th values, map:

val xactionByCustomer = messages.map(row => (row._2, row._4)) 

That should be enough.


[error] /home/ec2-user/alok/sample/KafkaWordCount.scala:37: value _2 is not a member of String
[error] val xactionByCustomer = messages.map(row => (row._2, row._4))
[error] ^
[error] /home/ec2-user/alok/sample/KafkaWordCount.scala:37: value _4 is not a member of String
[error] val xactionByCustomer = messages.map(row => (row._2, row._4))
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed – user3460401


That's strange, because `messages.map(_._2)` in your code compiles and produces strings. Could you paste the whole code with my suggested change? – MirMasej
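The comment above points at the root cause: each element of `messages` is a `(key, value)` pair of two `String`s, and in the sample data the key is `null` while the value holds all three comma-separated fields. The "2nd and 4th values" of the printed tuple are therefore fields 0 and 2 of the value string, which has to be split before those fields can be selected; `_2`/`_4` tuple accessors cannot reach them. A minimal sketch under that assumption, with a plain Scala `Seq` standing in for the DStream so it runs without Spark; `KafkaRecord` and `secondAndFourth` are hypothetical names.

```scala
object SecondAndFourth {
  // One (key, value) pair as returned by KafkaUtils.createStream; the printed
  // "(null,W3Q6TF3CCI,X84N230CIH,NNN)" is such a pair whose key is null.
  type KafkaRecord = (String, String)

  // Hypothetical helper: returns the printed tuple's 2nd and 4th values,
  // i.e. fields 0 and 2 of the comma-separated message value.
  def secondAndFourth(record: KafkaRecord): Option[(String, String)] = {
    val fields = record._2.split(",")
    if (fields.length >= 3) Some((fields(0), fields(2))) else None
  }

  def main(args: Array[String]): Unit = {
    val messages: Seq[KafkaRecord] = Seq(
      (null, "W3Q6TF3CCI,X84N230CIH,NNN"),
      (null, "LBQKYNE081,MYU0O7JC5H,NHN"))

    // Records without at least three fields are dropped.
    messages.flatMap(r => secondAndFourth(r).toList).foreach(println)
  }
}
```

Inside the streaming job, the corresponding call would be `messages.flatMap(r => secondAndFourth(r).toList)`, which yields a DStream of `(customerId, tokens)` pairs.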


I am trying to implement a POC from GitHub, at the link below; the error occurs at line no. 105 when submitting the Spark Streaming job. https://github.com/bhomass/marseille/blob/master/src/main/scala/com/jcalc/feed/MarkovPredictor.scala – user3460401
