2

我遇到了GCP pubsub的問題,在幾秒鐘內發佈數千條消息時,一小部分數據丟失。Google Cloud Pubsub數據丟失

我登錄既message_id從發佈訂閱和session_id獨特到兩者上出版端以及接收端的每個消息,並且我看到的結果是,在接收端的一些消息具有相同session_id,但不同message_id。此外,一些消息丟失。

例如,在一次測試中,我發送了5,000條消息到pubsub,並且正好收到了5,000條消息,丟失了8條消息。日誌丟失的郵件是這樣的:

MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API) 

messageId FOUND: messageId:108562396466545 

API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API) 

Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request) 

而且重複的樣子:

======= Duplicates FOUND on sessionId: 730======= 

sessionId: 730, messageId:108562396466545 

sessionId: 730, messageId:108561339282318 

(both are logs from pull request) 

所有丟失的數據,並重復這個樣子。

從上面的例子可以看出,有些消息已經採取了其他消息的message_id,並且已經用兩個不同的message_id發送了兩次。

我想知道有沒有人會幫我弄清楚發生了什麼事?提前致謝。

代碼

我有一個API發送消息的PubSub,它看起來像這樣:

from flask import Flask, request, jsonify, render_template 
from flask_cors import CORS, cross_origin 
import simplejson as json 
from google.cloud import pubsub 
from functools import wraps 
import re 
import json 


app = Flask(__name__) 
ps = pubsub.Client() 

... 

@app.route('/publish', methods=['POST']) 
@cross_origin() 
@json_validator 
def publish_test_topic(): 
    pubsub_topic = 'test_topic' 
    data = request.data 

    topic = ps.topic(pubsub_topic) 

    event = json.loads(data) 

    messageId = topic.publish(data) 
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******" 

這是我以前從發佈 - 訂閱閱讀代碼:

從谷歌.cloud import pubsub import re import json

ps = pubsub.Client() 
topic = ps.topic('test-xiu') 
sub = topic.subscription('TEST-xiu') 

max_messages = 1 
stop = False 

messages = [] 

class Message(object): 
    """docstring for Message.""" 
    def __init__(self, sessionId, messageId): 
     super(Message, self).__init__() 
     self.seesionId = sessionId 
     self.messageId = messageId 


def pull_all(): 
    while stop == False: 

     m = sub.pull(max_messages = max_messages, return_immediately = False) 

     for data in m: 
      ack_id = data[0] 
      message = data[1] 
      messageId = message.message_id 
      data = message.data 
      event = json.loads(data) 
      sessionId = str(event["sessionId"]) 
      messages.append(Message(sessionId = sessionId, messageId = messageId)) 

      print '200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******" 

      sub.acknowledge(ack_ids = [ack_id]) 

pull_all() 

對於產生的session_id,從發送API請求&測井響應:

// generate trackable sessionId 
var sessionId = 0 

var increment_session_id = function() { 
    sessionId++; 
    return sessionId; 
} 

var generate_data = function() { 
    var data = {}; 
    // data.sessionId = faker.random.uuid(); 
    data.sessionId = increment_session_id(); 
    data.user = get_rand(userList); 
    data.device = get_rand(deviceList); 
    data.visitTime = new Date; 
    data.location = get_rand(locationList); 
    data.content = get_rand(contentList); 

    return data; 
} 

var sendData = function (url, payload) { 
    var request = $.ajax({ 
    url: url, 
    contentType: 'application/json', 
    method: 'POST', 
    data: JSON.stringify(payload), 
    error: function (xhr, status, errorThrown) { 
     console.log(xhr, status, errorThrown); 
     $('.result').prepend("<pre id='json'>" + JSON.stringify(xhr, null, 2) + "</pre>") 
     $('.result').prepend("<div>errorThrown: " + errorThrown + "</div>") 
     $('.result').prepend("<div>======FAIL=======</div><div>status: " + status + "</div>") 
    } 
    }).done(function (xhr) { 
    console.log(xhr); 
    $('.result').prepend("<div>======SUCCESS=======</div><pre id='json'>" + JSON.stringify(payload, null, 2) + "</pre>") 
    }) 
} 

$(submit_button).click(function() { 
    var request_num = get_request_num(); 
    var request_url = get_url(); 
    for (var i = 0; i < request_num; i++) { 
    var data = generate_data(); 
    var loadData = changeVerb(data, 'load'); 
    sendData(request_url, loadData); 
    } 
}) 

UPDATE

我做的API的變化,這個問題似乎消失。我所做的更改是不是使用一個pubsub.Client()所有要求,我初始化客戶端就進來了,每一個請求新的API看起來像:

from flask import Flask, request, jsonify, render_template 
from flask_cors import CORS, cross_origin 
import simplejson as json 
from google.cloud import pubsub 
from functools import wraps 
import re 
import json 


app = Flask(__name__) 

... 

@app.route('/publish', methods=['POST']) 
@cross_origin() 
@json_validator 
def publish_test_topic(): 

    ps = pubsub.Client() 


    pubsub_topic = 'test_topic' 
    data = request.data 

    topic = ps.topic(pubsub_topic) 

    event = json.loads(data) 

    messageId = topic.publish(data) 
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******" 
+0

根據你的代碼,你兩次發佈的每封郵件。這只是一個錯字,或者你是否確實多次發佈每封郵件? –

+0

良好的捕獲,這是一個錯字。每個請求只發送一次。現在已修復 –

+0

這是另一個錯字嗎? self.seesionId = sessionId。它應該是self.sessionId嗎? –

回答

0

曾與谷歌從一些傢伙,這似乎是與Python客戶端的問題:

在我們身邊的共識是,沒有在當前的Python客戶端線程安全問題。當我們說話時,客戶端庫幾乎從頭開始被重寫,所以我不想在當前版本中進行任何修復。我們預計新版本將在6月底之前上市。

在app.yaml中運行帶有thread_safe:false的當前代碼或更好,但只是在每個調用中實例化客戶端應該是解決方法 - 您找到的解決方案。

對於詳細的解決方案,請參閱更新在這個問題

0

谷歌Cloud發佈/訂閱消息ID都是唯一的。它不應該是可能的「一些消息[採取另一個消息的message_id」。消息ID 108562396466545似乎收到的事實意味着Pub/Sub確實將消息傳遞給訂閱者並且沒有丟失。

我建議你檢查你的session_id是如何生成的,以確保它們確實是唯一的,並且每個消息只有一個。通過正則表達式搜索在您的JSON中搜索sessionId看起來有點奇怪。將這個JSON解析爲一個實際的對象並以這種方式訪問​​字段會更好。

通常,Cloud Pub/Sub中的重複郵件總是可能的;該系統保證至少一次交付。如果複製發生在訂閱方(例如,不及時處理ack)或具有不同的消息ID(例如,如果消息的發佈在類似錯誤之後重試超過截止日期)。

+0

謝謝你的迴應。 'session_id'是每個消息使用遞增數字生成器唯一生成的,我檢查App Engine中的日誌並確認每個發送的'session_id'都是唯一的。我也使用JSON解析器運行實驗,並且我仍然觀察到相同的行爲。我知道pubsub中的重複消息,我根本不擔心它們。這是缺少的消息,以及重複正在使我感到擔憂的'message_id'丟失消息。 –

+0

假設self.seesionId = sessionId的拼寫錯誤不負責,我建議您查看變量的範圍。發佈端或訂閱端是否有messageId和sessionId全局變量?如果是這樣,可以通過併發調用pull_all或publish_test_topic來覆蓋它們。看起來消息是一個全局變量,所以任何併發的pull_all調用都可能導致爭用情況。 –

+0

既不是messageId也不是sessionId是全局的。 'messages'是全局的,但不能在任何地方使用。訂閱方不在多線程上,所以在那裏不應該有任何競爭條件。我可以看到的唯一可能的競爭條件是在API方面,因爲App Engine可能有多個實例來處理流量,所以有幾個發佈請求可能會同時發送到pubsub。 –

0

您不應該爲每個發佈操作創建一個新客戶端。我敢打賭,「解決問題」的原因是因爲它緩解了出版商客戶端存在的競爭。我也不相信你已經在發佈商端顯示的日誌行:

API:200 ****的sessionId:731,郵件ID:108562396466545 ******

相當於publish_test_topic()成功發佈sessionId 731。在什麼條件下打印日誌行?迄今爲止提出的代碼沒有顯示這一點。

+0

'publish_test_topic()'方法返回'return'200 **** sessionId:'+ str(event [「sessionId」])+「,messageId:」+ messageId +「******」',返回到應用程序的前端,作爲每個每個請求對'/ publish'的響應,並且一旦接收到它就立即打印在控制檯中。 –

+0

添加了用於記錄該消息的代碼。 –