2016-05-09 55 views
1

我正在開發一個Spark Streaming應用程序,我需要在Python中使用來自兩臺服務器的輸入流,每個應用程序每秒向Spark上下文發送一條JSON消息。爲什麼Spark Streaming在發送兩個輸入流時停止工作?

我的問題是,如果我只在一個流上執行操作,那麼一切正常。但是如果我有兩個來自不同服務器的數據流,那麼Spark在它可以打印任何內容之前就凍結了,並且只有當兩個服務器都發送了他們必須發送的所有JSON消息(當它檢測到'socketTextStream沒有接收數據時。

這裏是我的代碼:

JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost",996, 
      StorageLevels.MEMORY_AND_DISK_SER); 

    JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,StorageLevels.MEMORY_AND_DISK_SER); 

    JavaPairDStream<Integer, String> dataStream1= streamData1.mapToPair(new PairFunction<String, Integer, String>() { 
     public Tuple2<Integer, String> call(String stream) throws Exception { 


      Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(1, stream); 

      return streamPair; 
     } 
    }); 

    JavaPairDStream<Integer, String> dataStream2= streamData2.mapToPair(new PairFunction<String, Integer, String>() { 
     public Tuple2<Integer, String> call(String stream) throws Exception { 


      Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(2, stream); 

      return streamPair; 
     } 
    }); 

dataStream2.print(); //for example 

注意到有沒有錯誤消息,在啓動上下文之後星火簡單的凍結,而我從端口得到JSON消息不顯示任何東西

非常感謝。

回答

2

看看從Spark Streaming documentation需注意以下問題,看看他們是否適用:

要記住的要點

  • 在本地運行一個Spark流程序,不要使用「本地」或「當地1「作爲主URL。這兩者中的任何一個意味着只有一個線程將用於本地運行任務。如果您使用的是基於接收器的輸入DStream(例如套接字,Kafka,Flume等),那麼將使用單線程來運行接收器,而不用線程來處理接收到的數據。因此,在本地運行時,請始終使用「local [n]」作爲主URL,其中n>要運行的接收器的數量(有關如何設置主站的信息,請參閱Spark Properties)。
  • 將邏輯擴展爲在羣集上運行時,分配給Spark Streaming應用程序的內核數量必須多於接收器的數量。否則系統將接收數據,但無法處理它。
+1

完美的是,我設置了本地[2]而不是本地[3]。非常感謝。 –

+0

順便說一下,如果您使用的是Kafka,您可以從同一個'DStream'中的兩個源文件中讀取,這使您無需爲每個源綁定一個專用線程。在這裏查看這個答案,瞭解如何在卡夫卡中做到這一點 - 在單個'DStream'中打開多個主題。 –

相關問題