2017-06-30 42 views
0

我正在嘗試一個簡單的Flink程序,它只需要一個文件,反轉文件中的字符串&寫出來。Apache Flink:按照mapPartition的順序處理數據

該方案的工作原理,只有個別線路出現故障。

E.g.

文件輸入

Thing,Name 
Person,Vineet 
Fish,Karp 
Dog,Fido 

輸出文件

Fish,praK 
Thing,emaN 
Person,teeniV 
Dog,odiF 

我期待:

Thing,emaN 
Person,teeniV 
Fish,praK 
Dog,odiF 

下面是我寫的,以實現這一目標的計劃:

package testflink; 

import java.util.Iterator; 
import java.util.StringJoiner; 

import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.operators.DataSource; 
import org.apache.flink.core.fs.FileSystem.WriteMode; 
import org.apache.flink.util.Collector; 

public class BatchJob { 

    public static void main(String[] args) throws Exception { 
     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
     System.err.println(env.getParallelism()); 
     DataSource<String> file = env.readTextFile("./data.csv"); 
     file.mapPartition((Iterable<String> values, Collector<String> out) -> { 
      System.err.println("************* " + out.hashCode() + " Begin"); 
      Iterator<String> iterator = values.iterator(); 
      while (iterator.hasNext()) { 
       String tuple = iterator.next(); 
       System.err.println("************* " + out.hashCode() + tuple); 
       String[] split = tuple.split(","); 
       String tuple1Rev = new StringBuilder(split[1]).reverse().toString(); 
       out.collect(new StringJoiner(",").add(split[0]).add(tuple1Rev).toString()); 
      } 
      System.err.println("************* " + out.hashCode() + " End"); 
     }).returns(String.class).writeAsText("./dataO.csv", WriteMode.OVERWRITE).setParallelism(1); 
     env.execute("Flink Batch Java API Skeleton"); 
     System.out.println("Done"); 
    } 
} 
  • 是否可以保持輸入順序?有沒有什麼好的解決方法?
  • 我知道,我正在閱讀csv &當有可用的readAsCsv()方法時,會拆分字符串。問題是,csv可以有每行/元組的動態數量的comlumns。我無法弄清楚如何將它轉換爲每個元組具有動態列數的DataSource。 MapPartition需要定義的類型 - 我如何在運行時替換Tuple0 - Tuple25
  • 而且,最後一個問題 - 我可以限制分區在Iterable<String> values參數中永遠不會超過n個值嗎?

在此先感謝! :)

+0

除此之外呢? :) – Vineet

+0

https://stackoverflow.com/questions/34071445/global-sorting-in-apache-flink –

回答

2

Flink的mapPartition維護每個並行分區內記錄的順序。但是,用例中的問題是數據如何分配給MapPartition運算符的並行任務。

您正在使用的TextInputFormat將輸入文件劃分爲若干個由數據源運算符的並行實例獨立處理的輸入拆分。每個數據源實例將其所有記錄本地轉發給後續的MapPartition操作員,並將其結果記錄轉發給接收器。該管道是這樣的:

source_1 -> mapPartition_1 -> sink_1 
source_2 -> mapPartition_2 -> sink_2 
source_3 -> mapPartition_3 -> sink_3 
... 
從源頭

所以,所有的記錄,以便進行處理。但是,由於輸入拆分是隨機分配給源任務和匯單獨運行的(不進行協調),所以輸出只是部分排序的(從相同拆分中讀取的記錄是有序的)。

將源的並行性設置爲1將無濟於事,因爲它將以循環方式將其結果記錄發送到後續任務,以利用後續運​​算符的並行性。同時將整個作業的並行性設置爲1也無濟於事,因爲拆分仍然可以通過單一來源任務以隨機順序進行處理。我知道的唯一解決方案是在輸入結果之前將每個輸入記錄編號並輸入sorting on that number (with range partitioning for parallel processing)

+0

我現在有一個csv我排序。第一列是行號。它排序詞典。我怎樣才能使它成爲一個適當的數字排序? – Vineet

+1

將行分割成一個'Tuple2 ',其中行號是Integer,其餘行是String。然後你可以在Integer字段上排序。 –