2016-04-14 92 views
1

我需要合併兩個不同的流RDD。如何合併兩個不同類型的流RDDs

Uno的流類型是org.apache.spark.streaming.dstream.DStream [String],另一個類型是org.apache.spark.streaming.dstream.DStream [twitter4j.Status]。

我已經試過:

val streamRDD = stream.union(sentiments) 

但它不會成功:

[error] found : org.apache.spark.streaming.dstream.DStream[String] 
[error] required: org.apache.spark.streaming.dstream.DStream[twitter4j.Status] 
[error]  val streamRDD = stream.union(sentiments) 
[error]         ^
+1

你想要結果看起來像什麼?錯誤消息正確地描述了這個問題:你的'Dstream'具有不同的類型,所以不能合併。你期望得到的合併後的'DStream'是什麼?如果使用'String',則必須將另一個轉換爲'DStream [String]'第一個 –

回答

2

的問題是,union僅適用於同一元素類型的兩個DStream,當你有DStream[String]DStream[twitter4j.Status]String不是twitter4j.Status

我假設你有以下幾種類型:

val stream: DStream[twitter4j.Status] 
val sentiments: DStream[String] 

你有不同的選擇來解決這個問題:

    1. 你確信Stringtwitter4j.Status應該混爲一體DStream因爲它們在您的上下文中表示相同的信息:將DStream轉換爲與其他

      • A)轉換stream匹配sentiments,所以你需要一個轉換twitter4j.Status => String,可能可以使用_.toString這樣的:

        val stream2 = stream.map(_.toString) 
        val result = stream2.union(sentiments) 
        
      • b)將sentiments匹配stream,要求String => twitter4j.Status
  • Stringtwitter4j.Status是在上下文兩個不同的東西,你想保持在兩者之間的區別,但還是將它們組合成一個DStream
  • 一般來說,你可以使用一個Sum型來代表每一種情況下,我們這裏只有兩個,所以我們可以使用預定義Either

    type R = DStream[Either[String,twitter4j.Status] // shorter 
    val streamL: R = stream.map(Left(_)) 
    val sentimentR: R = sentiments.map(Right(_)) 
    val result: R = streamL.union(sentimentsR) 
    

    在最後你將有一個,其中每個元素可以是一個String包裹在Lefttwitter4j.Status包裹在Right,允許你區分這兩個處理流時。

+0

它像魅力一樣工作,非常感謝您 –

相關問題