2017-09-23 57 views
2

與測試谷歌酒吧工作/次的客戶端(v0.28.3)PubSub的Ack'd消息不斷重新提供每10s(Python的公測客戶端)

有沒有人看到這樣一個場景,相同的消息是不斷每10秒重新發放一次,即使在acking之後呢?

這超出了Pub/Sub的至少一次性質。它偶爾發生,但是當它發生時,我們會連續幾個小時看到相同的信息。

嫌疑人這是因爲我們處理來自用戶的後臺線程中的傳入消息;但尚未能始終如一地重現它。由於某種原因,這不是猶太教嗎?

如果有錯誤,很樂意提交,但假設我們做錯了。有沒有人處理過類似的問題?


隨着調試日誌,我們看到:

D 13:51:46.000 Received response: received_messages { ... message_id: "155264162517414" ... } 
D 13:51:46.000 New message received from Pub/Sub: %r 
I 13:51:46.000 Processing Message: 155264162517414 
I 13:51:48.000 Acking Message: 155264162517414 
D 13:51:48.000 Sending request: ack_ids: "LDR..." 
D 13:51:50.000 Snoozing lease management for 4.009431 seconds. 
D 13:51:50.000 Renewing lease for 0 ack IDs. 
D 13:51:50.000 The current p99 value is 10 seconds. 
... 
D 13:51:59.000 Received response: received_messages { ... message_id: "155264162517414" ... } 
D 13:51:59.000 New message received from Pub/Sub: %r 
I 13:51:59.000 Processing Message: 155264162517414 

這裏是代碼的玩具版本,展示了我們如何線程,這有時會觸發本地運行的問題:

import Queue 
import logging 
import threading 
import random 
import time 
from google.cloud import pubsub 


SUBSCRIPTION_PATH = ... 


class Worker(threading.Thread): 
    """Background thread to consume incoming messages.""" 
    def __init__(self, name): 
     threading.Thread.__init__(self, name=name) 
     self.queue = Queue.Queue() 

    def run(self): 
     while True: 
      message = self.queue.get() 
      self.process(message) 
      print '<< Acking :', message.message_id 
      message.ack() 
      self.queue.task_done() 

    def process(self, message): 
     """Fake some work by sleeping for 0-15s. """ 
     s = random.randint(0, 15) 
     print '>> Worker sleeping for ', s, message.message_id 
     for i in range(s): 
      time.sleep(1) 
      print i 


class Subscriber(threading.Thread): 
    """Handles the subscription to pubsub.""" 
    def __init__(self): 
     threading.Thread.__init__(self, name='Subscriber') 
     self.subscriber = pubsub.SubscriberClient() 
     self.worker = Worker('FakeWorker') 
     self.worker.daemon = True 

    def run(self): 
     self.worker.start() 

     flow_control = pubsub.types.FlowControl(max_messages=10) 
     policy = self.subscriber.subscribe(SUBSCRIPTION_PATH, 
      flow_control=flow_control, 
      callback=self._consume) 
     print 'Sub started, thread', threading.current_thread() 

    def _consume(self, message): 
     self.worker.queue.put(message) 


if __name__ == '__main__': 
    subscriber = Subscriber() 
    subscriber.start() 
    while 1: 
     pass 

謝謝!

回答

0

除Pub/Sub的至少一次性質外,Pub/Sub中的acks是盡力而爲的。這意味着有兩種潛在的方法可以使ack「搞砸」。

  1. 該消息可以被pub/sub成功acked,並重新發送一次(可能是由於競爭條件)。
  2. 消息可能無法成功確認。

在第二種情況獲得的情況下,客戶端庫不會給你任何錯誤(因爲客戶端庫本身沒有給出),並且你將開始看到消息的節奏(和如果你的工藝時間很短,這將會是10秒鐘)。

解決此問題的方法是在收到郵件時再次簡單回覆郵件。我假設(從玩具代碼中不清楚,所以我猜測),你只是無視重複的消息,但如果你重複了,你應該停止獲得它。

如果你重新acking的消息,那麼請open an issue針對客戶端庫。

+0

感謝盧克的迴應。我相信我們一直在重複這些重複的信息。 Unf,盡最大努力確保重新傳輸的意義在於,我們必須保持集羣範圍內處理消息的緩存,這並不是很理想。現在我們切換到Google的prod基於HTTP的庫;並提交了支持票。 – Greg

相關問題