2015-06-02 172 views
5

我想分配我的輸入的每一行id - 應該是從0N - 1的數字,其中N是輸入中的行數。zipWithIndex Apache Flink

粗略地說,我希望能夠做到像下面這樣:

val data = sc.textFile(textFilePath, numPartitions) 
val rdd = data.map(line => process(line)) 
val rddMatrixLike = rdd.zipWithIndex.map { case (v, idx) => someStuffWithIndex(idx, v) } 

但在Apache的弗林克。可能嗎?

+0

這是一個有趣的問題。我會試着想出一個實現。 –

回答

6

這現在是Apache Flink 0.10-SNAPSHOT版本的一部分。 zipWithIndex(in)zipWithUniqueId(in)的示例可在官方Flink documentation中獲得。

5

下面是一個簡單的實現功能:

public class ZipWithIndex { 

public static void main(String[] args) throws Exception { 

    ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment(); 

    DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input"); 

    // count elements in each partition 
    DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() { 
     @Override 
     public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception { 
      long cnt = 0; 
      for (String v : values) { 
       cnt++; 
      } 
      out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt)); 
     } 
    }); 

    DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() { 
     long start = 0; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 
      List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts"); 
      Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() { 
       @Override 
       public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) { 
        return ZipWithIndex.compare(o1.f0, o2.f0); 
       } 
      }); 
      for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) { 
       start += offsets.get(i).f1; 
      } 
     } 

     @Override 
     public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception { 
      for(String v: values) { 
       out.collect(new Tuple2<Long, String>(start++, v)); 
      } 
     } 
    }).withBroadcastSet(counts, "counts"); 
    result.print(); 

} 

public static int compare(int x, int y) { 
    return (x < y) ? -1 : ((x == y) ? 0 : 1); 
} 
} 

這是它如何工作的:我使用的是第一mapPartition()操作去了分區中的所有元素來算多少元素都在那裏。 我需要知道每個分區中元素的數量,以便在將元素分配給元素時正確設置偏移量。 第一個mapPartition的結果是一個包含映射的DataSet。我將這個DataSet廣播給所有第二個運算符,它們將ID分配給輸入中的元素。 在第二個mapPartition()open()方法中,我正在計算每個分區的偏移量。

我可能會將代碼貢獻給Flink(與其他提交者討論後)。

+0

謝謝羅伯特!你能否也許用幾句話解釋這是如何工作的?例如。爲什麼我們使用'getRuntimeContext()。getIndexOfThisSubtask()'和爲什麼每個分區的廣播計數可以幫助? –

+0

好點。我會盡快添加一些說明。 –

+0

已添加描述 –