我有一個基於Spring及其SimpleBroker實現(不使用外部代理)的Websocket-stomp服務器。Spring Websocket STOMP:發送RECEIPT框架
我想啓用STOMP RECEIPT消息。
我怎樣才能配置我的代碼自動發送這些?
我有一個基於Spring及其SimpleBroker實現(不使用外部代理)的Websocket-stomp服務器。Spring Websocket STOMP:發送RECEIPT框架
我想啓用STOMP RECEIPT消息。
我怎樣才能配置我的代碼自動發送這些?
在Spring集成測試STOMP協議,我們有這樣的代碼:
//SimpleBrokerMessageHandler doesn't support RECEIPT frame, hence we emulate it this way
@Bean
public ApplicationListener<SessionSubscribeEvent> webSocketEventListener(
final AbstractSubscribableChannel clientOutboundChannel) {
return event -> {
Message<byte[]> message = event.getMessage();
StompHeaderAccessor stompHeaderAccessor = StompHeaderAccessor.wrap(message);
if (stompHeaderAccessor.getReceipt() != null) {
stompHeaderAccessor.setHeader("stompCommand", StompCommand.RECEIPT);
stompHeaderAccessor.setReceiptId(stompHeaderAccessor.getReceipt());
clientOutboundChannel.send(
MessageBuilder.createMessage(new byte[0], stompHeaderAccessor.getMessageHeaders()));
}
};
}
由阿爾喬姆比蘭提供的答案只對訂閱幀。這是另一個捕獲任何傳入幀的收據標題。只有帶有@EnableWebSocketMessageBroker註解的類需要擴展,其他類(如帶有@Controller註釋的類)保持不變。
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.messaging.simp.config.SimpleBrokerRegistration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.MessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
private static final Logger LOGGER = Logger.getLogger(WebSocketConfig.class.getName());
private MessageChannel outChannel;
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new InboundMessageInterceptor());
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.interceptors(new OutboundMessageInterceptor());
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// prefixes are application-dependent
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/note");
}
class InboundMessageInterceptor extends ChannelInterceptorAdapter {
@SuppressWarnings("unchecked")
public Message preSend(Message message, MessageChannel channel) {
LOGGER.log(Level.SEVERE, "preSend: "+message);
GenericMessage genericMessage = (GenericMessage)message;
MessageHeaders headers = genericMessage.getHeaders();
String simpSessionId = (String)headers.get("simpSessionId");
if((SimpMessageType.MESSAGE.equals(headers.get("simpMessageType")) &&
StompCommand.SEND.equals(headers.get("stompCommand"))) ||
(SimpMessageType.SUBSCRIBE.equals(headers.get("simpMessageType")) &&
StompCommand.SUBSCRIBE.equals(headers.get("stompCommand"))) &&
(simpSessionId != null)) {
Map<String,List<String>> nativeHeaders = (Map<String,List<String>>)headers.get("nativeHeaders");
if(nativeHeaders != null) {
List<String> receiptList = nativeHeaders.get("receipt");
if(receiptList != null) {
String rid = (String)receiptList.get(0);
LOGGER.log(Level.SEVERE, "receipt requested: "+rid);
sendReceipt(rid, simpSessionId);
}
}
}
return message;
}
private void sendReceipt(String rid, String simpSessionId) {
if(outChannel != null) {
HashMap<String,Object> rcptHeaders = new HashMap<String,Object>();
rcptHeaders.put("simpMessageType", SimpMessageType.OTHER);
rcptHeaders.put("stompCommand", StompCommand.RECEIPT);
rcptHeaders.put("simpSessionId", simpSessionId);
HashMap<String,List<String>> nativeHeaders = new HashMap<String,List<String>>();
ArrayList<String> receiptElements = new ArrayList<String>();
receiptElements.add(rid);
nativeHeaders.put("receipt-id", receiptElements);
rcptHeaders.put("nativeHeaders",nativeHeaders);
GenericMessage<byte[]> rcptMsg = new GenericMessage<byte[]>(new byte[0],new MessageHeaders(rcptHeaders));
outChannel.send(rcptMsg);
} else
LOGGER.log(Level.SEVERE, "receipt NOT sent");
}
}
class OutboundMessageInterceptor extends ChannelInterceptorAdapter {
public void postSend(Message message,
MessageChannel channel,
boolean sent) {
LOGGER.log(Level.SEVERE, "postSend: "+message);
outChannel = channel;
}
}
}
事實上,這是要複雜得多應該和獲得outChannel是不是很優雅。但它的工作。 :-)