1
我正在開發一個Spark Streaming應用程序,我需要在Python中使用來自兩臺服務器的輸入流,每個應用程序每秒向Spark上下文發送一條JSON消息。爲什麼Spark Streaming在發送兩個輸入流時停止工作?
我的問題是,如果我只在一個流上執行操作,那麼一切正常。但是如果我有兩個來自不同服務器的數據流,那麼Spark在它可以打印任何內容之前就凍結了,並且只有當兩個服務器都發送了他們必須發送的所有JSON消息(當它檢測到'socketTextStream
沒有接收數據時。
這裏是我的代碼:
JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost",996,
StorageLevels.MEMORY_AND_DISK_SER);
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<Integer, String> dataStream1= streamData1.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(1, stream);
return streamPair;
}
});
JavaPairDStream<Integer, String> dataStream2= streamData2.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(2, stream);
return streamPair;
}
});
dataStream2.print(); //for example
注意到有沒有錯誤消息,在啓動上下文之後星火簡單的凍結,而我從端口得到JSON消息不顯示任何東西
。非常感謝。
完美的是,我設置了本地[2]而不是本地[3]。非常感謝。 –
順便說一下,如果您使用的是Kafka,您可以從同一個'DStream'中的兩個源文件中讀取,這使您無需爲每個源綁定一個專用線程。在這裏查看這個答案,瞭解如何在卡夫卡中做到這一點 - 在單個'DStream'中打開多個主題。 –