2013-06-28

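Spark Streaming from a socket does not work with a reduce operation

I am trying to run a simple Spark Streaming example on my local machine.
I have a thread that writes As/Bs/Cs to a socket: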

// Accept a single client, then send it a random letter every few milliseconds.
serverSocket = new ServerSocket(Constants.PORT);
s1 = serverSocket.accept();
// Create the writer once rather than on every iteration.
PrintWriter out = new PrintWriter(s1.getOutputStream());
while (true) {
    Thread.sleep(random.nextInt(100));
    String character = alphabet.get(random.nextInt(alphabet.size()));
    out.println(character);
    out.flush();
}

My main program, in which I try to count the number of As/Bs/Cs, looks as follows (without the reduce step):

public static void main(String[] args) { 
    // start socket writer thread 
    System.setProperty("spark.cleaner.ttl", "10000"); 
    JavaSparkContext sc = new JavaSparkContext(
      "local", 
      "Test", 
      Constants.SPARK_HOME, 
      new String[]{"target/spark-standalone-0.0.1-SNAPSHOT.jar"}); 
    Duration batchDuration = new Duration(TIME_WINDOW_MS); 
    JavaStreamingContext streamingContext = new JavaStreamingContext(sc, batchDuration); 
    JavaDStream<String> stream = streamingContext.socketTextStream("localhost", Constants.PORT); 
    stream.print(); 
    // Map every received letter to a (key, 1) pair so that it can be counted later.
    JavaPairDStream<String, Long> texts = stream.map(new PairFunction<String, String, Long>() {

        @Override
        public Tuple2<String, Long> call(String t) throws Exception {
            return new Tuple2<String, Long>("batchCount" + t, 1L);
        }

    });
    texts.print();
    streamingContext.checkpoint("checkPointDir");
    streamingContext.start();
}

In this case everything works fine (sample output for one batch):

Time: 1372413296000 ms 
------------------------------------------- 
B 
A 
B 
C 
C 
C 
A 
B 
C 
C 
... 

------------------------------------------- 
Time: 1372413296000 ms 
------------------------------------------- 
(batchCountB,1) 
(batchCountA,1) 
(batchCountB,1) 
(batchCountC,1) 
(batchCountC,1) 
(batchCountC,1) 
(batchCountA,1) 
(batchCountB,1) 
(batchCountC,1) 
(batchCountC,1) 
... 

However, if I add the reduce step after the map, it no longer works. The following code goes after texts.print():

JavaPairDStream<String, Long> reduced = texts.reduceByKeyAndWindow(new Function2<Long, Long, Long>() {

    @Override
    public Long call(Long t1, Long t2) throws Exception {
        return t1 + t2;
    }
}, new Duration(TIME_WINDOW_MS));
reduced.print();

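In this case I only get output for the "stream" and "texts" variables from the first batch, and no output at all for the reduce. Nothing happens after the first batch has been processed either. I also set the Spark log level to DEBUG, but did not see any exceptions or anything else unusual.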

What is happening here? Why does the job get stuck?

Answer

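Just for the record: I got an answer in the Spark user group.
The mistake was that one has to use

"local[2]"

instead of

"local"

as the master parameter when instantiating the Spark context, in order to enable concurrent processing.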
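
To illustrate why the number of local threads matters: with "local", Spark runs everything on a single worker thread, and the socket receiver is a long-running task that keeps that thread busy, so the batch jobs have nothing left to run on. The following is only a plain-Java analogy using a thread pool, not Spark's actual scheduler; the class name LocalThreadsDemo and the method batchProcessed are made up for this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LocalThreadsDemo {

    // Returns true if the "batch job" managed to run while the "receiver" was still active.
    static boolean batchProcessed(int workerThreads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workerThreads);
        CountDownLatch stopReceiver = new CountDownLatch(1);
        CountDownLatch batchDone = new CountDownLatch(1);
        try {
            // Stand-in for the socket receiver: it holds its thread until told to stop.
            pool.execute(() -> {
                try {
                    stopReceiver.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Stand-in for the per-batch job (for example the reduce step).
            pool.execute(batchDone::countDown);
            // Give the batch job half a second to get scheduled.
            return batchDone.await(500, TimeUnit.MILLISECONDS);
        } finally {
            stopReceiver.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 thread:  batch processed = " + batchProcessed(1)); // false
        System.out.println("2 threads: batch processed = " + batchProcessed(2)); // true
    }
}
```

In the program from the question, the corresponding change would be to pass "local[2]" instead of "local" as the first argument of the JavaSparkContext constructor.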