2015-05-13 · 33 views

When reading a CSV file (http://data.gdeltproject.org/events/index.html) from disk with Flink (Java, Maven version 8.1), I get the following exception: Apache Flink: Channel received an event before completing the current partial record.

ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.: DataSink(Print to System.out) (4/4) 
java.lang.IllegalStateException: Channel received an event before completing the current partial record. 
    at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158) 
    at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176) 
    at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51) 
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53) 
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170) 
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257) 
    at java.lang.Thread.run(Thread.java:745) 

My code:

public static void main(String[] args) {
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //env.setDegreeOfParallelism(1);
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    points.print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(new FileInputStream("./resources/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }
    String inputFile = pro.getProperty("input");
    // map csv file
    return env.readCsvFile(inputFile)
        .ignoreInvalidLines()
        .fieldDelimiter('\t') // tab-separated, same as '\u0009'
        .lineDelimiter("\n")
        .includeFields(true, true, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false, false, true, true
            , false, false, false, false, false, false, false, false, false, false
            , false, false, false, false, false, false, false)
        .types(String.class, Long.class, Double.class, Double.class)
        .map(new TuplePointConverter());
}

Does anyone have a solution?

Regards, Paul

Answer

I am posting the answer from the Apache Flink mailing list here, so people do not have to read through the mailing-list archive:

The cause of the error is that custom serialization logic is being used, and the deserialization function is faulty: it does not consume all of the data.

The latest master has an improved error message.

Background:

Flink supports two kinds of interfaces that let programmers implement their own serializers: Writables (Hadoop's core type interface) and Values (Flink's own custom serialization interface).
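A minimal sketch of the invariant the answer describes, using plain `java.io.DataInput`/`DataOutput` instead of Flink's `DataInputView`/`DataOutputView` (the contract is analogous): the `read` side must consume exactly the bytes the `write` side produced, otherwise every record that follows in the channel is read from a shifted position. `GeoTimePoint` and `RoundTrip` are hypothetical names for illustration, not types from the original post.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical value type: read() mirrors write() field for field.
class GeoTimePoint {
    double lat, lon;
    long time;

    void write(DataOutput out) throws IOException {
        out.writeDouble(lat);
        out.writeDouble(lon);
        out.writeLong(time);
    }

    void read(DataInput in) throws IOException {
        lat = in.readDouble();
        lon = in.readDouble();
        time = in.readLong(); // dropping this line would leave 8 unread bytes,
                              // corrupting every record that follows
    }
}

public class RoundTrip {
    public static void main(String[] args) throws IOException {
        GeoTimePoint a = new GeoTimePoint();
        a.lat = 52.5; a.lon = 13.4; a.time = 1L;
        GeoTimePoint b = new GeoTimePoint();
        b.lat = 48.1; b.lon = 11.6; b.time = 2L;

        // Serialize two records back to back, as a network channel would.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        a.write(out);
        b.write(out);

        // Deserialize both; an under-consuming read() would garble the second.
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        GeoTimePoint x = new GeoTimePoint();
        x.read(in);
        GeoTimePoint y = new GeoTimePoint();
        y.read(in);

        System.out.println(y.time);         // prints 2
        System.out.println(in.available()); // prints 0: all bytes consumed
    }
}
```

If `read()` skipped a field, the second record's `lat` would be read from the middle of the first record's `time`, which is the kind of stream desynchronization behind the "Channel received an event before completing the current partial record" error.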
