2014-11-21 88 views
3

我目前在Spring 4.1.2中使用Spring Integration 4.1.0。 我有一個需求:逐行讀取文件,並將讀到的每一行作爲一條消息發送。基本上我想讓我們的某個消息源可以「重放」,但這些消息不是各自保存在獨立的文件中,而是全部保存在同一個文件中。此用例沒有事務(transaction)方面的要求。 我的需求與這篇帖子類似,只是文件位於運行JVM的同一臺服務器上:spring integration - read a remote file line by line(本文標題:Spring Integration入站通道適配器逐行讀取大文件)

當我看到它,我有以下選擇:

1.使用int-file:inbound-channel-adapter讀取文件,然後「分割」該文件,以使1條消息現在成爲多條消息。 樣品配置文件:

<?xml version="1.0" encoding="UTF-8"?> 
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task" 
     xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd 
      http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
      http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd 
      http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
      http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd 
      http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> 

     <!-- Option 1 from the question: pick up the file, convert it to a String, then split it. -->
     <!-- Scans the /tmp directory for files matching "myfile.txt" and publishes them to channel1. -->
     <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/> 
     <!-- Converts the File payload from channel1 to a String and forwards it to channel2. -->
     <!-- As the question notes, the whole file is in memory at this point, which exhausts the heap for large files. -->
     <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/> 
     <int:channel id="channel1"/> 
     <!-- Splits the String payload; the resulting messages are sent to nullChannel in this sample. -->
     <int:splitter input-channel="channel2" output-channel="nullChannel"/> 
     <int:channel id="channel2"/> 
    </beans> 

問題是,該文件非常大,使用上述方法時,整個文件會先被讀入內存,然後才被拆分,導致JVM耗盡堆空間。真正需要的步驟是:讀取一行並將該行轉換爲消息,發送消息,從內存中釋放該消息,然後重複。

  • 2.使用int-file:tail-inbound-channel-adapter並設置end="false"(基本上表示從文件的開頭開始讀取)。根據需要爲每個文件啓動和停止此適配器(每次啓動前更改文件名)。 示例配置文件:

    <?xml version="1.0" encoding="UTF-8"?> 
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task" 
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd 
         http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
         http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd 
         http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd 
         http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> 
    
        <!-- Option 2 from the question: tail the file and emit what is read to exchangeSpringQueueChannel. -->
        <!-- end="false" is described in the question as "read from the beginning of the file". -->
        <!-- The tailing work is handed to the single-threaded executor declared below. -->
        <!-- NOTE(review): delay and file-delay are presumably milliseconds, and reopen presumably -->
        <!-- controls whether the file is re-opened between reads; confirm against the reference manual. -->
        <int-file:tail-inbound-channel-adapter id="apache" 
         channel="exchangeSpringQueueChannel" 
         task-executor="exchangeFileReplayTaskExecutor" 
         file="C:\p2-test.txt" 
         delay="1" 
         end="false" 
         reopen="true" 
         file-delay="10000" /> 
    
        <!-- Channel that receives the messages produced by the tail adapter. -->
        <int:channel id="exchangeSpringQueueChannel" /> 
        <!-- Executor with a pool size of 1, referenced by the adapter's task-executor attribute. -->
        <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" /> 
    </beans> 
    
  • 3.讓Spring Integration調用Spring Batch,並使用ItemReader處理文件。這當然可以對整個過程進行更細粒度的控制,但需要做大量工作來設置作業存儲庫(job repository)等內容(我不關心作業歷史,所以我要麼讓作業不記錄狀態,要麼使用內存中的MapJobRepository)。

  • 4.通過擴展MessageProducerSupport創建我自己的FileLineByLineInboundChannelAdapter。 大部分代碼可以從ApacheCommonsFileTailingMessageProducer借用(也請參閱http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter)。下面是一個示例,但還需要做一些工作,把讀取操作放到它自己的Thread中,以便在逐行讀取時能夠響應stop()命令。

    package com.xxx.exchgateway.common.util.springintegration; 
    
        import java.io.BufferedReader; 
        import java.io.File; 
        import java.io.FileInputStream; 
        import java.io.FileNotFoundException; 
        import java.io.IOException; 
        import java.io.InputStreamReader; 
        import org.apache.commons.io.IOUtils; 
        import org.springframework.core.task.SimpleAsyncTaskExecutor; 
        import org.springframework.core.task.TaskExecutor; 
        import org.springframework.integration.core.MessageSource; 
        import org.springframework.integration.endpoint.MessageProducerSupport; 
        import org.springframework.integration.file.FileHeaders; 
        import org.springframework.messaging.Message; 
        import org.springframework.util.Assert; 
    
        /** 
        * A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}. 
        * See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter} 
        */ 
        public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> { 
         private volatile File file; 
    
         /** 
         * The name of the file you wish to tail. 
         * @param file The absolute path of the file. 
         */ 
         public void setFile(File file) { 
          Assert.notNull("'file' cannot be null"); 
          this.file = file; 
         } 
    
         protected File getFile() { 
          if (this.file == null) { 
           throw new IllegalStateException("No 'file' has been provided"); 
          } 
          return this.file; 
         } 
    
         @Override 
         public String getComponentType() { 
          return "file:line-by-line-inbound-channel-adapter"; 
         } 
    
         private void readFile() { 
          FileInputStream fstream; 
          try { 
           fstream = new FileInputStream(getFile()); 
    
           BufferedReader br = new BufferedReader(new InputStreamReader(fstream)); 
    
           String strLine; 
    
           // Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX) 
           while ((strLine = br.readLine()) != null && isRunning()) { 
            send(strLine); 
           } 
    
           //Close the input stream 
           IOUtils.closeQuietly(br); 
           IOUtils.closeQuietly(fstream); 
          } catch (FileNotFoundException e) { 
           // TODO Auto-generated catch block 
           e.printStackTrace(); 
          } catch (IOException e) { 
           // TODO Auto-generated catch block 
           e.printStackTrace(); 
          } 
         } 
    
         @Override 
         protected void doStart() { 
          super.doStart(); 
    
          // TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed 
          // and we want to honor the stop() command while we read line-by-line 
          readFile(); 
         } 
    
         protected void send(String line) { 
          Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build(); 
          super.sendMessage(message); 
         } 
    
         @Override 
         public Message<String> receive() { 
          // TODO Auto-generated method stub 
          return null; 
         } 
        } 
    

    在我看來,我的用例並沒有超出人們通常想做的事情的範圍,所以我很驚訝找不到開箱即用的解決方案。然而,我搜索了很多,也看了很多例子,遺憾的是還沒有找到符合我需求的東西。

    我猜也許我錯過了框架已經提供的東西(儘管這也許屬於Spring Integration和Spring Batch之間的模糊地帶)。有人可以告訴我,我的思路是否完全跑偏了,或者是否有一個我錯過的簡單解決方案,或者提供其他建議嗎?

    回答

    3

    Spring集成4.x版具有使用迭代器作爲消息的一個不錯的新功能:

    Spring Integration Reference

    從4.1版本開始,AbstractMessageSplitter支持以Iterator類型作爲要拆分的值。

    這樣就可以把Iterator作爲消息發送,而無需將整個文件讀入內存。

    下面是一個簡單的Spring上下文示例,它把CSV文件拆分爲每行一條消息:

    <!-- Picks up files from the configured directory (default /tmp) and publishes them to inputFiles. -->
    <int-file:inbound-channel-adapter
         directory="${inputFileDirectory:/tmp}"
         channel="inputFiles"/>

    <!-- Messages on this channel are dispatched through the executor declared below. -->
    <int:channel id="inputFiles">
        <int:dispatcher task-executor="executor"/>
    </int:channel>

    <!-- FileSplitter returns an Iterator over the file's lines, so each line becomes one message on "output". -->
    <int:splitter
        input-channel="inputFiles"
        output-channel="output">
        <bean
         class="FileSplitter"
         p:commentPrefix="${commentPrefix:#}" />
    </int:splitter>

    <!-- With CALLER_RUNS, work rejected by the pool is executed by the submitting thread. -->
    <!-- The placeholder was misspelled "aueueCapacity", so a queueCapacity property was never picked up. -->
    <task:executor
        id="executor"
        pool-size="${poolSize:8}"
        queue-capacity="${queueCapacity:0}"
        rejection-policy="CALLER_RUNS" />

    <int:channel id="output"/>
    

    這是splitter implementation

    import java.io.BufferedReader; 
    import java.io.File; 
    import java.io.FileReader; 
    import java.io.IOException; 
    import java.util.Iterator; 
    
    import org.slf4j.Logger; 
    import org.slf4j.LoggerFactory; 
    import org.springframework.integration.splitter.AbstractMessageSplitter; 
    import org.springframework.integration.transformer.MessageTransformationException; 
    import org.springframework.messaging.Message; 
    import org.springframework.util.Assert; 
    
    public class FileSplitter extends AbstractMessageSplitter { 
        private static final Logger log = LoggerFactory.getLogger(FileSplitter.class); 
    
        private String commentPrefix = "#"; 
    
        public Object splitMessage(Message<?> message) { 
         if(log.isDebugEnabled()) { 
          log.debug(message.toString()); 
         } 
         try { 
    
          Object payload = message.getPayload(); 
          Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 
    
          return new BufferedReaderFileIterator((File) payload); 
         } 
         catch (IOException e) { 
          String msg = "Unable to transform file: " + e.getMessage(); 
          log.error(msg); 
          throw new MessageTransformationException(msg, e); 
         } 
        } 
    
        public void setCommentPrefix(String commentPrefix) { 
         this.commentPrefix = commentPrefix; 
        } 
    
        public class BufferedReaderFileIterator implements Iterator<String> { 
    
         private File file; 
         private BufferedReader bufferedReader; 
         private String line; 
    
         public BufferedReaderFileIterator(File file) throws IOException { 
          this.file = file; 
          this.bufferedReader = new BufferedReader(new FileReader(file)); 
          readNextLine(); 
         } 
    
         @Override 
         public boolean hasNext() { 
          return line != null; 
         } 
    
         @Override 
         public String next() { 
          try { 
           String res = this.line; 
           readNextLine(); 
           return res; 
          } 
          catch (IOException e) { 
           log.error("Error reading file", e); 
           throw new RuntimeException(e); 
          } 
         } 
    
         void readNextLine() throws IOException { 
          do { 
           line = bufferedReader.readLine(); 
          } 
          while(line != null && line.trim().startsWith(commentPrefix)); 
    
          if(log.isTraceEnabled()) { 
           log.trace("Read next line: {}", line); 
          } 
    
          if(line == null) { 
           close(); 
          } 
         } 
    
         void close() throws IOException { 
          bufferedReader.close(); 
          file.delete(); 
         } 
    
         @Override 
         public void remove() { 
          throw new UnsupportedOperationException(); 
         } 
    
        } 
    
    } 
    

    請注意splitMessage()處理方法返回的是一個Iterator對象。

    +0

    感謝您的回覆Smollet。這真的很有幫助。 我試過你的解決方案(甚至是第二個更新版本),它有一個小故障,它讀取文件的第一行,然後繼續讀取同一行。 因此,我編輯了你原來的文章,對next()和readNextLine()稍作修改。 我假設(也許不正確),一旦文件的所有行都被讀取,大多數人會想停止閱讀文件。 如果這確實是人們的需求,那麼我所做的更新還調整了hasNext()方法以在最後一行被讀取後關閉文件。 – 2014-11-25 23:53:27

    +0

    有趣的一點!最近幾個小時我一直在調試類似的問題(在循環中反覆讀取同一個文件)。 確實需要關閉reader,但是您的更改尚未解決當前的問題。我懷疑是file:inbound-channel-adapter不斷發送帶有相同文件的消息(如果沒有在入站通道適配器上設置prevent-duplicates="true")。在處理完文件後刪除該文件也會比較好。 – 2014-11-26 01:16:15

    +0

    我冒昧地更新了代碼片段,這些更改幫助我完成了Spring XD環境中的文件處理。 @TonyFalabella它應該與您的需求兼容(如果不需要,只需註釋file.delete())。 – 2014-11-26 02:03:36

    0

    我也遇到過這個需求:我把文件複製到另一個文件夾,同時也從文件中讀取數據。

    fileCopyApplicationContext.xml

    <?xml version="1.0" encoding="UTF-8"?> 
    <beans xmlns="http://www.springframework.org/schema/beans" 
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" 
        xmlns:file="http://www.springframework.org/schema/integration/file" 
        xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" 
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
          http://www.springframework.org/schema/beans/spring-beans.xsd 
          http://www.springframework.org/schema/integration 
          http://www.springframework.org/schema/integration/spring-integration.xsd 
          http://www.springframework.org/schema/integration/file 
          http://www.springframework.org/schema/integration/file/spring-integration-file.xsd 
          http://www.springframework.org/schema/context 
          http://www.springframework.org/schema/context/spring-context.xsd"> 
    
        <context:property-placeholder /> 
    
        <!-- Polls the input directory with a fixed delay of 500, using the "onlyPropertyFiles" filter bean. -->
        <!-- NOTE(review): no channel attribute is set; presumably a channel named after the id ("filesIn") is used. Confirm. -->
        <file:inbound-channel-adapter id="filesIn" 
         directory="E:/usmandata/logs/input/" filter="onlyPropertyFiles" 
         auto-startup="true"> 
         <int:poller id="poller" fixed-delay="500" /> 
        </file:inbound-channel-adapter> 
    
    
    
        <!-- Passes each message from filesIn to the "handler" bean and forwards the result to filesOut. -->
        <int:service-activator input-channel="filesIn" 
         output-channel="filesOut" ref="handler" /> 
    
        <!-- Writes whatever arrives on filesOut into the output directory. -->
        <file:outbound-channel-adapter id="filesOut" 
         directory="E:/usmandata/logs/output/" /> 
    
    
    
    
        <!-- FileHandler prints the file content and returns the same File. -->
        <bean id="handler" class="com.javarticles.spring.integration.file.FileHandler" /> 
        <!-- Despite the bean name, the configured filename pattern is "*.log". -->
        <bean id="onlyPropertyFiles" 
         class="org.springframework.integration.file.config.FileListFilterFactoryBean" 
         p:filenamePattern="*.log" /> 
    </beans> 
    

    FileHandler.java

    package com.javarticles.spring.integration.file; 
    
    import java.io.File; 
    import java.io.IOException; 
    import java.io.RandomAccessFile; 
    import java.nio.ByteBuffer; 
    import java.nio.channels.FileChannel; 
    
    /**
     * Handler that prints the content of a file to standard output and passes the
     * same file on unchanged.
     */
    public class FileHandler {

        /** Size of the chunk read from the file on each pass. */
        private static final int BUFFER_SIZE = 8192;

        /**
         * Prints a header line followed by the file content to {@code System.out}.
         * <p>
         * The file is streamed in fixed-size chunks, so large files do not have to
         * fit in memory and a short read cannot cause a buffer underflow. Each byte
         * is printed as one character (values 0-255); multi-byte encodings are not
         * decoded.
         *
         * @param input the file to print
         * @return the same {@code input} instance
         * @throws IOException if the file cannot be opened or read
         */
        public File handleFile(File input) throws IOException {
            RandomAccessFile file = new RandomAccessFile(input, "r");
            try {
                FileChannel channel = file.getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);

                System.out.println("Reading content and printing ... ");

                // read() may return fewer bytes than requested, so loop until end of file.
                while (channel.read(buffer) != -1) {
                    buffer.flip(); // switch the buffer from filling to draining
                    while (buffer.hasRemaining()) {
                        // Mask to avoid sign extension of bytes >= 0x80.
                        System.out.print((char) (buffer.get() & 0xFF));
                    }
                    buffer.clear();
                }
            }
            finally {
                // Closing the RandomAccessFile also closes its channel.
                file.close();
            }
            return input;
        }
    }
    

    SpringIntegrationFileCopyExample .java

    package com.javarticles.spring.integration.file; 
    
    import java.io.File; 
    import java.io.FileInputStream; 
    import java.io.IOException; 
    import java.util.Properties; 
    
    import org.springframework.context.support.ClassPathXmlApplicationContext; 
    
    /**
     * Entry point of the file-copy sample: it builds the Spring application
     * context described by {@code fileCopyApplicationContext.xml}.
     */
    public class SpringIntegrationFileCopyExample {

        /**
         * Creates the application context from the classpath XML definition.
         * The context reference is not used afterwards and is never closed here.
         *
         * @param args command-line arguments (ignored)
         */
        public static void main(String[] args) throws InterruptedException, IOException {
            final String configLocation = "fileCopyApplicationContext.xml";
            ClassPathXmlApplicationContext applicationContext =
                    new ClassPathXmlApplicationContext(configLocation);
        }

    }
    
    相關問題