2013-07-31 37 views
0

我創建了一個spring-batch作業,用於從本地目錄讀取文件並使用Camel-spring-batch通過ftp將其上傳到遠程目錄。我正在使用塊做同樣的事情。 我的春天批處理作業的配置是這樣的: 如何在spring-batch中停止文件傳輸

<bean id="consumerTemplate" class="org.apache.camel.impl.DefaultConsumerTemplate" init-method="start" destroy-method="stop"> 
    <constructor-arg ref="camelContext"/> 
</bean> 

<bean id="producerTemplate" class="org.apache.camel.impl.DefaultProducerTemplate" scope="step" init-method="start" destroy-method="stop"> 
    <constructor-arg ref="camelContext"/> 
</bean> 

<bean id="localFileReader" class="com.camel.springbatch.reader.LocalFileReader" scope="step" destroy-method="stop"> 
    <constructor-arg value="file:#{jobParameters['dirPath']}"/> 
    <constructor-arg ref="consumerTemplate"/> 
</bean> 

<bean id="ftpFileWriter" class="com.camel.springbatch.writer.FtpFileWriter" scope="step"> 
    <constructor-arg ref="producerTemplate"/> 
    <constructor-arg value="ftp://#{jobParameters['host']}?username=#{jobParameters['user']}&amp;password=#{jobParameters['password']}"/> 
</bean> 

作業配置:

<batch:job id="ftpReadWrite"> 
    <batch:step id="readFromLocalWriteToFtp" next="readFromFtpWriteToLocal"> 
     <batch:tasklet> 
      <batch:chunk reader="localFileReader" writer="ftpFileWriter" commit-interval="5" /> 
     </batch:tasklet> 
    </batch:step> 

而我的 「Localfilereader」 和 「ftpFileWriter」 的模樣:

import org.apache.camel.ConsumerTemplate; 
import org.apache.camel.component.spring.batch.support.CamelItemReader; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class LocalFileReader extends CamelItemReader { 
private Logger log= LoggerFactory.getLogger(this.getClass()); 
ConsumerTemplate consumerTemplate; 
String endpointUri; 

public LocalFileReader(ConsumerTemplate consumerTemplate, String endpointUri) { 
    super(consumerTemplate, endpointUri); 
    this.consumerTemplate=consumerTemplate; 
    this.endpointUri=endpointUri; 
} 

@Override 
public Object read() throws Exception { 
    Object item = consumerTemplate.receiveBody(endpointUri); 
    return item; 
} 

}

「FTP文件作家」

import org.apache.camel.ProducerTemplate; 
import org.apache.camel.component.spring.batch.support.CamelItemWriter; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import java.util.List; 
public class FtpFileWriter extends CamelItemWriter { 
private Logger log= LoggerFactory.getLogger(this.getClass()); 
ProducerTemplate producerTemplate; 
String endpointUri; 
public FtpFileWriter(ProducerTemplate producerTemplate, String endpointUri) { 
    super(producerTemplate, endpointUri); 
    this.producerTemplate=producerTemplate; 
    this.endpointUri=endpointUri; 
} 

@Override 
public void write(List items) throws Exception { 
    System.out.println("************************Writing item to ftp "+items); 
    for (Object item : items) { 
     System.out.println("writing item [{}]..."+item); 
     producerTemplate.sendBody(endpointUri, item); 
     log.debug("wrote item"); 
    } 
} 
} 

它工作正常,如果我有5只在我的本地目錄中的文件。它從我的本地目錄中讀取全部5個文件,並將它發送給作者和編寫者,將它作爲我的commit-interval = 5發送給ftp服務器。如果我在本地目錄中有6個文件,那麼它會將5個文件的第一部分發送給寫入程序,並再次開始讀取剩餘的文件,這次只剩下一個文件。它讀取1個文件,並開始等待4個文件,永遠不會發送給作者。我用commit-interval = 1試了一下,現在它將所有6個文件發送到服務器,並再次開始等待下一個文件。在這裏,我需要在處理完所有文件後停止進程。

請幫我解決了這個問題...

回答

1

ConsumerTemplate的javadoc的receiveBody等待,直到有一個響應;您需要使用超時(以spring-batch方式檢查TimeoutPolicy)或以不同的方式將讀取器標記爲'耗盡'(讀取器返回null)以停止讀取器讀取。

+0

感謝您的答覆。我嘗試了以下策略:DefaultResultCompletionPolicy SimpleCompletionPolicy TimeoutTerminationPolicy 像chunk-completion-policy =「policy」。但它沒有奏效... – user9873999

+0

我無法找到可以確定所有文件已從目錄中讀取的方式,以便我可以返回「null」。 – user9873999

+0

我不明白你爲什麼定義自己的LocalFileReader而不是重用CamelItemReader,因爲你沒有收益(此外 ,在xml構造函數arg被交換!)。 嘗試使用MultiResourceItemReader或[FilesInDirectoryItemReader](https://code.google.com/p/springbatch-in-action/source/browse/trunk/sbia/ch09/src/main/java/com/manning/sbia/ch09 /restart/FilesInDirectoryItemReader.java?spec=svn201&r=201)或使用'consumerTemplate.receiveBody(endpointUri);'但超時的版本。 我忍不住了,對不起。 –

1

您可以使用receiveBodyNoWait而不是receiveBody。 然後,您必須檢查消費者端點內是否還有文件。我將它編碼爲一個將big-xml文件變成小塊的tasklet。

的任務蕾:

public class MyCamelTasklet extends ServiceSupport implements Tasklet, InitializingBean{ 

private static final Logger LOG = LoggerFactory.getLogger(MyCamelTasklet.class); 

private final CamelContext camelContext; 
private final ConsumerTemplate consumerTemplate; 
private final File workingDir; 
private final Route xmlSplitRoute; 


public MyCamelTasklet(ConsumerTemplate consumerTemplate) { 
    super();   
    this.consumerTemplate = consumerTemplate; 
    this.camelContext = consumerTemplate.getCamelContext();  
    this.xmlSplitRoute = this.camelContext.getRoutes().get(0); 
    this.workingDir = new File(xmlSplitRoute.getRouteContext().getFrom().getUri().replace("file:", "")); 
} 

@Override 
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) 
     throws Exception { 

    LOG.debug("reading new item..."); 

    Endpoint endpointXmlSplitRoute = xmlSplitRoute.getEndpoint(); 

    while(getNbFilesToConsume(this.workingDir) > 0) {  

    consumerTemplate.receiveBodyNoWait(endpointXmlSplitRoute); 

    }  

    return RepeatStatus.FINISHED; 
} 

private int getNbFilesToConsume(File workingDir){ 
    return FileUtils.listFiles(workingDir, new String[]{"xml"}, false).size(); 
} 

@Override 
protected void doStart() throws Exception { 
    ServiceHelper.startService(consumerTemplate); 

} 


@Override 
protected void doStop() throws Exception { 
    ServiceHelper.stopService(consumerTemplate); 

} 


@Override 
public void afterPropertiesSet() throws Exception { 
    ObjectHelper.notNull(camelContext, "CamelContext", this);   
    camelContext.addService(this);  
} 
} 

對於前面的tasklet單元測試:

public class SplitTaskletTest { 

@Test public void execute() throws Exception { 
    CamelContext camelContext = new DefaultCamelContext();  

    camelContext.addRoutes(new RouteBuilder() { 
     public void configure() { 

      Namespaces ns = new Namespaces("nsl", "http://www.toto.fr/orders"); 
      from("file:data/inbox").id("inbox-road"). 
      split(). 
      xtokenize("//nsl:order", 'w', ns, 1000). 
      streaming(). 
      to("file:data/outbox?fileName=${file:name.noext}-${exchangeId}.${file:ext}"); 

     } 

    }); 

    camelContext.start(); 

    ConsumerTemplate consumer =new DefaultConsumerTemplate(camelContext); 

    consumer.start(); 

    MyCamelTasklet tasklet = new MyCamelTasklet(consumer); 

    long debutTraitement = System.currentTimeMillis(); 

    tasklet.execute(null, null); 

    long finTraitement = System.currentTimeMillis(); 

    long total = finTraitement-debutTraitement; 

    File outputDir = new File("data/outbox"); 
    outputDir.mkdir(); 

    int nbGeneratesFiles = FileUtils.listFiles(outputDir, new String[]{"xml"}, false).size(); 

    System.out.println("Traitement total en secondes : "+total/1000); 

    Assert.assertTrue(nbGeneratesFiles>0); 

} 
}