我目前在Spring 4.1.2中使用Spring Integration 4.1.0。 我有一個要求,能夠逐行讀取文件並將每行讀取作爲消息。基本上我想允許我們的消息源之一「重放」,但消息不是保存在單個文件中,而是保存在單個文件中。我對此用例沒有交易要求。 我的要求是,JVM運行上與此類似,除了張貼在駐留在同一臺服務器上的一個文件:spring integration - read a remote file line by line春季集成入站通道適配器逐行讀取大文件
當我看到它,我有以下選擇:
1.使用int-file:inbound-channel-adapter
讀取文件,然後「分割」該文件,以使1條消息現在成爲多條消息。 樣品配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1"/>
<int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
<int:channel id="channel1"/>
<int:splitter input-channel="channel2" output-channel="nullChannel"/>
<int:channel id="channel2"/>
</beans>
的問題是,該文件是非常大的,並使用上述技術,當整個文件被首先讀入存儲器,然後分裂和JVM用完堆空間。真正需要的步驟是:讀取一行並將行轉換爲消息,發送消息,從內存中刪除消息,重複。
int-file:tail-inbound-channel-adapter
使用與end="false"
(其基本上表示從文件的開始閱讀)。根據需要爲每個文件啓動和停止此適配器(每次啓動前更改文件名)。 示例配置文件:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <int-file:tail-inbound-channel-adapter id="apache" channel="exchangeSpringQueueChannel" task-executor="exchangeFileReplayTaskExecutor" file="C:\p2-test.txt" delay="1" end="false" reopen="true" file-delay="10000" /> <int:channel id="exchangeSpringQueueChannel" /> <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" /> </beans>
讓Spring集成調入Spring Batch的,並使用一個
ItemReader
處理文件。當然允許在整個過程中進行更細粒度的控制,但是需要大量的工作來設置工作存儲庫的內容等等(我不關心工作歷史,所以我要麼告訴工作不要記錄狀態和/或使用內存中的MapJobRepository
)。
4.通過擴展MessageProducerSupport
創建我自己的FileLineByLineInboundChannelAdapter
。 大部分代碼可以從ApacheCommonsFileTailingMessageProducer
(也請參閱http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter)借入。下面是一個示例,但需要一些工作才能將讀數放入它自己的Thread
中,以便在逐行讀取時尊重stop()
命令。
package com.xxx.exchgateway.common.util.springintegration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.commons.io.IOUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.file.FileHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A lot of the logic for this class came from {@link #ApacheCommonsFileTailingMessageProducer}.
* See {@link http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter}
*/
public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport implements MessageSource<String> {
private volatile File file;
/**
* The name of the file you wish to tail.
* @param file The absolute path of the file.
*/
public void setFile(File file) {
Assert.notNull("'file' cannot be null");
this.file = file;
}
protected File getFile() {
if (this.file == null) {
throw new IllegalStateException("No 'file' has been provided");
}
return this.file;
}
@Override
public String getComponentType() {
return "file:line-by-line-inbound-channel-adapter";
}
private void readFile() {
FileInputStream fstream;
try {
fstream = new FileInputStream(getFile());
BufferedReader br = new BufferedReader(new InputStreamReader(fstream));
String strLine;
// Read File Line By Line, make sure we honor if someone manually sets the isRunning=false (via clicking the stop() method in JMX)
while ((strLine = br.readLine()) != null && isRunning()) {
send(strLine);
}
//Close the input stream
IOUtils.closeQuietly(br);
IOUtils.closeQuietly(fstream);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
protected void doStart() {
super.doStart();
// TODO this needs to be moved into it's own thread since isRunning() will return "false" until this method has completed
// and we want to honor the stop() command while we read line-by-line
readFile();
}
protected void send(String line) {
Message<?> message = this.getMessageBuilderFactory().withPayload(line).setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath()).build();
super.sendMessage(message);
}
@Override
public Message<String> receive() {
// TODO Auto-generated method stub
return null;
}
}
這在我看來,不,我的用例是典型的事物領域之外的人可能會喜歡這樣做,我很驚訝,我無法找到一個解決的辦法失開箱。然而,我搜索了很多,然後看了很多例子,不幸的是還沒有找到符合我需求的東西。
我假設也許我錯過了框架已經提供的東西(儘管也許這屬於Spring Integraton和Spring Batch之間的模糊界線)。有人可以告訴我,如果我完全脫離了我的想法,或者有一個我錯過的簡單解決方案,或者提供了其他建議嗎?
感謝您的回覆Smollet。這真的很有幫助。 我試過你的解決方案(甚至是第二個更新版本),它有一個小故障,它讀取文件的第一行,然後繼續讀取同一行。 因此,我編輯了你原來的文章,對next()和readNextLine()稍作修改。 我假設(也許不正確),一旦文件的所有行都被讀取,大多數人會想停止閱讀文件。 如果這確實是人們的需求,那麼我所做的更新還調整了hasNext()方法以在最後一行被讀取後關閉文件。 – 2014-11-25 23:53:27
有趣的一點!最近幾個小時我一直在調試類似的問題(在循環中讀取相同的文件)。 確實需要關閉閱讀器,但是您的更改尚未解決當前的問題。我懷疑該文件:入站通道適配器不斷髮送帶有相同文件的消息(如果不是在入站通道適配器上設置prevent-duplicates =「true」)。在處理完文件後,刪除文件也會很好。 – 2014-11-26 01:16:15
我冒昧地更新了代碼片段,這些更改幫助我完成了Spring XD環境中的文件處理。 @TonyFalabella它應該與您的需求兼容(如果不需要,只需註釋file.delete())。 – 2014-11-26 02:03:36