2016-11-23 97 views
1

我是 Spring Integration 的新手。我正在研究一個解決方案,但在使用入站文件適配器(FileReadingMessageSource)時遇到了一個具體問題。我必須從不同的目錄讀取文件並處理它們,然後將這些文件保存到不同的目錄中。據我所知,目錄名稱在流程啓動時就已經固定。有人可以幫我實現針對不同請求更改目錄名稱嗎?(標題:Spring Integration 文件讀取)

我嘗試了以下方法。首先,我不確定這是否是正確的方法,儘管它只適用於一個目錄。我認爲Poller正在等待更多的文件,並且再也沒有回來閱讀另一個目錄。

/**
 * Spring Boot entry point that prompts on standard input for an input directory and
 * hands each directory name to the {@link FileProcessingService} gateway.
 *
 * <p>The files found are pushed onto {@code fileInChannel}, from which
 * {@code fileProcessingFlow()} passes them to {@code myFileProcessor} and then to an
 * outbound file adapter that writes into {@code outDir}.
 */
@SpringBootApplication 
@EnableIntegration 
@IntegrationComponentScan 
public class SiSampleFileProcessor { 

    // Handler applied to each message taken from fileInChannel (see fileProcessingFlow()).
    @Autowired 
    MyFileProcessor myFileProcessor; 

    // Target directory of the outbound file adapter, resolved from the si.outdir property.
    @Value("${si.outdir}") 
    String outDir; 

    // Injected, but not referenced anywhere in this class.
    @Autowired 
    Environment env; 

    /**
     * Starts the application context, then loops reading directory names from stdin.
     * An empty line or the word "exit" ends the loop and closes the context.
     */
    public static void main(String[] args) throws IOException { 
     ConfigurableApplicationContext ctx = new SpringApplication(SiSampleFileProcessor.class).run(args); 
     FileProcessingService gateway = ctx.getBean(FileProcessingService.class); 
     boolean process = true; 
     while (process) { 
      System.out.println("Please enter the input Directory: "); 
      // NOTE(review): a new Scanner over System.in is constructed on every iteration.
      String inDir = new Scanner(System.in).nextLine(); 
      if (inDir.isEmpty() || inDir.equals("exit")) { 
       process=false; 
      } else { 
       System.out.println("Processing... " + inDir); 
       // The String returned by the gateway is not used here.
       gateway.processFilesin(inDir); 
      } 
     } 
     ctx.close(); 
    } 

    /**
     * Messaging gateway whose calls are sent to {@code requestChannel}.
     */
    @MessagingGateway(defaultRequestChannel="requestChannel") 
    public interface FileProcessingService { 
     // Declares a String reply; NOTE(review): the caller presumably waits for that reply — confirm.
     String processFilesin(String inputDir); 
    } 

    /**
     * Registers the default poller with a fixed delay of 1000
     * (milliseconds, assuming the usual {@code Pollers.fixedDelay(long)} unit).
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER) 
    public PollerMetadata poller() {          
    return Pollers.fixedDelay(1000).get(); 
    } 

    /** Direct channel that carries the directory names sent through the gateway. */
    @Bean 
    public MessageChannel requestChannel() { 
     return new DirectChannel(); 
    } 

    /**
     * Handler for {@code requestChannel}: treats the payload as a directory path, reads
     * files from it with a locally created {@code FileReadingMessageSource} and sends
     * each resulting message to {@code fileInChannel()}.
     */
    // NOTE(review): @ServiceActivator is combined with @Bean here; in that style the bean
    // is presumably expected to be a MessageHandler — confirm against the Spring
    // Integration annotation documentation.
    @ServiceActivator(inputChannel = "requestChannel") 
    @Bean 
    GenericHandler<String> fileReader() { 
     return new GenericHandler<String>() { 
      @Override 
      public Object handle(String p, Map<String, Object> map) { 
       // NOTE(review): the source is instantiated outside the Spring container and
       // afterPropertiesSet()/start()/stop() are never called on it in this method.
       FileReadingMessageSource fileSource = new FileReadingMessageSource(); 
       fileSource.setDirectory(new File(p)); 
       Message<File> msg; 
       // Drain the source until receive() reports that nothing more is available.
       while((msg = fileSource.receive()) != null) { 
        fileInChannel().send(msg); 
       } 
       // NOTE(review): the gateway method declares a String return; returning null
       // produces no reply value, so the caller may block waiting — confirm.
       return null; // Not sure what to return! 
      } 
     }; 
    } 

    /** Queue channel named "fileIn" that buffers the file messages for the processing flow. */
    @Bean 
    public MessageChannel fileInChannel() { 
     return MessageChannels.queue("fileIn").get(); 
    } 

    /**
     * Flow that takes messages from {@code fileInChannel()}, passes them to
     * {@code myFileProcessor}, and then to an outbound file adapter targeting
     * {@code outDir} with directory auto-creation enabled.
     */
    @Bean 
    public IntegrationFlow fileProcessingFlow() { 
     return IntegrationFlows.from(fileInChannel()) 
       .handle(myFileProcessor) 
       .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get()) 
       .get(); 
    }  
} 

編輯:根據 Gary 的回答修改了部分方法,如下所示:

/**
 * Messaging gateway whose calls are sent to {@code requestChannel}.
 */
@MessagingGateway(defaultRequestChannel="requestChannel") 
public interface FileProcessingService { 
    /**
     * Requests processing of the files in the given input directory.
     *
     * @param inputDir path of the directory to read files from
     * @return the boolean reply produced by the handler on {@code requestChannel}
     */
    boolean processFilesin(String inputDir); 
} 

/**
 * Reads every file currently available in {@code inDir} and forwards each one as a
 * message to {@code fileInChannel()}.
 *
 * <p>The {@code FileReadingMessageSource} is created per request and is therefore not
 * managed by the Spring container, so its initialization and lifecycle methods are
 * invoked manually. The source is always stopped, even if reading or sending fails.
 *
 * @param inDir path of the directory to read files from
 * @return {@code true} once all available files have been sent
 */
@ServiceActivator(inputChannel = "requestChannel")
public boolean fileReader(String inDir) {
    FileReadingMessageSource fileSource = new FileReadingMessageSource();
    fileSource.setDirectory(new File(inDir));
    // Not a Spring-managed bean: run the init and lifecycle callbacks by hand.
    fileSource.afterPropertiesSet();
    fileSource.start();
    try {
        Message<File> msg;
        // receive() returns null when no further file is available.
        while ((msg = fileSource.receive()) != null) {
            fileInChannel().send(msg);
        }
    } finally {
        // Release the source even when receive() or send() throws.
        fileSource.stop();
    }
    System.out.println("Sent all files in directory: " + inDir);
    return true;
}

現在按預期工作。

回答

0

FileReadingMessageSource 在內部使用 DirectoryScanner;它通常在注入屬性後由 Spring 設置。由於您在 Spring 之外管理該對象,因此需要自行調用 Spring bean 的初始化和生命週期方法:afterPropertiesSet()、start() 和 stop()。當 receive() 返回 null 時,調用 stop()。

> return null; // Not sure what to return! 

如果您不返回任何內容,您的調用線程將掛起在等待響應的網關中。您可以將網關更改爲void,或者由於您的網關期望有一個字符串,只需返回一些值即可。

但是,您的調用代碼無論如何不看結果。

> gateway.processFilesin(inDir); 

此外,請刪除 @ServiceActivator 方法上的 @Bean;使用這種風格時,該 bean 必須是一個 MessageHandler。

+0

謝謝你,現在已按預期工作。我已經根據你的建議編輯了這篇文章。但還有一點需要澄清:對於類似的需求(即讀取不同的目錄),如果源是 S3,我們該怎麼辦? – pkm

+0

@pkm mount s3作爲一個文件系統並監聽它的變化,或者你可以使用帶有觸發器的aws微服務流在s3上放置/發佈,並且你對它採取適當的行動 –

+0

[spring-integration-aws](https://github.com/spring-projects/spring-integration-aws) 項目提供了一個消息源,但它稍微複雜一些:它先將遠程目錄與本地目錄同步,然後對本地目錄使用 `FileReadingMessageSource`。您應該改用 `S3RemoteFileTemplate`。 –

0

您可以使用此代碼

FileProcessor.java

import org.springframework.messaging.Message; 
import org.springframework.stereotype.Component; 
/**
 * Message handler that prints the String payload of each message it receives.
 */
@Component
public class FileProcessor {

    /**
     * Writes the message payload to standard output.
     *
     * @param msg message whose String payload is printed
     */
    public void process(Message<String> msg) {
        System.out.println(msg.getPayload());
    }
}

LastModifiedFileFilter.java

package com.example.demo; 

import org.springframework.integration.file.filters.AbstractFileListFilter; 

import java.io.File; 
import java.util.HashMap; 
import java.util.Map; 

/**
 * File filter that accepts a file the first time it is seen and again whenever its
 * last-modified timestamp differs from the one recorded at the previous check.
 *
 * <p>Timestamps are tracked per simple file name ({@link File#getName()}), so two
 * files with the same name in different directories share one entry.
 */
public class LastModifiedFileFilter extends AbstractFileListFilter<File> {
    /** Last-modified timestamp recorded for each file name at its most recent check. */
    private final Map<String, Long> files = new HashMap<>();
    /** Guards access to {@link #files}. */
    private final Object monitor = new Object();

    /**
     * Records the file's current timestamp and reports whether it is new or changed.
     *
     * @param file candidate file
     * @return {@code true} if the file was not seen before or its last-modified
     *         timestamp differs from the previously recorded one
     */
    @Override
    protected boolean accept(File file) {
        // Read the timestamp exactly once: the value stored must be the value compared,
        // otherwise a modification between two reads could make the file be accepted
        // on this check and again on the next one.
        final long currentModifiedTime = file.lastModified();
        final Long previousModifiedTime;
        synchronized (this.monitor) {
            previousModifiedTime = this.files.put(file.getName(), currentModifiedTime);
        }
        return previousModifiedTime == null
                || previousModifiedTime.longValue() != currentModifiedTime;
    }
}

主類:DemoApplication.java

package com.example.demo; 

import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; 
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; 
import java.io.File; 
import java.io.IOException; 
import java.nio.charset.Charset; 
import org.apache.commons.io.FileUtils; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.integration.annotation.Aggregator; 
import org.springframework.integration.annotation.InboundChannelAdapter; 
import org.springframework.integration.annotation.Poller; 
import org.springframework.integration.channel.DirectChannel; 
import org.springframework.integration.channel.QueueChannel; 
import org.springframework.integration.core.MessageSource; 
import org.springframework.integration.dsl.IntegrationFlow; 
import org.springframework.integration.dsl.IntegrationFlows; 
import org.springframework.integration.dsl.channel.MessageChannels; 
import org.springframework.integration.dsl.core.Pollers; 
import org.springframework.integration.file.FileReadingMessageSource; 
import org.springframework.integration.file.filters.CompositeFileListFilter; 
import org.springframework.integration.file.filters.SimplePatternFileListFilter; 
import org.springframework.integration.file.transformer.FileToStringTransformer; 
import org.springframework.integration.scheduling.PollerMetadata; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.PollableChannel; 
import org.springframework.stereotype.Component; 


/**
 * Boot application that polls a fixed directory for {@code *.log} files, converts
 * each accepted file to a String and passes it to the {@code process} method of the
 * {@code fileProcessor} bean.
 */
@SpringBootApplication
@Configuration
public class DemoApplication {

    /** Directory watched by the inbound file adapter. */
    private static final String DIRECTORY = "E:/usmandata/logs/input/";

    /** Boots the Spring application context. */
    public static void main(String[] args) throws IOException, InterruptedException {
        SpringApplication.run(DemoApplication.class, args);
    }

    /**
     * Flow from {@code fileInputChannel}: file to String, then on to
     * {@code fileProcessor.process}.
     */
    @Bean
    public IntegrationFlow processFileFlow() {
        return IntegrationFlows.from("fileInputChannel")
                .transform(fileToStringTransformer())
                .handle("fileProcessor", "process")
                .get();
    }

    /** Direct channel that receives the polled file messages. */
    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter source reading from {@link #DIRECTORY}; polled with a fixed delay
     * of "1000" and publishing to {@code fileInputChannel}. The directory is created
     * automatically when missing.
     */
    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource directorySource = new FileReadingMessageSource();
        directorySource.setAutoCreateDirectory(true);
        directorySource.setDirectory(new File(DIRECTORY));
        directorySource.setFilter(logFileFilter());
        return directorySource;
    }

    /** Transformer that turns a file payload into its String content. */
    @Bean
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

    /** Handler bean addressed by name from {@link #processFileFlow()}. */
    @Bean
    public FileProcessor fileProcessor() {
        return new FileProcessor();
    }

    /** Builds the composite filter: the {@code *.log} name pattern plus a last-modified check. */
    private static CompositeFileListFilter<File> logFileFilter() {
        CompositeFileListFilter<File> composite = new CompositeFileListFilter<>();
        composite.addFilter(new SimplePatternFileListFilter("*.log"));
        composite.addFilter(new LastModifiedFileFilter());
        return composite;
    }
}