我正在嘗試一個簡單的Flink程序,它只需要一個文件,反轉文件中的字符串&寫出來。Apache Flink:按照mapPartition的順序處理數據
該方案的工作原理,只有個別線路出現故障。
E.g.
文件輸入
Thing,Name
Person,Vineet
Fish,Karp
Dog,Fido
輸出文件
Fish,praK
Thing,emaN
Person,teeniV
Dog,odiF
我期待:
Thing,emaN
Person,teeniV
Fish,praK
Dog,odiF
下面是我寫的,以實現這一目標的計劃:
package testflink;
import java.util.Iterator;
import java.util.StringJoiner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.Collector;
public class BatchJob {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
System.err.println(env.getParallelism());
DataSource<String> file = env.readTextFile("./data.csv");
file.mapPartition((Iterable<String> values, Collector<String> out) -> {
System.err.println("************* " + out.hashCode() + " Begin");
Iterator<String> iterator = values.iterator();
while (iterator.hasNext()) {
String tuple = iterator.next();
System.err.println("************* " + out.hashCode() + tuple);
String[] split = tuple.split(",");
String tuple1Rev = new StringBuilder(split[1]).reverse().toString();
out.collect(new StringJoiner(",").add(split[0]).add(tuple1Rev).toString());
}
System.err.println("************* " + out.hashCode() + " End");
}).returns(String.class).writeAsText("./dataO.csv", WriteMode.OVERWRITE).setParallelism(1);
env.execute("Flink Batch Java API Skeleton");
System.out.println("Done");
}
}
- 是否可以保持輸入順序?有沒有什麼好的解決方法?
- 我知道,我正在閱讀csv &當有可用的
readAsCsv()
方法時,會拆分字符串。問題是,csv可以有每行/元組的動態數量的comlumns。我無法弄清楚如何將它轉換爲每個元組具有動態列數的DataSource。 MapPartition需要定義的類型 - 我如何在運行時替換Tuple0
-Tuple25
? - 而且,最後一個問題 - 我可以限制分區在
Iterable<String> values
參數中永遠不會超過n個值嗎?
在此先感謝! :)
除此之外呢? :) – Vineet
https://stackoverflow.com/questions/34071445/global-sorting-in-apache-flink –