與測試谷歌酒吧工作/次的客戶端(v0.28.3)PubSub的Ack'd消息不斷重新提供每10s(Python的公測客戶端)
有沒有人看到這樣一個場景,相同的消息是不斷每10秒重新發放一次,即使在acking之後呢?
這超出了Pub/Sub的至少一次性質。它偶爾發生,但是當它發生時,我們會連續幾個小時看到相同的信息。
我嫌疑人這是因爲我們處理來自用戶的後臺線程中的傳入消息;但尚未能始終如一地重現它。由於某種原因,這不是猶太教嗎?
如果有錯誤,很樂意提交,但假設我們做錯了。有沒有人處理過類似的問題?
隨着調試日誌,我們看到:
D 13:51:46.000 Received response: received_messages { ... message_id: "155264162517414" ... }
D 13:51:46.000 New message received from Pub/Sub: %r
I 13:51:46.000 Processing Message: 155264162517414
I 13:51:48.000 Acking Message: 155264162517414
D 13:51:48.000 Sending request: ack_ids: "LDR..."
D 13:51:50.000 Snoozing lease management for 4.009431 seconds.
D 13:51:50.000 Renewing lease for 0 ack IDs.
D 13:51:50.000 The current p99 value is 10 seconds.
...
D 13:51:59.000 Received response: received_messages { ... message_id: "155264162517414" ... }
D 13:51:59.000 New message received from Pub/Sub: %r
I 13:51:59.000 Processing Message: 155264162517414
這裏是代碼的玩具版本,展示了我們如何線程,這有時會觸發本地運行的問題:
import Queue
import logging
import threading
import random
import time
from google.cloud import pubsub
SUBSCRIPTION_PATH = ...
class Worker(threading.Thread):
"""Background thread to consume incoming messages."""
def __init__(self, name):
threading.Thread.__init__(self, name=name)
self.queue = Queue.Queue()
def run(self):
while True:
message = self.queue.get()
self.process(message)
print '<< Acking :', message.message_id
message.ack()
self.queue.task_done()
def process(self, message):
"""Fake some work by sleeping for 0-15s. """
s = random.randint(0, 15)
print '>> Worker sleeping for ', s, message.message_id
for i in range(s):
time.sleep(1)
print i
class Subscriber(threading.Thread):
"""Handles the subscription to pubsub."""
def __init__(self):
threading.Thread.__init__(self, name='Subscriber')
self.subscriber = pubsub.SubscriberClient()
self.worker = Worker('FakeWorker')
self.worker.daemon = True
def run(self):
self.worker.start()
flow_control = pubsub.types.FlowControl(max_messages=10)
policy = self.subscriber.subscribe(SUBSCRIPTION_PATH,
flow_control=flow_control,
callback=self._consume)
print 'Sub started, thread', threading.current_thread()
def _consume(self, message):
self.worker.queue.put(message)
if __name__ == '__main__':
subscriber = Subscriber()
subscriber.start()
while 1:
pass
謝謝!
感謝盧克的迴應。我相信我們一直在重複這些重複的信息。 Unf,盡最大努力確保重新傳輸的意義在於,我們必須保持集羣範圍內處理消息的緩存,這並不是很理想。現在我們切換到Google的prod基於HTTP的庫;並提交了支持票。 – Greg