2016-11-17 37 views
1

我將單詞列表作爲DStream。例如:列表(汽車,速度,事故,速度,壞)。我想從這個列表中形成雙克。我有RDD,但面臨DStreams的問題。我正在使用foreachRDD函數。下面是我的 -DStream中的列表處理

我試圖在轉換後打印RDD的內容。

def printRDD(rddString: RDD[String]) ={ 
     val z = rddString.map(y => y.toString.split(",").filter(_.nonEmpty). 
     map(y => y.replaceAll("""\W""", "").toLowerCase) 
     .filter(_.nonEmpty) 
     .sliding(2).filter(_.size == 2).map{ case Array(a, b) => ((a, b), 1) }) 
     .flatMap(x => x) 
     println(z) 
} 
val x = lines.map(plainTextToLemmas(_, stopWords)) 
val words = x.flatMap(y=> y.toString.split(",")) 
words.foreachRDD(rdd => printRDD(rdd)) 

是否有任何方式顯示轉換函數printRDD後的內容。即使我在打印定義中使用println(z),它也會在flatMap中返回MapPartitionsRDD [18]。我正在使用Kafka火花流式傳輸來讀取輸入,我在控制檯上獲取單詞值。我認爲在調用printRDD函數後單詞不會改變。

+0

流處理後bigrams會發生什麼?該功能僅用於控制檯打印。 – maasg

回答

1

你可以做所有這些在DStream操作,內部沒有foreachRDD,然後調用printDStream

lines 
    .map(plainTextToLemmas(_, stopWords)) 
    .flatMap(y => y.toString.split(",")) 
    .map(y => y.toString.split(",").filter(_.nonEmpty)) 
    .map(y => y.replaceAll("""\W""", "").toLowerCase) 
    .filter(_.nonEmpty) 
    .sliding(2) 
    .filter(_.size == 2) 
    .flatMap { case Array(a, b) => ((a, b), 1) } 
    .print() 

這應該打印出DStream的對駕駛員控制檯的內容。

要注意的重要一點是,儘管你在一個DStream操作,它的方法「鑽入」了在給定的批次時間基本RDD和揭露RDD內的實際類型,所以你不應該需要使用foreachRDD可以達到內部的實際數據。