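When a JSON request carries a list of target IPs, I call factory.getOpenConnectionIds() to find the currently open connections for those IPs, set the matching connection ID as a header on each message, and send it. How can I send a single message to multiple IPs?

I build the list of currently connected IPs from factory.getOpenConnectionIds() and set the ip_connectionId header, but an "Unable to find outbound socket" error occurs.
What is the cause?

The integration configuration is: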
@Bean
public TcpReceivingChannelAdapter sslAdapter() {
    TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
    adapter.setConnectionFactory(sslServerFactory());
    adapter.setOutputChannel(inputWithSSL());
    return adapter;
}

@Bean
public TcpSendingMessageHandler sslHandler() {
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(sslServerFactory());
    return handler;
}

@Bean
public AbstractConnectionFactory sslServerFactory() {
    int port = Integer.parseInt(inboundPort);
    TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(port);
    factory.setBacklog(BACKLOG);
    factory.setTaskExecutor(taskSchedulerWithSSL());
    factory.setLookupHost(false);
    factory.setSerializer(echoSerializer);
    factory.setDeserializer(echoSerializer);
    factory.setTcpNioConnectionSupport(tcpNioSSLConnectionSupport());
    // Nagle's algorithm disabled
    factory.setSoTcpNoDelay(true);
    return factory;
}

@Bean
public IntegrationFlow flowForReceiveSslMessage() {
    return IntegrationFlows
            .from(sslAdapter)
            .<byte[], Boolean>route(
                    p -> (short) 0 == ByteBuffer.wrap(p, 0, BYTE_LENGTH_OF_SHORT).getShort(),
                    m -> m.channelMapping(TRUE, INPUT_WITH_SSL_JSON)
                            .channelMapping(FALSE, INPUT_WITH_SSL_ECHO)).get();
}

@Bean
public IntegrationFlow flowForExtractingSslJson() {
    return IntegrationFlows
            .from(inputWithSslJson())
            .handle(INBOUND_SERVICE, EXTRACT_PAYLOAD_AS_JSON)
            .<Map<String, Object>, String>route(
                    p -> (String) p.get(REQUEST),
                    m -> m.channelMapping(LOGIN, INPUT_WITH_SSL_LOGIN)
                            .channelMapping(LOGOUT, INPUT_WITH_SSL_LOGOUT)
                            .channelMapping(POLICY, INPUT_WITH_SSL_POLICY)
                            .channelMapping(PUSH_TARGET, INPUT_WITH_SSL_PUSH_TARGET)).get();
}

@Bean
public IntegrationFlow flowForHandlingSslNotifyPolicyUpdate() {
    return IntegrationFlows.from(inputWithSslPushTarget())
            .handle(POLICY_SERVICE, RESPONSE_POLICY_UPDATE)
            .split(POLICY_SERVICE, SPLIT_MESSAGES)
            .channel(outputWithSslJsonBytesToClient()).get();
}

@Bean
public IntegrationFlow flowForConvertingSslJsonToBytesAndSendClient() {
    return IntegrationFlows.from(outputWithSslJsonBytesToClient())
            .transform(new ObjectToJsonTransformer())
            .handle(INBOUND_SERVICE, ATTACH_HEADER_BY_STRING)
            .handle(sslHandler).get();
}

@Bean
public MessageChannel outputWithSsl() {
    return MessageChannels.queue(POOL_SIZE).get();
}

@Bean
public MessageChannel inputWithSslJson() {
    return MessageChannels.queue(POOL_SIZE).get();
}

@Bean
public MessageChannel inputWithSslPushTarget() {
    return MessageChannels.queue(POOL_SIZE).get();
}

@Bean
public MessageChannel outputWithSslJsonBytesToClient() {
    return MessageChannels.queue(POOL_SIZE).get();
}

RESPONSE_POLICY_UPDATE and SPLIT_MESSAGES are:
@Override
public Object responsePolicyUpdate(Object payload) throws Exception {
    log.debug("notify policy update debug : {}", payload);
    Map<String, Object> params = initParam(payload);
    Map<String, Object> result = initResult(params);
    result.put(RESPONSE, PUSH_TARGET);
    result.put(RESULT, SUCCESS);
    result.put(REASON, 0);
    return result;
}

@Splitter
@Override
@SuppressWarnings("unchecked")
public List<Message<String>> splitMessages(Object payload) throws Exception {
    log.debug("split messages debug : {}", payload);
    Map<String, Object> params = initParam(payload);
    List<String> pushTargetList = (List<String>) params.get(PUSH_TARGET_LIST); // pushTargetList is ip list.
    List<Message<String>> messageList = new ArrayList<Message<String>>();
    String[] conArray = new String[4];
    List<String> sslConnectionIds = sslServerFactory.getOpenConnectionIds();
    int sslPort = sslServerFactory.getPort();
    for (String con : sslConnectionIds) {
        log.debug("## con ip : {}", con);
        conArray = con.split(":");
        for (String pushTargetIP : pushTargetList) {
            if (conArray[0].equals(pushTargetIP)) {
                Message<String> message = MessageBuilder.withPayload(params.toString())
                        .setHeader("ip_connectionId", con).build();
                messageList.add(message);
                break;
            }
        }
    }
    return messageList;
}

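The matching step inside the splitter can be looked at in isolation. Below is a minimal sketch in plain Java with no Spring dependency. It assumes the connection ID layout seen in the debug log (host:remotePort:localPort:uuid) and IPv4 hosts, since an IPv6 address would itself contain colons. The class ConnectionIdMatcher and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of Spring Integration.
class ConnectionIdMatcher {

    // Assumption: the ID looks like "host:remotePort:localPort:uuid" and host is IPv4.
    static String hostOf(String connectionId) {
        int firstColon = connectionId.indexOf(':');
        return firstColon < 0 ? connectionId : connectionId.substring(0, firstColon);
    }

    // Returns every open connection ID whose host part is one of the target IPs.
    static List<String> matchingConnectionIds(List<String> openConnectionIds, List<String> targetIps) {
        Set<String> targets = new HashSet<>(targetIps);
        List<String> matches = new ArrayList<>();
        for (String id : openConnectionIds) {
            if (targets.contains(hostOf(id))) {
                matches.add(id);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> open = Arrays.asList(
                "192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3", // ID taken from the log
                "192.168.3.58:50000:5443:00000000-0000-0000-0000-000000000000"); // made-up second ID
        System.out.println(matchingConnectionIds(open, Arrays.asList("192.168.3.57")));
    }
}
```

Unlike the splitter, which stops at the first target IP that matches a connection, this sketch returns every open connection for a target IP, so a client with several open sockets would receive one message per socket. Which behavior is wanted depends on the application.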
The debug log is below.
The first line shows the current connection list.
2016-07-05 14:30:14.664 DEBUG 56092 --- [sk-scheduler-10] c.m.j.policy.service.PolicyServiceImpl : ## con ip : 192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3
2016-07-05 14:30:14.672 DEBUG 56092 --- [ask-scheduler-1] o.s.i.ip.tcp.TcpSendingMessageHandler : plainHandler received message: GenericMessage [payload=byte[246], headers={sequenceNumber=1, json__TypeId__=class java.lang.String, sequenceSize=1, ip_connectionId=192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3, correlationId=fae71250-bf47-3f64-6ad3-1ce22ef69464, id=c6c097f0-9efb-f0a5-4240-924e06879b7f, contentType=application/json, timestamp=1467696614672}]
2016-07-05 14:30:14.672 ERROR 56092 --- [ask-scheduler-1] o.s.i.ip.tcp.TcpSendingMessageHandler : Unable to find outbound socket for GenericMessage [payload=byte[246], headers={sequenceNumber=1, json__TypeId__=class java.lang.String, sequenceSize=1, ip_connectionId=192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3, correlationId=fae71250-bf47-3f64-6ad3-1ce22ef69464, id=c6c097f0-9efb-f0a5-4240-924e06879b7f, contentType=application/json, timestamp=1467696614672}]
2016-07-05 14:30:14.673 DEBUG 56092 --- [ask-scheduler-1] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, headers={id=273f4477-52cf-645b-d157-e22dc7cc781a, timestamp=1467696614673}]
2016-07-05 14:30:14.673 DEBUG 56092 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : (inner bean)#6dc2279c received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: Unable to find outbound socket, headers={id=273f4477-52cf-645b-d157-e22dc7cc781a, timestamp=1467696614673}]
2016-07-05 14:30:14.675 ERROR 56092 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: Unable to find outbound socket
at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:113)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at
...
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
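Are you using this program to send or to receive TCP packets? – aksappy
Yes, of course. Receiving a request from an IP and responding to that same IP works normally. – kurochi
It should work fine - you need to run in a debugger and examine the contents of the `connections` field of the `TcpSendingMessageHandler`. –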
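
To make the last comment concrete: with a server connection factory, a TcpSendingMessageHandler looks up the ip_connectionId header in its own `connections` map and reports "Unable to find outbound socket" when the ID is not there. The toy model below is plain Java, not Spring code, and ToySendingHandler is a hypothetical class. It only illustrates that an ID registered with one handler is invisible to another. One thing worth checking against this: the log line names the receiving handler `plainHandler`, while the configuration shown defines `sslHandler`. Whether that is the cause is not established in this thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy model for illustration only; this is not a Spring Integration type.
class ToySendingHandler {
    private final String name;
    // Stands in for the handler's per-instance registry of connection IDs.
    private final Set<String> connections = ConcurrentHashMap.newKeySet();

    ToySendingHandler(String name) {
        this.name = name;
    }

    // Called when a connection is opened on the factory this handler is attached to.
    void register(String connectionId) {
        connections.add(connectionId);
    }

    // Looks the ID up in this handler's own registry only.
    String send(String connectionId) {
        return connections.contains(connectionId)
                ? name + ": sent"
                : name + ": Unable to find outbound socket";
    }

    public static void main(String[] args) {
        String id = "192.168.3.57:62370:5443:cdeb011d-91f5-46c4-abc9-b68ba13624b3";
        ToySendingHandler ssl = new ToySendingHandler("sslHandler");
        ToySendingHandler plain = new ToySendingHandler("plainHandler");
        ssl.register(id); // the connection was accepted by the SSL factory only
        System.out.println(ssl.send(id));
        System.out.println(plain.send(id));
    }
}
```

In the debugger, the equivalent check is whether the handler instance that logs the error holds the connection ID from the header in its `connections` field.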