2016-01-20 119 views
1

我有一個spring sd源模塊,它從s3中提取文件並逐行分割。我的spring配置如下。但是我有3個容器和1個管理服務器。現在,我看到正在處理的重複消息每個容器都有自己的副本。 我可以解決與源s3模塊部署計數爲1,但我的郵件處理變慢。?解決此問題的任何投入?處理多條消息

<int:poller fixed-delay="${fixedDelay}" default="true"> 
    <int:advice-chain> 
      <ref bean="pollAdvise"/> 

      </int:advice-chain> 
    </int:poller> 


    <bean id="pollAdvise" 

    </bean> 






    <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials"> 
     <property name="accessKey" value="#{encryptedDatum.decryptBase64Encoded('${accessKey}')}"/> 
     <property name="secretKey" value="${secretKey}"/> 
    </bean> 


    <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration"> 
     <property name="proxyHost" value="${proxyHost}"/> 
     <property name="proxyPort" value="${proxyPort}"/> 
    <property name="preemptiveBasicProxyAuth" value="false"/> 
    </bean> 

    <bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations"> 
     <constructor-arg index="0" ref="credentials"/> 
     <constructor-arg index="1" ref="clientConfiguration"/> 
     <property name="awsEndpoint" value="s3.amazonaws.com"/> 
     <property name="temporaryDirectory" value="${temporaryDirectory}"/> 
     <property name="awsSecurityKey" value="${awsSecurityKey}"/> 
    </bean> 

    <bean id="encryptedDatum" class="abc"/> 

    <!-- aws-endpoint="https://s3.amazonaws.com" --> 
    <int-aws:s3-inbound-channel-adapter aws-endpoint="s3.amazonaws.com" 
             bucket="${bucket}" 
             s3-operations="s3Operations" 
             credentials-ref="credentials" 
             file-name-wildcard="${fileNameWildcard}" 
             remote-directory="${remoteDirectory}" 
             channel="splitChannel" 
             local-directory="${localDirectory}" 
             accept-sub-folders="false" 
             delete-source-files="true" 
             archive-bucket="${archiveBucket}" 
             archive-directory="${archiveDirectory}"> 
    </int-aws:s3-inbound-channel-adapter> 

    <int-file:splitter input-channel="splitChannel" output-channel="output" markers="false" charset="UTF-8"> 

     <int-file:request-handler-advice-chain> 
      <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice"> 
       <property name="onSuccessExpression" value="payload.delete()"/> 
      </bean> 
     </int-file:request-handler-advice-chain> 

</int-file:splitter> 

    <int:channel id="output"/> 

[更新] 我增加了冪等的建議由你與元數據store.But因爲我的XD在3容器集羣中運行與兔子會簡單metadatastore工作?我想我應該用紅色/蒙戈元source.If我使用mongo/redis metadatastore howcan我驅逐/刪除消息,因爲消息會隨着時間堆積起來?

<int:idempotent-receiver id="expressionInterceptor" endpoint="output" 
           metadata-store="store" 
          discard-channel="nullChannel" 
          throw-exception-on-rejection="false" 
           key-expression="payload"/> 

    <bean id="store" class="org.springframework.integration.metadata.SimpleMetadataStore"/> 

回答

1

我建議你看看Idempotent Receiver

由於您可以使用共享MetadataStore並且不接受重複的文件。

<idempotent-receiver>應該配置爲您的<int-file:splitter>。是的:用丟棄邏輯來避免重複的消息。

UPDATE

。但因爲我的XD在3容器集羣中運行與兔子會簡單metadatastore工作?

這並不重要,因爲你從S3 MessageSource開始流,所以你應該過濾已經存在的文件。因此您需要外部共享MetadataStore

。如果我使用mongo/redis metadatastore howcan我會驅逐/刪除郵件,因爲郵件會隨着時間的推移堆積起來嗎?

這是正確的。這是Idempotent接收器邏輯的副作用。不確定如果你使用數據庫對你來說有什麼問題...

你可以通過一些週期性的任務清理集合/鍵。也許一週一次...

+0

更新了我的實施建議您的問題。請讓我知道你的輸入.. – constantlearner

+0

查看我的答案更新。 –

+0

「這並不重要,因爲你從S3 MessageSource啓動流,所以你應該過濾已經存在的文件,因此你需要外部共享的MetadataStore。」在這種情況下,simplemetadata存儲將無法工作,因爲它的每個實例?可以簡單的元數據存儲可以跨實例共享 – constantlearner