我有一個flink作業,它使用TextOutputFormat將數據寫入目標。代碼是這樣的:Flink在HDFS上寫入產生空文件
String basePath = "/Users/me/out";
// String basePath = "hdfs://10.199.200.204:9000/data";
// ensure we have a format for this.
TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection + "/" + uid));
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
format.configure(GlobalConfiguration.getConfiguration());
format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
// then serialize and write.
String record = serializationFunction.map(value);
log.info("Writing " + record);
format.writeRecord(record);
當在普通文件系統上使用路徑作爲目的地時,這很好地工作。但是,當我將基礎路徑更改爲hdfs位置時,它不再按預期工作。會發生什麼情況是,輸出文件實際上是在HDFS上創建的,但它的大小爲零字節。通話期間我沒有收到任何例外。
我正在使用Hadoop 2.6.0和Flink 0.10.1。使用命令行工具(hadoop fs -put ...
)將文件複製到hdfs的作品,所以我認爲我可以排除一些Hadoop的配置錯誤。另外我開始使用Wireshark並將數據傳輸到Hadoop服務器,那麼在實際編寫之前我需要提交一些數據嗎?