2016-08-19 31 views
2

我有一個使用Kafka Logs並將其寫入HDFS的Apache Apex應用程序。AbstractFileOutputWriter生成重複的tmp文件

DAG非常簡單,有一個Kafka Consumer(20分區的2 GB內存用於操作員)通過流連接到「MyWriter extends AbstractFileOutputOperator」。

Issue: 1.我已經看到Writer反覆多次寫入相同大小和相同數據的.tmp文件。我試圖增加寫操作員的內存,增加了作家等的數量。仍然這個問題不斷髮生。

我試着添加/刪除requestFinalize到MyWriter。還是同樣的問題。

@Override 
    public void endWindow() 
    { 
     if (null != fileName) { 
      requestFinalize(fileName); 
     } 
     super.endWindow(); 
    } 

這是我properties.xml中的一個子集

<property> 
    <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name> 
    <value>1000</value> 
    </property> 

    <property> 
    <name>dt.application.myapp.operator.*.attr.APPLICATION_WINDOW_COUNT</name> 
    <value>60</value> 
    </property> 

    <property> 
    <name>dt.application.myapp.operator.*.attr.CHECKPOINT_WINDOW_COUNT</name> 
    <value>60</value> 
    </property> 

<property> 
     <name>dt.application.myapp.operator.myWriter.attr.PARTITIONER</name> 
     <value>com.datatorrent.common.partitioner.StatelessPartitioner:20</value> 
    </property> 

    <property> 
    <name>dt.application.myapp.operator.myWriter.prop.maxLength</name> 
    <value>1000000000</value> <!-- 1 GB File --> 
    </property> 

這是堆棧跟蹤我能夠從dt.log得到了運營商: 運營商獲取不同可能重新部署contianers,拋出這個異常,並不斷寫入重複文件。

java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp 
     at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:418) 
     at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112) 
     at com.datatorrent.stram.engine.Node.setup(Node.java:187) 
     at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309) 
     at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130) 
     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388) 
    Caused by: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp 
     at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211) 
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211) 
     at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:411) 
     ... 5 more 
2016-08-17 22:17:01,108 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [161, 177] 
2016-08-17 22:17:01,116 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete. 
2016-08-17 22:17:02,121 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request. 
2016-08-17 22:17:02,625 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request. 
2016-08-17 22:17:03,129 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request. 

回答

2

爲基地運營的代碼是在下面的鏈接,並在下面的評論 引用: https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java

通過設置最大文件大小爲1GB,您自動啓用滾動文件;在相關領域有:

protected Long maxLength = Long.MAX_VALUE; 
protected transient boolean rollingFile = false; 

後者被設置爲true在setup()方法,如果前者小於Long.MAX_VALUE默認值的值。

當滾動文件啓用時,文件定稿自動完成,因此您不應該撥打requestFinalize()

其次,在你的MyWriter類,除去endWindow()覆蓋,並確保您創建包含在setup()方法作業者ID所需的文件名,並在getFileName()覆蓋返回這個文件名;這可以確保多個分區不會互相踩在一起。例如:

@NotNull 
private String fileName;   // current base file name 

private transient String fName; // per partition file name 

@Override 
public void setup(Context.OperatorContext context) 
{ 
    // create file name for this partition by appending the operator id to 
    // the base name 
    // 
    long id = context.getId(); 
    fName = fileName + "_p" + id; 
    super.setup(context); 

    LOG.debug("Leaving setup, fName = {}, id = {}", fName, id); 
} 

@Override 
protected String getFileName(Long[] tuple) 
{ 
    return fName; 
} 

基礎文件名(fileName在上面的代碼)可以在代碼中直接設置或從一個XML文件中的屬性初始化(你需要添加一個getter和setter它以及)。

你可以看到在這個類型的使用的一個例子: https://github.com/DataTorrent/examples/tree/master/tutorials/fileOutput

夫婦的其他建議:

  1. 設置分割數爲1(或註釋掉設置PARTITIONER屬性的XML )並確保一切按預期工作。這將消除任何與分區無關的問題。如果可能,還可以將最大文件大小減少到2K或4K,這樣測試起來就更容易了。
  2. 一旦單分區案例工作,將分區數量增加到2。如果這個工作,任意更大的數字(在合理範圍內)也應該起作用。