我已經寫了從s3讀取數據的spark流作業。 作業了一系列mapwithstate其次maptopair電話,像下面的:輸入流數據不是在任務之間平均分配
JavaDStream<String> cdrLines = ssc.textFileStream(cdrInputFile);
JavaDStream<CDR> cdrRecords = cdrLines.map(x -> cdrStreamParser.parse(x));
JavaDStream<CDR> cdrRecordsFiltered = cdrRecords
.filter(t -> t != null);
JavaPairDStream<String, CDR> sTripletStream = cdrRecordsFiltered
.mapToPair(s -> new Tuple2<String, CDR>(s
.gettNumber(), s));
JavaPairDStream<String, Tuple2<CDR, List<StatusCode>>> stateDstream1 = sTripletStream
.mapWithState(
StateSpec.function(hsMappingFunc).initialState(
tripletRDD)).mapToPair(s -> s);
JavaPairDStream<String,Tuple2<CDR,List<StatusCode>>> stateDstream2 = stateDstream1
.mapWithState(StateSpec.function(cfMappingFunc).initialState(cfHistoryRDD))
.mapToPair(s -> s);
JavaPairDStream<String, Tuple2<CDR, List<StatusCode>>> stateDstream3 = stateDstream2
.mapWithState(StateSpec.function(imeiMappingFunc).initialState(imeiRDD))
.mapToPair(s -> s);
我spark.default.parallelism設置爲6。我看到第一個和最後maptopair階段的速度不夠快。第二和第三個maptopair階段非常緩慢。
這些階段中的每一個都貫穿6個任務。在第二和第三個maptopair階段,5個任務以2s運行。但一項任務需要很長時間〜3-4分鐘。與其他任務相比,該任務的洗牌數據非常高,從而導致瓶頸。
有沒有一種方法可以更均勻地分配所有任務之間的負載?
你能解釋你的代碼試圖實現什麼嗎?也許在更好的情況下,我們可以幫助制定更好的圖表。 –