2017-05-29 33 views
2

我正在實現無限流響應,就像使用gRPC體系結構的pub/sub模式一樣。python - 在gRPC無限流中檢測客戶端丟失

有一個端點可以打開一個響應流並保留它直到客戶端丟失。要做到這一點,我要存儲一個鍵值散列,其中的鍵是gRPC上下文,並且這些值是我用來輪詢發送消息的隊列。

我的端點代碼如下所示:

def StreamTrades(self, request, context): 
    self.feeds[context] = queue.Queue() 

    callback_queue = queue.Queue() 

    def remove_feed(): 
     if self.feeds.get(context) is not None: 
      del self.feeds[context] 

    def stop_stream(): 
     remove_feed() 

     def raise_stop_stream_exception(): 
      raise StopStream('stopping stream') 

     callback_queue.put(raise_stop_stream_exception) 

    context.add_callback(stop_stream) 

    def output_generator(): 
     while True: 
      try: 
       try: 
        callback = callback_queue.get(False) 
        callback() 
       except queue.Empty: 
        pass 
       if self.feeds.get(context) is not None: 
        trade = self.feeds[context].get() 
        if isinstance(trade, trades_pb2.Trade): 
         yield trade 
       else: 
        raise StopStream('stopping stream') 
      except IndexError: 
       pass 
      except StopStream: 
       return 

    return output_generator() 

此代碼工作正常進行訂閱和發佈更改的客戶端。但是,與取消訂閱有關的問題。檢測客戶端丟失的好方法是什麼?使用Context.add_callback(callBack)似乎不起作用,因爲回調只在服務器完成並關閉流時調用。當客戶不在時,發電機不會產生任何狀態。我看到,在Java中,當在streamObserver中調用onNext並且沒有客戶端時會拋出Status.CANCELLED的StatusRuntimeException,它允許惰性取消訂閱對我來說已經足夠了。

有沒有什麼方法可以檢測客戶端在響應流中丟棄連接?

回答

3

當客戶端斷開連接時,應該調用您使用ServicerContext.add_callback註冊的回調;它沒有被調用表明你患有this bug。 「」不是「服務器完成關閉流時只調用回調」的情況。

+0

的確,我期望這是正確的行爲。由於它是一個已知和報告的錯誤,我將在java中重新實現我的系統的這部分,並等待python的新版本。 –