2
我正在實現無限流響應,就像使用gRPC體系結構的pub/sub模式一樣。python - 在gRPC無限流中檢測客戶端丟失
有一個端點可以打開一個響應流並保留它直到客戶端丟失。要做到這一點,我要存儲一個鍵值散列,其中的鍵是gRPC上下文,並且這些值是我用來輪詢發送消息的隊列。
我的端點代碼如下所示:
def StreamTrades(self, request, context):
self.feeds[context] = queue.Queue()
callback_queue = queue.Queue()
def remove_feed():
if self.feeds.get(context) is not None:
del self.feeds[context]
def stop_stream():
remove_feed()
def raise_stop_stream_exception():
raise StopStream('stopping stream')
callback_queue.put(raise_stop_stream_exception)
context.add_callback(stop_stream)
def output_generator():
while True:
try:
try:
callback = callback_queue.get(False)
callback()
except queue.Empty:
pass
if self.feeds.get(context) is not None:
trade = self.feeds[context].get()
if isinstance(trade, trades_pb2.Trade):
yield trade
else:
raise StopStream('stopping stream')
except IndexError:
pass
except StopStream:
return
return output_generator()
此代碼工作正常進行訂閱和發佈更改的客戶端。但是,與取消訂閱有關的問題。檢測客戶端丟失的好方法是什麼?使用Context.add_callback(callBack)似乎不起作用,因爲回調只在服務器完成並關閉流時調用。當客戶不在時,發電機不會產生任何狀態。我看到,在Java中,當在streamObserver中調用onNext並且沒有客戶端時會拋出Status.CANCELLED的StatusRuntimeException,它允許惰性取消訂閱對我來說已經足夠了。
有沒有什麼方法可以檢測客戶端在響應流中丟棄連接?
的確,我期望這是正確的行爲。由於它是一個已知和報告的錯誤,我將在java中重新實現我的系統的這部分,並等待python的新版本。 –