2015-12-17 29 views
3

我有一個flink作業,它使用TextOutputFormat將數據寫入目標。代碼是這樣的:Flink在HDFS上寫入產生空文件

String basePath = "/Users/me/out"; 
    // String basePath = "hdfs://10.199.200.204:9000/data"; 
    // ensure we have a format for this. 
    TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection + "/" + uid)); 
    StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); 
    format.configure(GlobalConfiguration.getConfiguration()); 
    format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks()); 
    // then serialize and write. 
    String record = serializationFunction.map(value); 
    log.info("Writing " + record); 
    format.writeRecord(record); 

當在普通文件系統上使用路徑作爲目的地時,這很好地工作。但是,當我將基礎路徑更改爲hdfs位置時,它不再按預期工作。會發生什麼情況是,輸出文件實際上是在HDFS上創建的,但它的大小爲零字節。通話期間我沒有收到任何例外。

我正在使用Hadoop 2.6.0和Flink 0.10.1。使用命令行工具(hadoop fs -put ...)將文件複製到hdfs的作品,所以我認爲我可以排除一些Hadoop的配置錯誤。另外我開始使用Wireshark並將數據傳輸到Hadoop服務器,那麼在實際編寫之前我需要提交一些數據嗎?

回答

2

爲了將結果清理到HDFS,必須在完成記錄寫入後調用TextOutputFormatclose方法。

// do writing 
while (some condition) { 
    format.writeRecord(record); 
} 

// finished writing 
format.close(); 
0

我發現它爲什麼發生。實際上有兩個原因:

  1. 正如Till Rohrmann指出的那樣,輸出格式沒有被刷新。由於我在流式作業中使用格式,因此關閉格式不適用。我使出寫我自己的格式,可以刷新:

    public class MyTextOutputFormat<T> extends TextOutputFormat<T> { 
        public MyTextOutputFormat(Path outputPath) { 
         super(outputPath); 
        } 
    
        public MyTextOutputFormat(Path outputPath, String charset) { 
         super(outputPath, charset); 
        } 
    
        // added a custom flush method here. 
        public void flush() throws IOException { 
         stream.flush(); 
        } 
    } 
    
  2. 我在VM guest虛擬機上運行HDFS和VM主機連接到它。 Flink的HDFS客戶端默認使用數據節點的IP地址連接到數據節點。但datanode的IP地址報告爲127.0.0.1。所以flink試圖連接到127.0.0.1,當然在主機系統中沒有運行HDFS數據節點。然而這只是在我添加了手動沖洗操作後才顯示出來。爲了解決這個問題,我不得不改變兩兩件事:

    • 裏面的VM guest虛擬機,修改$HADOOP_HOME/etc/hadoop/hdfs-site.xml並添加

      <property> 
          <name>dfs.datanode.hostname</name> 
          <value>10.199.200.204</value> <!-- IP of my VM guest --> 
      </property> 
      

      這種變化做出的名稱節點上報數據節點的正確路由的主機名。它實際上是一個無證的設置,但似乎工作。

    • 在其中弗林克實際運行系統,我有一個文件夾(例如/home/me/conf)中創建一個hdfs-site.xml只好再設置一個環境變量HADOOP_CONF_DIR指向/home/me/conf。該文件有如下內容:

      <?xml version="1.0" encoding="UTF-8"?> 
      <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> 
      <configuration> 
          <property> 
           <name>dfs.client.use.datanode.hostname</name> 
           <value>true</value> 
          </property> 
      </configuration> 
      

      這種變化指示通過Hadoop客戶端使用主機名代替IP地址連接到數據管理部。這些更改後,我的數據被正確寫入HDFS。