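Google Cloud Pub/Sub data loss
I'm running into a problem with GCP Pub/Sub: when I publish thousands of messages within a few seconds, a small fraction of the data is lost.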
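On both the publishing side and the receiving side, I log the message_id returned by Pub/Sub together with a session_id that is unique to each message. What I see is that on the receiving side some messages have the same session_id but different message_ids. In addition, some messages are missing.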
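For example, in one test I sent 5,000 messages to Pub/Sub and received exactly 5,000 messages, yet 8 messages were missing. The log for a missing message looks like this: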
MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API)
messageId FOUND: messageId:108562396466545
API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)
Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)
And the duplicates look like this:
======= Duplicates FOUND on sessionId: 730=======
sessionId: 730, messageId:108562396466545
sessionId: 730, messageId:108561339282318
(both are logs from pull request)
All of the missing and duplicated data looks like this.
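From the example above, it looks as if some messages have taken over another message's message_id: sessionId 730 was received twice under two different message_ids, and one of them (108562396466545) is the message_id the API returned when it published sessionId 731.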
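To check a whole run rather than individual cases, the Flask API log and the pull log can be parsed and cross-checked. A minimal sketch, assuming every relevant line contains the fragment `sessionId: <digits>, messageId:<digits>` exactly as in the samples above; `parse_pairs` and `cross_check` are hypothetical helpers, not part of the original code. It reports sessionIds that were published but never pulled, sessionIds pulled more than once, and messageIds that arrived carrying a different sessionId from the one the API logged for them.

```python
import re
from collections import Counter
from typing import Dict, List, Tuple

# Assumed log fragment, taken from the sample lines:
# "sessionId: 730, messageId:108562396466545"
PAIR_RE = re.compile(r"sessionId:\s*(\d+),\s*messageId:\s*(\d+)")

Pair = Tuple[str, str]  # (sessionId, messageId)


def parse_pairs(log_text: str) -> List[Pair]:
    """Extract (sessionId, messageId) pairs in the order they appear."""
    return [(m.group(1), m.group(2)) for m in PAIR_RE.finditer(log_text)]


def cross_check(published: List[Pair], received: List[Pair]) -> Dict[str, List[str]]:
    """Compare pairs from the Flask API log with pairs from the pull log."""
    received_counts = Counter(session for session, _ in received)
    published_sessions = {session for session, _ in published}
    # messageId -> sessionId, as logged by the publishing API
    session_by_message = {message: session for session, message in published}

    # Published but never pulled
    missing = sorted(published_sessions - set(received_counts), key=int)
    # Pulled more than once
    duplicated = sorted(
        (session for session, count in received_counts.items() if count > 1), key=int
    )
    # Pulled under a messageId that the API logged for a different sessionId
    mismatched = sorted(
        {
            message
            for session, message in received
            if message in session_by_message and session_by_message[message] != session
        }
    )
    return {"missing": missing, "duplicated": duplicated, "mismatched": mismatched}
```

Fed the two pull lines shown above, plus an API log that also records sessionId 730 as messageId 108561339282318 (an assumption; only the API line for 731 is shown), it reports 731 as missing, 730 as duplicated, and 108562396466545 as mismatched.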
Can anyone help me figure out what is going on? Thanks in advance.
Code
I have an API that sends messages to Pub/Sub; it looks like this:
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json
app = Flask(__name__)
ps = pubsub.Client()
...
@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
    pubsub_topic = 'test_topic'
    data = request.data
    topic = ps.topic(pubsub_topic)
    event = json.loads(data)
    messageId = topic.publish(data)
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"
This is the code I used to read from Pub/Sub:
from google.cloud import pubsub
import re
import json
ps = pubsub.Client()
topic = ps.topic('test-xiu')
sub = topic.subscription('TEST-xiu')
max_messages = 1
stop = False
messages = []
class Message(object):
    """docstring for Message."""
    def __init__(self, sessionId, messageId):
        super(Message, self).__init__()
        self.seesionId = sessionId
        self.messageId = messageId

def pull_all():
    while stop == False:
        m = sub.pull(max_messages = max_messages, return_immediately = False)
        for data in m:
            ack_id = data[0]
            message = data[1]
            messageId = message.message_id
            data = message.data
            event = json.loads(data)
            sessionId = str(event["sessionId"])
            messages.append(Message(sessionId = sessionId, messageId = messageId))
            print '200 **** sessionId: ' + sessionId + ", messageId:" + messageId + " ******"
            sub.acknowledge(ack_ids = [ack_id])

pull_all()
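Independently of the root cause, Cloud Pub/Sub provides at-least-once delivery, so a subscriber can legitimately receive the same message more than once. A redelivery keeps its original message_id, though, so duplicates with the same session_id but different message_ids look more like two separate publishes than like redeliveries. Either way, making the consumer idempotent on an application-level key protects downstream processing. A minimal sketch, assuming sessionId is that key; `DedupingHandler` is a hypothetical name, and the in-memory set would have to become shared, durable storage if more than one subscriber process runs:

```python
from typing import Callable, Set


class DedupingHandler:
    """Calls `process` at most once per sessionId (hypothetical helper)."""

    def __init__(self, process: Callable[[dict], None]) -> None:
        self._process = process
        # In-memory only: lost on restart and not shared between processes
        self._seen: Set[str] = set()

    def handle(self, event: dict) -> bool:
        """Return True if the event was processed, False if it was a duplicate."""
        key = str(event["sessionId"])
        if key in self._seen:
            return False
        self._seen.add(key)
        self._process(event)
        return True
```

Inside `pull_all()`, `handler.handle(event)` would take the place of the direct processing step, and every message would still be acknowledged, duplicate or not. Deduplication only hides the extra copies; it cannot recover messages that never arrived.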
To generate the session_id, send the API requests, and log the responses, I use:
// generate trackable sessionId
var sessionId = 0
var increment_session_id = function() {
    sessionId++;
    return sessionId;
}

var generate_data = function() {
    var data = {};
    // data.sessionId = faker.random.uuid();
    data.sessionId = increment_session_id();
    data.user = get_rand(userList);
    data.device = get_rand(deviceList);
    data.visitTime = new Date;
    data.location = get_rand(locationList);
    data.content = get_rand(contentList);
    return data;
}

var sendData = function (url, payload) {
    var request = $.ajax({
        url: url,
        contentType: 'application/json',
        method: 'POST',
        data: JSON.stringify(payload),
        error: function (xhr, status, errorThrown) {
            console.log(xhr, status, errorThrown);
            $('.result').prepend("<pre id='json'>" + JSON.stringify(xhr, null, 2) + "</pre>")
            $('.result').prepend("<div>errorThrown: " + errorThrown + "</div>")
            $('.result').prepend("<div>======FAIL=======</div><div>status: " + status + "</div>")
        }
    }).done(function (xhr) {
        console.log(xhr);
        $('.result').prepend("<div>======SUCCESS=======</div><pre id='json'>" + JSON.stringify(payload, null, 2) + "</pre>")
    })
}

$(submit_button).click(function() {
    var request_num = get_request_num();
    var request_url = get_url();
    for (var i = 0; i < request_num; i++) {
        var data = generate_data();
        var loadData = changeVerb(data, 'load');
        sendData(request_url, loadData);
    }
})
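The browser script can rely on a plain `sessionId++` because page JavaScript runs on a single thread, so every payload gets a distinct sessionId before its request is sent. If the same load test were driven from a multi-threaded Python script instead (a hypothetical port, not part of the original setup), the counter would need a lock to stay unique. A minimal sketch; `SessionIdGenerator` is a hypothetical name:

```python
import itertools
import threading


class SessionIdGenerator:
    """Hands out 1, 2, 3, ... without repeats, even across threads."""

    def __init__(self) -> None:
        self._counter = itertools.count(1)
        self._lock = threading.Lock()

    def next_id(self) -> int:
        # The lock makes the read-and-advance step explicitly atomic
        # instead of relying on CPython implementation details.
        with self._lock:
            return next(self._counter)
```

Each worker thread would call `next_id()` once per payload, mirroring `increment_session_id()` in the JavaScript above.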
UPDATE
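I made a change to the API and the problem seems to have disappeared. Instead of using a single pubsub.Client() for all requests, I now initialize a new client for every incoming request. The new API looks like this: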
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS, cross_origin
import simplejson as json
from google.cloud import pubsub
from functools import wraps
import re
import json
app = Flask(__name__)
...
@app.route('/publish', methods=['POST'])
@cross_origin()
@json_validator
def publish_test_topic():
    ps = pubsub.Client()
    pubsub_topic = 'test_topic'
    data = request.data
    topic = ps.topic(pubsub_topic)
    event = json.loads(data)
    messageId = topic.publish(data)
    return '200 **** sessionId: ' + str(event["sessionId"]) + ", messageId:" + messageId + " ******"
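Creating a new client on every request works around the problem but pays the client setup cost each time. If the cause was that the shared client object was not safe to use from the several threads serving concurrent Flask requests (a plausible reading of the UPDATE, not something confirmed here), a middle ground is one client per thread. A minimal sketch; `PerThread` is a hypothetical helper, and `pubsub.Client` appears only in the usage note:

```python
import threading
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class PerThread(Generic[T]):
    """Lazily creates one object per thread by calling `factory`."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._local = threading.local()

    def get(self) -> T:
        obj = getattr(self._local, "obj", None)
        if obj is None:
            # First call on this thread: build and cache a private instance
            obj = self._factory()
            self._local.obj = obj
        return obj
```

In the API this would be `clients = PerThread(pubsub.Client)` at module level and `ps = clients.get()` inside `publish_test_topic()`, so each thread reuses its own client instead of creating one per request.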
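According to your code, you publish every message twice. Is that just a typo, or do you actually publish each message more than once? –
Good catch, that was a typo. Each request publishes only once. It's fixed now. –
Is this another typo? self.seesionId = sessionId. Should it be self.sessionId? –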