2017-03-15 33 views
0

我在批處理作業代碼中添加了帶有篩選器的Zookeeper元數據存儲,以同步羣集環境中的數據並在多個JVM上進行水平伸縮。但是,當我的作業(Jpa適配器)運行時,我的所有消息都會進入丟棄通道並在日誌中獲得低於錯誤的錯誤。Spring集成Idempotent接收器和Zookeeper元數據存儲

Hibernate: 
    select 
     componenti0_.id as id1_4_, 
     componenti0_.component_info_file_id as componen2_4_, 
     componenti0_.component_serial_no as componen3_4_, 
     componenti0_.date_received as date_rec4_4_, 
     componenti0_.date_recorded as date_rec5_4_, 
     componenti0_.engine_hours as engine_h6_4_, 
     componenti0_.is_processed as is_proce7_4_, 
     componenti0_.product_serial_no as product_8_4_ 
    from 
     cs_component_info componenti0_ 
2017-03-15 21:28:55.290 INFO [componentdatafiles,,,]  39712 --- [54.26.101:2181)] org.apache.zookeeper.ClientCnxn   : Session establishment complete on server **************, sessionid = 0x35a82a934c40127, negotiated timeout = 40000 
2017-03-15 21:28:55.294 INFO [componentdatafiles,,,]  39712 --- [54.26.100:2181)] org.apache.zookeeper.ClientCnxn   : Session establishment complete on server *************, sessionid = 0x25a82aab90a0122, negotiated timeout = 40000 
2017-03-15 21:28:55.313 INFO [componentdatafiles,,,]  39712 --- [p-1-EventThread] o.a.c.f.state.ConnectionStateManager  : State change: CONNECTED 
2017-03-15 21:28:55.313 INFO [componentdatafiles,,,]  39712 --- [p-1-EventThread] o.a.c.f.state.ConnectionStateManager  : State change: CONNECTED 
2017-03-15 21:28:55.332 INFO [componentdatafiles,,,]  39712 --- [54.26.101:2181)] org.apache.zookeeper.ClientCnxn   : Session establishment complete on server ****************, sessionid = 0x35a82a934c40128, negotiated timeout = 40000 
2017-03-15 21:28:55.333 INFO [componentdatafiles,,,]  39712 --- [p-1-EventThread] o.a.c.f.state.ConnectionStateManager  : State change: CONNECTED 
2017-03-15 21:28:55.507 INFO [componentdatafiles,053728794a9688b3,374707c3669809db,false]  39712 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : [[email protected], [email protected], [email protected], [email protected], [email protected], [email protected]6d0ae0b5, [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]] 
2017-03-15 21:28:55.511 INFO [componentdatafiles,053728794a9688b3,83a7350bbbdb4b79,false]  39712 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : [email protected] 
2017-03-15 21:28:55.519 INFO [componentdatafiles,9f9c46137110bacc,a02fd11fe137a83c,false]  39712 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: Expression evaluation failed: @componentInfoMetadataStore.get(payload.id) == null; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [[email protected]d6, headers={sequenceNumber=1, sequenceSize=17, X-Message-Sent=true, messageSent=true, spanTraceId=053728794a9688b3, spanId=91816fafd9b447d0, X-B3-SpanId=91816fafd9b447d0, currentSpan=[Trace: 053728794a9688b3, Span: 91816fafd9b447d0, Parent: 053728794a9688b3, exportable:false], X-B3-Sampled=0, X-B3-TraceId=053728794a9688b3, correlationId=3bac32df-115c-4ff1-8270-2bfca59a4e83, id=4ad7414c-7b63-cfa8-fb03-ce2e20bf16b8, X-Current-Span=[Trace: 053728794a9688b3, Span: 91816fafd9b447d0, Parent: 053728794a9688b3, exportable:false], spanSampled=0}] 
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:143) 
    at org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor.processMessage(ExpressionEvaluatingMessageProcessor.java:72) 
    at org.springframework.integration.filter.AbstractMessageProcessingSelector.accept(AbstractMessageProcessingSelector.java:62) 
    at org.springframework.integration.filter.MessageFilter.doHandleRequestMessage(MessageFilter.java:161) 
    at org.springframework.integration.handler.AbstractReplyProducingPostProcessingMessageHandler.handleRequestMessage(AbstractReplyProducingPostProcessingMessageHandler.java:46) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) 
    at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:159) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:210) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55) 
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344) 
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
    at java.util.concurrent.FutureTask.run(Unknown Source) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.lang.NullPointerException 
    at org.springframework.integration.zookeeper.metadata.ZookeeperMetadataStore.get(ZookeeperMetadataStore.java:205) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
    at java.lang.reflect.Method.invoke(Unknown Source) 
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:113) 
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:129) 
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:49) 
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:347) 
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88) 
    at org.springframework.expression.spel.ast.OpEQ.getValueInternal(OpEQ.java:42) 
    at org.springframework.expression.spel.ast.OpEQ.getValueInternal(OpEQ.java:32) 
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131) 
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330) 
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169) 
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:128) 
    ... 49 more 

我想知道設置Metadatastore或使用表達式時是否缺少一些配置細節。 以下是我的配置細節。

<int-jpa:inbound-channel-adapter 
     channel="inboundChannelAdapterOne" entity-manager-factory="entityManagerFactory" 
     auto-startup="true" jpa-query="select cmpntinfo from ComponentInfo cmpntinfo" 
     expect-single-result="false" delete-after-poll="false"> 
     <int:poller fixed-rate="${componentInfoPollarInterval}"> 
      <!-- <int:transactional propagation="REQUIRES_NEW" transaction-manager="transactionManager"/> --> 
     </int:poller> 
    </int-jpa:inbound-channel-adapter> 

    <splitter id="splitter" input-channel="inboundChannelAdapterOne" 
     output-channel="splitteroutputChannel" /> 


    <int:publish-subscribe-channel id="idempotentServiceChannel"/> 

     <integration:channel id="discardChannel" /> 

    <int:filter input-channel="splitteroutputChannel" 
      output-channel="idempotentServiceChannel" 
      discard-channel="discardChannel" 
      expression="@componentInfoMetadataStore.get(payload.id) == null"/> 


    <int:outbound-channel-adapter channel="idempotentServiceChannel" 
           expression="@componentInfoMetadataStore.put(payload.id, 'payload.componentSerialNo')"/> 

    <int:service-activator id="componentInfoPollarActivator" 
     input-channel="idempotentServiceChannel" ref="componentInfoPollarConsumer" 
     method="componentInfoListen" /> 

     <int:service-activator id="discardChannelActivator" 
     input-channel="discardChannel" ref="componentInfoPollarConsumer" 
     method="discard" /> 

    <beans:bean id="componentInfoMetadataStore" 
     class="org.springframework.integration.zookeeper.metadata.ZookeeperMetadataStore"> 
     <beans:constructor-arg ref="componentInfoZookeeperClient" /> 
     <beans:property name="root" value="/componentInfoMetaDataStore" /> 
     <beans:property name="phase" value="-2147483648" /> 
    </beans:bean> 
     <beans:bean id="componentInfoZookeeperClient" 
     class="org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean"> 
     <beans:constructor-arg value="${zookeeper.server.uri}" /> 
    </beans:bean> 

回答

0

該錯誤意味着元數據存儲尚未啓動。

它需要在適配器之前啓動。

嘗試將其phase屬性設置爲一個很大的負數 - 如Integer.MIN_VALUE(因此它開始提前)。

+0

將相位屬性設置爲Integer.MIN_VALUE後,錯誤日誌記錄不會到來,但所有消息仍會丟棄通道。是否它的鍵值不會從元數據存儲清除? questiion編輯配置 –

+0

你是什麼意思的「鍵值不清除」? –

+0

元數據存儲中的值未清除componentInfoMetadataStore.put(payload.id,payload.componentSerialNo) –