2016-03-29 31 views
0

我已經寫了從s3讀取數據的spark流作業。 作業了一系列mapwithstate其次maptopair電話,像下面的:輸入流數據不是在任務之間平均分配

JavaDStream<String> cdrLines = ssc.textFileStream(cdrInputFile); 
JavaDStream<CDR> cdrRecords = cdrLines.map(x -> cdrStreamParser.parse(x)); 
JavaDStream<CDR> cdrRecordsFiltered = cdrRecords 
     .filter(t -> t != null); 
JavaPairDStream<String, CDR> sTripletStream = cdrRecordsFiltered 
     .mapToPair(s -> new Tuple2<String, CDR>(s 
       .gettNumber(), s)); 

JavaPairDStream<String, Tuple2<CDR, List<StatusCode>>> stateDstream1 = sTripletStream 
     .mapWithState(
       StateSpec.function(hsMappingFunc).initialState(
         tripletRDD)).mapToPair(s -> s); 

JavaPairDStream<String,Tuple2<CDR,List<StatusCode>>> stateDstream2 = stateDstream1 
.mapWithState(StateSpec.function(cfMappingFunc).initialState(cfHistoryRDD)) 
     .mapToPair(s -> s); 

JavaPairDStream<String, Tuple2<CDR, List<StatusCode>>> stateDstream3 = stateDstream2 
     .mapWithState(StateSpec.function(imeiMappingFunc).initialState(imeiRDD)) 
     .mapToPair(s -> s); 

我spark.default.parallelism設置爲6。我看到第一個和最後maptopair階段的速度不夠快。第二和第三個maptopair階段非常緩慢。

這些階段中的每一個都貫穿6個任務。在第二和第三個maptopair階段,5個任務以2s運行。但一項任務需要很長時間〜3-4分鐘。與其他任務相比,該任務的洗牌數據非常高,從而導致瓶頸。

有沒有一種方法可以更均勻地分配所有任務之間的負載?

+1

你能解釋你的代碼試圖實現什麼嗎?也許在更好的情況下,我們可以幫助制定更好的圖表。 –

回答

0

這是用於CDR處理的用例。每個CDR事件都有這些字段telno,imei,imsi,callforward,timestamp。

我保持3種火花狀態信息:1。最後知道CDR事件(記錄)爲給定的電話號碼2. callforward號碼列表所有已知的IMEI的每個電話3.列表。 三個mapwithstate函數調用對應於以下功能: 第一步:由於CDR事件到來時,我需要做一些領域比較與同一個電話號碼最後已知的CDR事件。我維持火花狀態給定telno最新的事件,這樣我可以做的領域比較新的CDR事件進來 第二步:對於給定的telno,我想檢查callforward數量是已知的數量或沒有。所以我需要保持telno的歷史。 - >該州的呼叫號碼列表。 第三步:我需要保持所有的IMEI號碼清單碰到,至今在狀態,這樣在CDR事件的每IMEI,我們如果已知的或新的IMEI說。