2014-07-10 54 views

Below is my flow. I want to store my Salesforce batch information in a RabbitMQ queue from Mule.

<flow name="foreachsimilar_pmFlow1" doc:name="foreachsimilar_pmFlow1"> 
     <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8081" doc:name="HTTP"/> 
     <sfdc:create-job config-ref="Salesforce1" type="HRISASI__c" operation="insert" doc:name="Salesforce"/> 
     <set-variable variableName="batchID" value="1" doc:name="Variable"/> 
     <set-property propertyName="jobInfo" value="#[payload]" doc:name="Property"/> 
     <set-variable variableName="jobId" value="#[payload.id]" doc:name="Variable"/> 
     <jdbc-ee:outbound-endpoint exchange-pattern="request-response" queryKey="id" queryTimeout="-1" connector-ref="Database1" doc:name="Database"/> 
     <logger message="#[payload]" level="INFO" doc:name="Logger"/> 
     <scripting:transformer doc:name="Groovy"> 
      <scripting:script engine="Groovy"><![CDATA[payload.collect { it.EmpId }.collate(3).collect { [min: it[0], max: it[-1]] } 
         ]]></scripting:script> 
     </scripting:transformer> 
     <foreach doc:name="For Each"> 
      <set-variable variableName="bulkPayload" value="#[groovy: return[];]" doc:name="bulkPayload EmptyArray"/> 
      <set-variable variableName="IDs" value="#[groovy:return[];]" doc:name="Id EmptyArray"/> 
      <set-variable variableName="jdbdinsbatch" value="#[groovy: return[];]" doc:name="Variable"/> 
      <jdbc-ee:outbound-endpoint exchange-pattern="request-response" queryTimeout="-1" connector-ref="Database1" doc:name="Database" queryKey="all"/> 
      <foreach doc:name="For Each"> 
      <set-variable variableName="empId" value="#[payload['EmpId']]" doc:name="Variable"/> 
      <logger message="#[payload]" level="INFO" doc:name="Logger"/> 
      <data-mapper:transform config-ref="map_to_hrisasi__c" doc:name="Map To HRISASI__c"/> 
      <scripting:transformer doc:name="Groovy"> 
       <scripting:script engine="Groovy"><![CDATA[payloadMap = payload[0]; 
IDs = flowVars['IDs']; 
IDs.add([ EmpId: flowVars['EmpId'] ]); 
flowVars['IDs'] = IDs; 
jdbdinsbatch= flowVars['jdbdinsbatch']; 
jdbdinsbatch.add([ EmpId: flowVars['EmpId'], batchID: flowVars['batchID'] ]); 
flowVars['jdbdinsbatch'] = jdbdinsbatch; 
return [ payloadMap ];]]></scripting:script> 
      </scripting:transformer> 
      <set-variable variableName="bulkPayload" value="#[groovy: bulkPayload = flowVars['bulkPayload']; bulkPayload.add(payload[0]); return bulkPayload;]" doc:name="bulkPayload"/> 
     </foreach> 
     <set-payload value="#[flowVars['bulkPayload']]" doc:name="Set Payload"/> 
     <sfdc:create-batch config-ref="Salesforce1" doc:name="Salesforce"> 
       <sfdc:job-info ref="#[message.outboundProperties['jobInfo']]"/> 
       <sfdc:objects ref="#[payload]"/> 
      </sfdc:create-batch> 
      <scripting:transformer doc:name="Groovy"> 
       <scripting:script engine="Groovy"><![CDATA[return [ batch: payload, IDs: flowVars['IDs'], batchid: flowVars['batchID'] ]]]></scripting:script> 
      </scripting:transformer> 
     <amqp:outbound-endpoint exchangeName="Salesforce-Batch" queueName="batchInfo" exchangeDurable="true" queueDurable="true" responseTimeout="10000" doc:name="AMQP"/> 

     </foreach> 
     <flow-ref name="foreachsimilar_pmFlow2" doc:name="Flow Reference"/> 
    </flow> 

The following exception is thrown:

Message    : Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=amqp://Salesforce-Batch/amqp-queue.batchInfo, connector=AmqpConnector 
{ 
    name=AMQP_Connector 
    lifecycle=start 
    this=33982399 
    numberOfConcurrentTransactedReceivers=4 
    createMultipleTransactedReceivers=true 
    connected=true 
    supportedProtocols=[amqp] 
    serviceOverrides=<none> 
} 
, name='endpoint.amqp.Salesforce.Batch.amqp.queue.batchInfo', mep=ONE_WAY, properties={queueDurable=true, exchangeDurable=true}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: AmqpMessage 
Code     : MULE_ERROR--2 
-------------------------------------------------------------------------------- 
Exception stack is: 
1. invalid value in table (java.lang.IllegalArgumentException) 
    com.rabbitmq.client.impl.Frame:306 (null) 
2. Failed to route event via endpoint: DefaultOutboundEndpoint{endpointUri=amqp://Salesforce-Batch/amqp-queue.batchInfo, connector=AmqpConnector 
{ 
    name=AMQP_Connector 
    lifecycle=start 
    this=33982399 
    numberOfConcurrentTransactedReceivers=4 
    createMultipleTransactedReceivers=true 
    connected=true 
    supportedProtocols=[amqp] 
    serviceOverrides=<none> 
} 
, name='endpoint.amqp.Salesforce.Batch.amqp.queue.batchInfo', mep=ONE_WAY, properties={queueDurable=true, exchangeDurable=true}, transactionConfig=Transaction{factory=null, action=INDIFFERENT, timeout=0}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}. Message payload is of type: AmqpMessage (org.mule.api.transport.DispatchException) 
    org.mule.transport.AbstractMessageDispatcher:109 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transport/DispatchException.html) 
-------------------------------------------------------------------------------- 
Root Exception stack trace: 
java.lang.IllegalArgumentException: invalid value in table 
    at com.rabbitmq.client.impl.Frame.fieldValueSize(Frame.java:306) 
    at com.rabbitmq.client.impl.Frame.tableSize(Frame.java:246) 
    at com.rabbitmq.client.impl.ValueWriter.writeTable(ValueWriter.java:120) 
    + 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything) 

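Can anyone tell me why the above exception is thrown and what I should do to resolve it? I have been struggling with this for the past two days.

Thanks in advance.

Below is the stack trace with verbose exception logging enabled: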

******************************************************************************** 
Root Exception stack trace: 
java.lang.IllegalArgumentException: invalid value in table 
    at com.rabbitmq.client.impl.Frame.fieldValueSize(Frame.java:306) 
    at com.rabbitmq.client.impl.Frame.tableSize(Frame.java:246) 
    at com.rabbitmq.client.impl.ValueWriter.writeTable(ValueWriter.java:120) 
    at com.rabbitmq.client.impl.ContentHeaderPropertyWriter.writeTable(ContentHeaderPropertyWriter.java:98) 
    at com.rabbitmq.client.AMQP$BasicProperties.writePropertiesTo(AMQP.java:1782) 
    at com.rabbitmq.client.impl.AMQContentHeader.writeTo(AMQContentHeader.java:51) 
    at com.rabbitmq.client.impl.AMQContentHeader.toFrame(AMQContentHeader.java:78) 
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:106) 
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:316) 
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:292) 
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:636) 
    at org.mule.transport.amqp.AmqpMessageDispatcher$OutboundAction$1.run(AmqpMessageDispatcher.java:55) 
    at org.mule.transport.amqp.AmqpMessageDispatcher.doOutboundAction(AmqpMessageDispatcher.java:172) 
    at org.mule.transport.amqp.AmqpMessageDispatcher.doDispatch(AmqpMessageDispatcher.java:127) 
    at org.mule.transport.AbstractMessageDispatcher.process(AbstractMessageDispatcher.java:99) 
    at org.mule.transport.AbstractConnector$DispatcherMessageProcessor.process(AbstractConnector.java:2627) 
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27) 
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61) 
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47) 
    at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:101) 
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27) 
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61) 
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47) 
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27) 
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61) 
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47) 
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27) 
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61) 
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47) 
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27) 
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47) 
    at org.mule.endpoint.outbound.OutboundResponsePropertiesMessageProcessor.process(OutboundResponsePropertiesMessageProcessor.java:39) 
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27) 
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:61) 
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47) 
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27) 
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:47) 
    at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:50) 
    at org.mule.processor.EndpointTransactionalInterceptingMessageProcessor$1.process(EndpointTransactionalInterceptingMessageProcessor.java:47) 
    at org.mule.execution.ExecuteCallbackInterceptor.execute(ExecuteCallbackInterceptor.java:20) 
    at org.mule.execution.BeginAndResolveTransactionInterceptor.execute(BeginAndResolveTransactionInterceptor.java:58) 
    at org.mule.execution.ResolvePreviousTransactionInterceptor.execute(ResolvePreviousTransactionInterceptor.java:48) 
    at org.mule.execution.SuspendXaTransactionInterceptor.execute(SuspendXaTransactionInterceptor.java:54) 
    at org.mule.execution.ValidateTransactionalStateInterceptor.execute(ValidateTransactionalStateInterceptor.java:44) 
    at org.mule.execution.IsolateCurrentTransactionInterceptor.execute(I... 
******************************************************************************** 

Answer


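Judging from the stack trace, the payload you are trying to send to Rabbit looks quite complex (an SFDC BatchInfo object). How do you expect the queue consumer to use it, and will it even be readable on that side? I think that, for some reason, the message transformer responsible for converting this payload into a byte array suitable for AMQP transport fails, and Mule ends up trying to hand the Groovy-generated Map, with the SFDC BatchInfo object inside it, as-is to the AMQP client library.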

Before sending to Rabbit, you should convert the map and its contents to a string. Something like this (it calls toString on the BatchInfo object and then converts the map to JSON):

Edit (removing the outbound properties):

<scripting:transformer doc:name="Groovy"> 
    <scripting:script engine="Groovy"><![CDATA[return [ batch: payload.toString(), IDs: flowVars['IDs'], batchid: flowVars['batchID'] ]]]></scripting:script> 
</scripting:transformer> 
<json:object-to-json-transformer doc:name="Object to JSON"/> 
<amqp:outbound-endpoint exchangeName="Salesforce-Batch" queueName="batchInfo" exchangeDurable="true" queueDurable="true" responseTimeout="10000" doc:name="AMQP"> 
    <message-properties-transformer scope="outbound"> 
     <delete-message-property key="*" /> 
    </message-properties-transformer> 
</amqp:outbound-endpoint> 
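
A possible alternative reading of the verbose trace in the question: the exception is raised underneath `AMQP$BasicProperties.writePropertiesTo`, which writes the message properties and headers, not the body. If that reading is right, the offending value may be the `jobInfo` outbound property set near the top of the flow, since it holds a Salesforce object rather than a string or number. The fragment below is a minimal sketch, not a verified fix. It assumes that the AMQP transport copies outbound-scoped properties into the AMQP headers, that no other connector in the flow adds a non-primitive outbound property, and that `sfdc:job-info` accepts a flow-variable expression the same way it accepts the outbound-property one.

```xml
<!-- Sketch only: replaces the <set-property propertyName="jobInfo" .../> step.
     A flow variable is not sent as an AMQP header (assumption: only
     outbound-scoped properties are copied into the headers). -->
<set-variable variableName="jobInfo" value="#[payload]" doc:name="jobInfo"/>

<!-- ... rest of the flow unchanged up to the batch creation ... -->

<!-- Inside the outer foreach: read the job info back from the flow variable
     instead of message.outboundProperties. -->
<sfdc:create-batch config-ref="Salesforce1" doc:name="Salesforce">
    <sfdc:job-info ref="#[flowVars['jobInfo']]"/>
    <sfdc:objects ref="#[payload]"/>
</sfdc:create-batch>
```

With this variant the body still needs to be something the transport can serialize, so the `toString` and JSON steps above would stay in place.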

Thanks for the reply. However, even after transforming to a string, the error still occurs. – madhu


OK, then enable verbose exceptions (-Dmule.verbose.exceptions=true) and post a new stack trace. –


I am using Mule Studio. What should I do to enable verbose exceptions? – madhu