2014-10-08 78 views
3

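Apache Spark Streaming simple application not working

I am having the following problem with the Apache Spark Streaming library. I rewrote the simple "word count" example as a standalone application to see how streaming works, so here is the code: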

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Split each line into words
JavaDStream<String> words = lines.flatMap(
    new FlatMapFunction<String, String>() {
        @Override public Iterable<String> call(String x) {
            return Arrays.asList(x.split(" "));
        }
    });

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
    new PairFunction<String, String, Integer>() {
        @Override public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<String, Integer>(s, 1);
        }
    });

JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        @Override public Integer call(Integer i1, Integer i2) throws Exception {
            return i1 + i2;
        }
    });

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();

jssc.start();            // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate

When I run this standalone application, the log keeps looping over the following lines:

14/10/08 13:16:44 INFO JobScheduler: Finished job streaming job 1412767004000 ms.0 from job set of time 1412767004000 ms 
14/10/08 13:16:44 INFO JobScheduler: Total delay: 0.023 s for time 1412767004000 ms (execution: 0.019 s) 
14/10/08 13:16:44 INFO ShuffledRDD: Removing RDD 428 from persistence list 
14/10/08 13:16:44 INFO BlockManager: Removing RDD 428 
14/10/08 13:16:44 INFO MappedRDD: Removing RDD 427 from persistence list 
14/10/08 13:16:44 INFO BlockManager: Removing RDD 427 
14/10/08 13:16:44 INFO FlatMappedRDD: Removing RDD 426 from persistence list 
14/10/08 13:16:44 INFO BlockManager: Removing RDD 426 
14/10/08 13:16:44 INFO BlockRDD: Removing RDD 425 from persistence list 
14/10/08 13:16:44 INFO SocketInputDStream: Removing blocks of RDD BlockRDD[425] at BlockRDD at ReceiverInputDStream.scala:69 of time 1412767004000 ms 
14/10/08 13:16:44 INFO BlockManager: Removing RDD 425 
14/10/08 13:16:44 INFO SocketReceiver: Stopped receiving 
14/10/08 13:16:44 INFO SocketReceiver: Closed socket to localhost:9999 
14/10/08 13:16:44 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Retrying connecting to localhost:9999 
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999: 
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Called receiver onStop 
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Deregistering receiver 0 
14/10/08 13:16:44 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost:9999 
14/10/08 13:16:44 INFO ReceiverSupervisorImpl: Stopped receiver 0 
14/10/08 13:16:45 INFO ReceiverTracker: Stream 0 received 0 blocks 
14/10/08 13:16:45 INFO JobScheduler: Added jobs for time 1412767005000 ms 
14/10/08 13:16:45 INFO JobScheduler: Starting job streaming job 1412767005000 ms.0 from job set of time 1412767005000 ms 
14/10/08 13:16:45 INFO SparkContext: Starting job: take at DStream.scala:608 
14/10/08 13:16:45 INFO DAGScheduler: Registering RDD 435 (map at MappedDStream.scala:35) 
14/10/08 13:16:45 INFO DAGScheduler: Got job 217 (take at DStream.scala:608) with 1 output partitions (allowLocal=true) 
14/10/08 13:16:45 INFO DAGScheduler: Final stage: Stage 433(take at DStream.scala:608) 
14/10/08 13:16:45 INFO DAGScheduler: Parents of final stage: List(Stage 434) 
14/10/08 13:16:45 INFO DAGScheduler: Missing parents: List() 
14/10/08 13:16:45 INFO DAGScheduler: Submitting Stage 433 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42), which has no missing parents 
14/10/08 13:16:45 INFO MemoryStore: ensureFreeSpace(2256) called with curMem=23776, maxMem=277842493 
14/10/08 13:16:45 INFO MemoryStore: Block broadcast_217 stored as values in memory (estimated size 2.2 KB, free 264.9 MB) 
14/10/08 13:16:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 433 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42) 
14/10/08 13:16:45 INFO TaskSchedulerImpl: Adding task set 433.0 with 1 tasks 
14/10/08 13:16:45 INFO TaskSetManager: Starting task 0.0 in stage 433.0 (TID 217, localhost, PROCESS_LOCAL, 1008 bytes) 
14/10/08 13:16:45 INFO Executor: Running task 0.0 in stage 433.0 (TID 217) 
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks 
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms 
14/10/08 13:16:45 INFO Executor: Finished task 0.0 in stage 433.0 (TID 217). 822 bytes result sent to driver 
14/10/08 13:16:45 INFO TaskSetManager: Finished task 0.0 in stage 433.0 (TID 217) in 4 ms on localhost (1/1) 
14/10/08 13:16:45 INFO DAGScheduler: Stage 433 (take at DStream.scala:608) finished in 0.006 s 
14/10/08 13:16:45 INFO TaskSchedulerImpl: Removed TaskSet 433.0, whose tasks have all completed, from pool 
14/10/08 13:16:45 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.009386933 s 
14/10/08 13:16:45 INFO SparkContext: Starting job: take at DStream.scala:608 
14/10/08 13:16:45 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 108 is 82 bytes 
14/10/08 13:16:45 INFO DAGScheduler: Got job 218 (take at DStream.scala:608) with 1 output partitions (allowLocal=true) 
14/10/08 13:16:45 INFO DAGScheduler: Final stage: Stage 435(take at DStream.scala:608) 
14/10/08 13:16:45 INFO DAGScheduler: Parents of final stage: List(Stage 436) 
14/10/08 13:16:45 INFO DAGScheduler: Missing parents: List() 
14/10/08 13:16:45 INFO DAGScheduler: Submitting Stage 435 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42), which has no missing parents 
14/10/08 13:16:45 INFO MemoryStore: ensureFreeSpace(2256) called with curMem=26032, maxMem=277842493 
14/10/08 13:16:45 INFO MemoryStore: Block broadcast_218 stored as values in memory (estimated size 2.2 KB, free 264.9 MB) 
14/10/08 13:16:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 435 (ShuffledRDD[436] at combineByKey at ShuffledDStream.scala:42) 
14/10/08 13:16:45 INFO TaskSchedulerImpl: Adding task set 435.0 with 1 tasks 
14/10/08 13:16:45 INFO TaskSetManager: Starting task 0.0 in stage 435.0 (TID 218, localhost, PROCESS_LOCAL, 1008 bytes) 
14/10/08 13:16:45 INFO Executor: Running task 0.0 in stage 435.0 (TID 218) 
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks 
14/10/08 13:16:45 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms 
14/10/08 13:16:45 INFO Executor: Finished task 0.0 in stage 435.0 (TID 218). 822 bytes result sent to driver 
14/10/08 13:16:45 INFO TaskSetManager: Finished task 0.0 in stage 435.0 (TID 218) in 3 ms on localhost (1/1) 
14/10/08 13:16:45 INFO TaskSchedulerImpl: Removed TaskSet 435.0, whose tasks have all completed, from pool 
14/10/08 13:16:45 INFO DAGScheduler: Stage 435 (take at DStream.scala:608) finished in 0.003 s 
14/10/08 13:16:45 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.008348754 s 
------------------------------------------- 
Time: 1412767005000 ms 
------------------------------------------- 

And in the web UI I can see the following screenshot:

[screenshot of the Spark web UI, not preserved]

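I am of course running netcat -lk 9999, but when I type something into it, for example some words, nothing happens.

Can someone help me figure out how to get this example working?

Thanks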

Answers

7

As suggested in the comments:

Run the following in a console:

nc -lk 9999

Then run the following command from inside the Spark folder:

bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999 

Now type some words into the console tab where you ran nc:

It is working! Life is beautiful! 

and check the output in the console opened in the Spark folder:

(beautiful!,1) 
(working!,1) 
(is,2) 
(It,1) 
(Life,1) 

If you keep adding words, the program will keep counting them. Hope this helps.
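To see where those counts come from, here is a minimal plain-Java sketch of what the flatMap, mapToPair and reduceByKey steps compute for a single batch. It does not use Spark at all; the class name BatchWordCountSketch and the method countWords are made up for illustration, and it assumes Java 8 or later.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark: counts words in one batch of lines.
public class BatchWordCountSketch {

    // Mirrors flatMap -> mapToPair -> reduceByKey for a single batch.
    static Map<String, Integer> countWords(List<String> batch) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : batch) {
            for (String word : line.split(" ")) {    // same split as the flatMap step
                counts.merge(word, 1, Integer::sum); // pair (word, 1), then sum per word
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("It is working! Life is beautiful!");
        countWords(batch).forEach(
            (word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

The sketch prints the pairs in first-seen order, whereas Spark may print them in a different order, as in the output above. Note that the streaming job counts each one-second batch separately, so a word typed in two different batches is reported twice with its per-batch count.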

3

You need to open a port for communication on your local system with this command:

nc -lk 9999 

Spark then receives all the text typed into that console (port 9999) as a stream.
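If nc is not available, something else has to listen on the port before socketTextStream can connect to it. The following is only a rough stand-in, assuming Java 7 or later; the class name LineServerSketch and the method serveOnce are hypothetical. It accepts one client and sends it a fixed list of newline-terminated lines.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for "nc -lk 9999": serves a few text lines to one client.
public class LineServerSketch {

    // Waits for a single client, sends each line followed by '\n', then closes the connection.
    static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                 new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.print(line + "\n");
            }
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9999 matches the socketTextStream("localhost", 9999) call in the question.
        try (ServerSocket server = new ServerSocket(9999)) {
            System.out.println("Listening on port " + server.getLocalPort());
            serveOnce(server, Arrays.asList("It is working! Life is beautiful!"));
        }
    }
}
```

Unlike nc -lk, this sketch closes the connection after sending its lines and then exits, so it is only useful for a quick check that the streaming application can connect and read text from the port.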
