2016-09-02 114 views
0

我有一些基本的項目,有四個調用一些外部資源,在當前版本同步運行。我想實現的是將這些調用打包到HystrixObservableCommand中,然後異步調用它。Netflix Hystrix - HystrixObservableCommand異步運行

從我讀到的,在HystrixObservableCommand對象處調用.observe()之後,應該立即並且異步地調用封裝的邏輯。但是我做錯了什麼,因爲它同步工作。

在示例代碼中,輸出爲Void,因爲我對輸出不感興趣(現在)。這也是爲什麼我沒有將Observable分配給任何對象,只叫constructor.observe()

@Component 
public class LoggerProducer { 

    private static final Logger LOGGER = Logger.getLogger(LoggerProducer.class); 

    @Autowired 
    SimpMessagingTemplate template; 

    private void push(Iterable<Message> messages, String topic) throws Exception { 
     template.convertAndSend("/messages/"+topic, messages); 
    } 

    public void splitAndPush(Iterable<Message> messages) { 

     Map<MessageTypeEnum, List<Message>> groupByMessageType = StreamSupport.stream(messages.spliterator(), true) 
       .collect(Collectors.groupingBy(Message::getType)); 

     //should be async - it's not 
     new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.INFO), 
       MessageTypeEnum.INFO.toString().toLowerCase()).observe(); 
     new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.WARN), 
       MessageTypeEnum.WARN.toString().toLowerCase()).observe(); 
     new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.ERROR), 
       MessageTypeEnum.ERROR.toString().toLowerCase()).observe(); 

    } 

    class CommandPushToBrowser extends HystrixObservableCommand<Void> { 

     private Iterable<Message> messages; 
     private String messageTypeName; 

     public CommandPushToBrowser(Iterable<Message> messages, String messageTypeName) { 
      super(HystrixCommandGroupKey.Factory.asKey("Messages")); 
      this.messageTypeName = messageTypeName; 
      this.messages = messages; 
     } 

     @Override 
     protected Observable<Void> construct() { 
      return Observable.create(new Observable.OnSubscribe<Void>() { 

       @Override 
       public void call(Subscriber<? super Void> observer) { 
        try { 
         for (int i = 0 ; i < 50 ; i ++) { 
          LOGGER.info("Count: " + i + " messageType " + messageTypeName); 
         } 
         if (null != messages) { 
          push(messages, messageTypeName); 
          LOGGER.info("Message type: " + messageTypeName + " pushed: " + messages); 
         } 
         if (!observer.isUnsubscribed()) { 
          observer.onCompleted(); 
         } 
        } catch (Exception e) { 
         e.printStackTrace(); 
         observer.onError(e); 
        } 
       } 
      }); 
     } 
    } 
} 

有一些純粹的「測試」的代碼片段那裏,我試圖找出問題,只是忽略邏輯,重點是使其與.observe()運行異步。我知道我可以通過標準HystrixCommand來實現,但這不是目標。

希望有人幫助:) 問候,

回答

2

答發現:

「觀測量不添加自動併發如果您正在使用可觀察到的建模 同步,阻止執行,那麼他們會的。 同步執行

您可以通過使用 subscribeOn(Schedulers.io())在線程上調度來輕鬆實現異步。用於包裝 阻塞調用與可觀察到的: https://speakerdeck.com/benjchristensen/applying-reactive-programming-with-rxjava-at-goto-chicago-2015?slide=33

但是,如果你是包裹阻塞調用,你應該堅持 使用HystrixCommand因爲這就是它的建成爲它 默認爲一個獨立運行的一切線。使用 HystrixCommand.observe()會爲您提供您正在尋找的併發異步 組合。

HystrixObservableCommand是用於異步周圍包裹, 無阻塞不需要額外的線程觀測量「

- 本克里斯滕森 - Netflix的邊緣工程

來源:https://groups.google.com/forum/#!topic/hystrixoss/g7ZLIudE8Rs