2016-07-05 28 views

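When the incoming JSON contains IPs, I look up the matching entries among the current connections via factory.getOpenConnectionIds(), then set a header on each message in the send logic I am developing. How can I send a single message to multiple IPs?

I found the list of currently connected IPs through factory.getOpenConnectionIds() and set the header, but an "Unable to find outbound socket" error occurred.

What is the cause?

The integration configuration is...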

@Bean 
public TcpReceivingChannelAdapter sslAdapter() { 
    TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); 
    adapter.setConnectionFactory(sslServerFactory()); 
    adapter.setOutputChannel(inputWithSSL()); 

    return adapter; 
} 

@Bean 
public TcpSendingMessageHandler sslHandler() { 
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); 
    handler.setConnectionFactory(sslServerFactory()); 

    return handler; 
} 

@Bean 
public AbstractConnectionFactory sslServerFactory() { 
    int port = Integer.parseInt(inboundPort); 
    TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(port); 
    factory.setBacklog(BACKLOG); 
    factory.setTaskExecutor(taskSchedulerWithSSL()); 
    factory.setLookupHost(false); 

    factory.setSerializer(echoSerializer); 
    factory.setDeserializer(echoSerializer); 

    factory.setTcpNioConnectionSupport(tcpNioSSLConnectionSupport()); 

    // Nagle's algorithm disabled 
    factory.setSoTcpNoDelay(true); 

    return factory; 
} 

@Bean 
public IntegrationFlow flowForReceiveSslMessage() { 
    return IntegrationFlows 
      .from(sslAdapter) 
      .<byte[], Boolean>route(
        p -> (short) 0 == ByteBuffer.wrap(p, 0, BYTE_LENGTH_OF_SHORT).getShort(), 
        m -> m.channelMapping(TRUE, INPUT_WITH_SSL_JSON) 
          .channelMapping(FALSE, INPUT_WITH_SSL_ECHO)).get(); 
} 

@Bean 
public IntegrationFlow flowForExtractingSslJson() { 
    return IntegrationFlows 
      .from(inputWithSslJson()) 
      .handle(INBOUND_SERVICE, EXTRACT_PAYLOAD_AS_JSON) 
      .<Map<String, Object>, String>route(
        p -> (String) p.get(REQUEST), 
        m -> m.channelMapping(LOGIN, INPUT_WITH_SSL_LOGIN) 
          .channelMapping(LOGOUT, INPUT_WITH_SSL_LOGOUT) 
          .channelMapping(POLICY, INPUT_WITH_SSL_POLICY) 
          .channelMapping(PUSH_TARGET, INPUT_WITH_SSL_PUSH_TARGET)).get(); 
} 

@Bean 
public IntegrationFlow flowForHandlingSslNotifyPolicyUpdate() { 
    return IntegrationFlows.from(inputWithSslPushTarget()).handle(POLICY_SERVICE, RESPONSE_POLICY_UPDATE) 
      .split(POLICY_SERVICE, SPLIT_MESSAGES) 
      .channel(outputWithSslJsonBytesToClient()).get(); 
} 

@Bean 
public IntegrationFlow flowForConvertingSslJsonToBytesAndSendClient() { 
    return IntegrationFlows.from(outputWithSslJsonBytesToClient()) 
      .transform(new ObjectToJsonTransformer()) 
      .handle(INBOUND_SERVICE, ATTACH_HEADER_BY_STRING).handle(sslHandler).get(); 
} 

@Bean 
public MessageChannel outputWithSsl() { 
    return MessageChannels.queue(POOL_SIZE).get(); 
} 

@Bean 
public MessageChannel inputWithSslJson() { 
    return MessageChannels.queue(POOL_SIZE).get(); 
} 

@Bean 
public MessageChannel inputWithSslPushTarget() { 
    return MessageChannels.queue(POOL_SIZE).get(); 
} 

@Bean 
public MessageChannel outputWithSslJsonBytesToClient() { 
    return MessageChannels.queue(POOL_SIZE).get(); 
} 

RESPONSE_POLICY_UPDATE and SPLIT_MESSAGES are...

@Override 
public Object responsePolicyUpdate(Object payload) throws Exception { 
    log.debug("notify policy update debug : {}", payload); 
    Map<String, Object> params = initParam(payload); 
    Map<String, Object> result = initResult(params); 
    result.put(RESPONSE, PUSH_TARGET); 
    result.put(RESULT, SUCCESS); 
    result.put(REASON, 0); 

    return result; 
} 

@Splitter 
@Override 
@SuppressWarnings("unchecked") 
public List<Message<String>> splitMessages(Object payload) throws Exception { 
    log.debug("split messages debug : {}", payload); 
    Map<String, Object> params = initParam(payload); 
    List<String> pushTargetList = (List<String>) params.get(PUSH_TARGET_LIST); // pushTargetList is ip list. 

    List<Message<String>> messageList = new ArrayList<Message<String>>(); 
    String[] conArray = new String[4]; 
    List<String> sslConnectionIds = sslServerFactory.getOpenConnectionIds(); 
    int sslPort = sslServerFactory.getPort(); 
    for (String con : sslConnectionIds) { 
        log.debug("## con ip : {}", con); 
        conArray = con.split(":"); 
        for (String pushTargetIP : pushTargetList) { 
            if (conArray[0].equals(pushTargetIP)) { 
                Message<String> message = MessageBuilder.withPayload(params.toString()) 
                        .setHeader("ip_connectionId", con).build(); 
                messageList.add(message); 
                break; 
            } 
        } 
    } 

    return messageList; 
} 
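
The matching step in the splitter above can be isolated from Spring and checked on its own. The following is a minimal sketch, assuming connection IDs have the shape host:remotePort:localPort:uuid seen in the debug log and that hosts are IPv4 addresses (an IPv6 host would contain colons and need different parsing). ConnectionIdMatcher, hostOf and matching are hypothetical names, not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper for illustration; not a Spring Integration class. */
final class ConnectionIdMatcher {

    private ConnectionIdMatcher() {
    }

    /** Returns the host part of an ID shaped like host:remotePort:localPort:uuid. */
    static String hostOf(String connectionId) {
        int firstColon = connectionId.indexOf(':');
        return firstColon < 0 ? connectionId : connectionId.substring(0, firstColon);
    }

    /** Keeps every open connection ID whose host is one of the target IPs. */
    static List<String> matching(List<String> openConnectionIds, List<String> targetIps) {
        // A set makes each host lookup constant time instead of a nested loop.
        Set<String> targets = new HashSet<>(targetIps);
        List<String> matches = new ArrayList<>();
        for (String id : openConnectionIds) {
            if (targets.contains(hostOf(id))) {
                matches.add(id);
            }
        }
        return matches;
    }
}
```

Unlike the loop in splitMessages, this keeps every open connection from a target IP rather than stopping at the first target that matches, which only matters if one client holds several connections. Each returned ID would then go into the ip_connectionId header of its own message.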

The debug log is...
The first line is the current connection list.

2016-07-05 14:30:14.664 DEBUG 56092 --- [sk-scheduler-10] c.m.j.policy.service.PolicyServiceImpl : ## con ip : 192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3 

2016-07-05 14:30:14.672 DEBUG 56092 --- [ask-scheduler-1] o.s.i.ip.tcp.TcpSendingMessageHandler : plainHandler received message: GenericMessage [payload=byte[246], headers={sequenceNumber=1, json__TypeId__=class java.lang.String, sequenceSize=1, ip_connectionId=192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3, correlationId=fae71250-bf47-3f64-6ad3-1ce22ef69464, id=c6c097f0-9efb-f0a5-4240-924e06879b7f, contentType=application/json, timestamp=1467696614672}] 
2016-07-05 14:30:14.672 ERROR 56092 --- [ask-scheduler-1] o.s.i.ip.tcp.TcpSendingMessageHandler : Unable to find outbound socket for GenericMessage [payload=byte[246], headers={sequenceNumber=1, json__TypeId__=class java.lang.String, sequenceSize=1, ip_connectionId=192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3, correlationId=fae71250-bf47-3f64-6ad3-1ce22ef69464, id=c6c097f0-9efb-f0a5-4240-924e06879b7f, contentType=application/json, timestamp=1467696614672}] 
2016-07-05 14:30:14.673 DEBUG 56092 --- [ask-scheduler-1] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, headers={id=273f4477-52cf-645b-d157-e22dc7cc781a, timestamp=1467696614673}] 
2016-07-05 14:30:14.673 DEBUG 56092 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : (inner bean)#6dc2279c received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, headers={id=273f4477-52cf-645b-d157-e22dc7cc781a, timestamp=1467696614673}] 
2016-07-05 14:30:14.675 ERROR 56092 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket 
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:113) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) 
at 
... 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

Are you using this program to send or receive TCP packets? – aksappy


Yes, of course. Receiving a request from an IP and responding to that same IP works fine. – kurochi


It should work fine - you need to run in a debugger and examine the contents of the "connections" field of "TcpSendingMessageHandler". –

Answers

Thanks, Gary.
As you said, the error occurs in the handleMessageInternal method of the TcpSendingMessageHandler class.

The connectionId obtained is...

2016-07-06 10:04:28.704 DEBUG 30144 --- [ask-scheduler-4] c.m.j.policy.service.PolicyServiceImpl : ## con ip : 192.168.3.57:53759:5443:bf93680b-13fe-401b-a1eb-5545917f404a 

The connectionId is not null. However, the result of connections.get(connectionId) is null.
I cannot make sense of this.

This is the TcpSendingMessageHandler class...
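
One thing that may be worth checking: the earlier debug log shows the message being received by a handler named plainHandler, while the configuration shown only defines sslHandler. If each handler only knows the connections of the factory it is attached to, a valid ID from one factory would miss in another handler's map. The following is a simplified model of that situation, not the real Spring Integration classes; ModelHandler and its methods are hypothetical and only mirror the lookup-then-fail shape of handleMessageInternal quoted below.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for a sending handler; NOT the real TcpSendingMessageHandler. */
final class ModelHandler {

    private final String name;

    // Connection IDs this handler has been told about, mapped to a placeholder "socket".
    private final Map<String, String> connections = new ConcurrentHashMap<>();

    ModelHandler(String name) {
        this.name = name;
    }

    /** In this model, the owning factory calls this when a client connects. */
    void addNewConnection(String connectionId) {
        connections.put(connectionId, "socket-for-" + connectionId);
    }

    /** Looks the connection up by ID and fails if this handler never saw it. */
    String send(String connectionId) {
        String socket = connectionId == null ? null : connections.get(connectionId);
        if (socket == null) {
            throw new IllegalStateException(name + ": Unable to find outbound socket");
        }
        return name + " wrote to " + socket;
    }
}
```

In this model, a handler that was registered for the connection sends successfully, while a second handler given the same, non-null ID throws, which matches the symptom of a non-null connectionId and a null lookup. Whether this is what happens in the application above is an assumption that a debugger session on the "connections" field would confirm or rule out.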

/** 
* Writes the message payload to the underlying socket, using the specified 
* message format. 
* @see org.springframework.messaging.MessageHandler#handleMessage(org.springframework.messaging.Message) 
*/ 
@Override 
public void handleMessageInternal(final Message<?> message) throws 
     MessageHandlingException { 
    if (this.serverConnectionFactory != null) { 
     // We don't own the connection, we are asynchronously replying 
     Object connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID); 
     TcpConnection connection = null; 
     if (connectionId != null) { 
      connection = connections.get(connectionId); 
     } 
     if (connection != null) { 
      try { 
       connection.send(message); 
      } 
      catch (Exception e) { 
       logger.error("Error sending message", e); 
       connection.close(); 
       if (e instanceof MessageHandlingException) { 
        throw (MessageHandlingException) e; 
       } 
       else { 
        throw new MessageHandlingException(message, "Error sending message", e); 
       } 
      } 
     } 
     else { 
      logger.error("Unable to find outbound socket for " + message); 
      throw new MessageHandlingException(message, "Unable to find outbound socket"); 
     } 
     return; 
    } 
    else { 
     // we own the connection 
     try { 
      doWrite(message); 
     } 
     catch (MessageHandlingException e) { 
      // retry - socket may have closed 
      if (e.getCause() instanceof IOException) { 
       if (logger.isDebugEnabled()) { 
        logger.debug("Fail on first write attempt", e); 
       } 
       doWrite(message); 
      } 
      else { 
       throw e; 
      } 
     } 
    } 
} 

This is the list of Message<String>...
The ip_connectionId values are obtained via the factory.getOpenConnectionIds() method.
Why can't it find an outbound socket?

GenericMessage [payload={result=success, reason=0, response=pushTarget}, headers={ip_connectionId=192.168.3.57:58187:5443:37702eaf-0bbc-44a1-8763-65e841a2f480, id=a1b80cc4-3f56-1b80-9c59-57be98b1031e, timestamp=1467783978378}] 
GenericMessage [payload={result=success, reason=0, response=pushTarget}, headers={ip_connectionId=192.168.3.40:53161:5443:693c394c-d3dd-42a3-95ce-692a39a8b603, id=bb49ea99-5e3b-eccf-df3b-7ce03b4bbf73, timestamp=1467783978378}] 