2016-04-10 14 views
0

我在火花世界非常新,並且剛剛在6個月前開始編程java。所以我有困難:如何將JavaDstream對象映射到字符串? Spark Streaming和模型預測JAVA

JavaDStream<String> words = lines.flatMap(
      new FlatMapFunction<String, String>() { 
       public Iterable<String> call(String x) { 
        return Arrays.asList(x.split(",")); 
       } 
      }); 
    words.print(); String vec = words; 

我想將此JavaDStream轉換爲字符串。因爲之後我將能夠使用它作爲我的模型的輸入!我beleave我應該使用foreachRDD方法來完成...

 Double predictionDone = sameModel.predict(Vectors.dense(vec)); 
     System.out.println(predictionDone.toString()); 
+0

我建議你仔細閱讀Spark Streaming文檔以獲取所需的基礎以便應用它:http://spark.apache.org/docs/latest/streaming-programming-guide.html – maasg

+0

我在做它thx,我有2件,但我不能連接他們...我可以打開一個流,我也可以加載模型,並做預測 –

+0

我添加了一個廣泛的答案,以幫助你的方式,但你需要拿起在很多材料上。如果你從Java開始,我建議你跳轉到Scala。將幫助你更好地理解功能方面。 – maasg

回答

0

你要找的不是如何將DSTREAM對象映射到一個字符串,而是如何在每一個得分流中包含DSTREAM值間隔。 DStream是RDD的時間限制集合。您可以通過對其應用高級操作來處理DStream。在Spark內部,這些操作將應用於當時可用數據在每個時間間隔內構建的RDD。

而不是思考「如何從DStream到字符串」,正確的路徑是「如何訪問DStream的元素並將其評分函數應用於他們」。

在廣泛的線,你需要幾個步驟:

- 首先,構建你的流,使用supported DStream implementations的一個(或滾你自己):

JavaDStream<String> textDStream = ... 

-Apply transformations獲取數據在外形上,你需要它:

JavaDStream<String> wordsDStream = textDStream.flatMap(...).filter(...) 

- 一旦你有正確的形狀中的數據,你需要一個output operation適用於DSTREAM爲了實際上對數據做了些什麼。 foreachRDD是最通用的輸出運算符,它允許我們將actions應用於基礎RDD。

wordsDStream.foreachRDD{rdd => // here we get access to the RDD 
    rdd.foreach{word => // here we get access to the content of the RDD, 
          // which is the 'words' in the DStream 
     val score = model.score(word) 
     // do something with 'score' like write it to a db or file 
    } 
} 

(這是斯卡拉僞碼。在Java中的結構是一樣的,只有代碼更詳細)

在粗獷的線條,這是遵循結構。結合ML模型增加了一些複雜性,這可能是具有挑戰性的。

0
words.foreachRDD(new Function<JavaRDD<String>, Void>() { 
     public Void call(JavaRDD<String> rdd) throws Exception { 
      if(rdd!=null) 
      { 
       List<String> result = rdd.collect(); 
       double[] d = new double[69]; // model input expected lenght 
       int i=0; 


       for (String temp : result) { 
        double aDouble = Double.parseDouble(temp); 
        d[i]=aDouble; i++; 
        list_transactions.add(Vectors.dense(d)); //global variable 

       } 
      } 
      return null; 

這是解決方案!它的作品

@maasg,thx的幫助! 「如何訪問DStream的元素並將其評分函數應用於他們」這是我尋找的關鍵!

相關問題