2015-10-20 46 views
1

使用Spring Integration Java DSL,我構建了一個流程,我正在與FileSplitter同步處理文件。我已經能夠使用上AbstractFilePayloadTransformersetDeleteFiles標誌要刪除的文件轉換每一行FileMessage進行後續處理之後,像這樣:Spring集成Java DSL流程Splitter/Aggregator在處理完所有行後刪除文件

@Bean 
protected IntegrationFlow s3ChannelFlow() { 
    // do not exhaust filesystem w/ files downloaded from S3 
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer(); 
    transformer.setDeleteFiles(true); 

    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading 
    // @formatter:off 
    return IntegrationFlows 
     .from(s3Channel()) 
     .channel(StatsUtil.createRunStatsChannel(runStatsRepository)) 
     .transform(transformer) 
     .split(new FileSplitter()) 
     .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport)) 
     .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow())) 
     .get(); 
    // @formatter:on 
} 

這工作得很好,但速度很慢。所以我嘗試上述.split後添加一個ExecutorChannel,像這樣:

.channel(c -> c.executor(Executors.newFixedThreadPool(10))) 

但隨後上述刪除標誌不允許流成功完成刪除文件(S)之前,他們完全讀取。

如果我刪除標誌,我有可能耗盡本地文件系統,其中文件是從S3同步的。

我可以在上面介紹a)完全處理每個文件和b)從本地文件系統中刪除文件一旦完成?換句話說,是否有一種方法可以準確知道文件何時完全處理(何時線已通過池中的線程異步處理)?

如果你很好奇,這裏是我的FileToInputStreamTransformer IMPL:

public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> { 

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB 

    @Override 
    // @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/ 
    protected InputStream transformFile(File payload) throws Exception { 
     return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE); 
    } 
} 

UPDATE

那麼,如何在東西下游流動知道要問什麼?

順便說一句,如果我正確地按照你的意見,當我.splitnew FileSplitter(true, true)更新上面,我得到

 
2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker 
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44) 

回答

0

謝謝阿爾喬姆。

我確實設法解決了這個問題,但可能是以更重的方式。

介紹一個ExecutorChannel造成執行調整相當的波動,包括​​:在AbtractFilePayloadTransformer關閉setDeleteFiles標誌,更新JPA @EntityRunStats和存儲庫這樣,捕捉文件的處理狀態以及處理狀態爲整個跑。總之,處理狀態更新讓流程知道何時從本地文件系統中刪除文件(即,何時完全處理)以及如何返回/stats/{run}端點的狀態,以便用戶知道何時完成運行。

下面是我實現的片段(如果任何人的好奇)...

class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> { 

private static final int BUFFER_SIZE = 64 * 1024; // 64 kB 

@Override 
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/ 
protected InputStream transformFile(File payload) throws Exception { 
    return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE); 
} 
} 

public class RunStatsHandler extends AbstractMessageHandler { 

private final SplunkSlf4jLogger log = new SplunkSlf4jLogger(LoggerFactory.getLogger(getClass())); 
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB 

private final RunStatsRepository runStatsRepository; 

public RunStatsHandler(RunStatsRepository runStatsRepository) { 
    this.runStatsRepository = runStatsRepository; 
} 

// Memory efficient routine, @see http://www.baeldung.com/java-read-lines-large-file 
@Override 
protected void handleMessageInternal(Message<?> message) throws Exception { 
    RunStats runStats = message.getHeaders().get(RunStats.RUN, RunStats.class); 
    String token = message.getHeaders().get(RunStats.FILE_TOKEN, String.class); 
    if (runStats != null) { 
     File compressedFile = (File) message.getPayload(); 
     String compressedFileName = compressedFile.getCanonicalPath(); 
     LongAdder lineCount = new LongAdder(); 
     // Streams and Scanner implement java.lang.AutoCloseable 
     InputStream fs = new FileInputStream(compressedFile); 
     InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE); 
     try (Scanner sc = new Scanner(gzfs, "UTF-8")) { 
      while (sc.hasNextLine()) { 
       sc.nextLine(); 
       lineCount.increment(); 
      } 
      // note that Scanner suppresses exceptions 
      if (sc.ioException() != null) { 
       log.warn("file.lineCount", ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, 
         "exception", sc.ioException().getMessage())); 
       throw sc.ioException(); 
      } 
      runStats.addFile(compressedFileName, token, lineCount.longValue()); 
      runStatsRepository.updateRunStats(runStats); 
      log.info("file.lineCount", 
        ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, "lineCount", lineCount.intValue())); 
     } 
    } 
} 

} 

更新流

@Bean 
protected IntegrationFlow s3ChannelFlow() { 
    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading 
    // @formatter:off 
    return IntegrationFlows 
     .from(s3Channel()) 
     .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString())) 
     .channel(runStatsChannel()) 
     .channel(c -> c.executor(Executors.newFixedThreadPool(persistencePoolSize))) 
     .transform(new FileToInputStreamTransformer()) 
     .split(new FileSplitter()) 
     .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport)) 
     .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow())) 
     .get(); 
    // @formatter:on 
} 

@Bean 
public MessageChannel runStatsChannel() { 
    DirectChannel wiretapChannel = new DirectChannel(); 
    wiretapChannel.subscribe(new RunStatsHandler(runStatsRepository)); 
    DirectChannel loggingChannel = new DirectChannel(); 
    loggingChannel.addInterceptor(new WireTap(wiretapChannel)); 
    return loggingChannel; 
} 

不幸的是,我不能共享RunStats和回購的實現。

0

FileSplittermarkersoption正是爲了這個目的:

套裝在文件數據前後發送文件標記消息的開始/結束。標記是帶有FileSplitter.FileMarker有效載荷的消息(標記屬性中的值爲STARTEND)。順序處理某些行被過濾的下游流中的文件時,可能會使用標記。它們使下游處理能夠知道文件何時被完全處理。 END標記包含行數。默認:falsetrue,apply-sequence默認爲false

您可以在下游流程中使用它來確定是否可以刪除文件已經或尚未。

+0

請參閱上面的我的更新。我該如何繼續? –

+0

考慮在一側使用'aggregator'使用'PayloadTypeRouter'或'publishSubscribeChannel',在另一側使用'transform()'前使用'filter'。 –

相關問題