2017-01-27 56 views
3

根據堆棧驅動器圖表,我們開始注意到某個主題/訂閱的「未確認消息」數量不時增加。確認的消息徘徊在Google Pubsub

症狀

我不知道我們有多少可以信任的Stackdriver圖表,但我已經檢查:

  • 拉操作數是多達發佈操作數
  • ACK操作當問題發生時計數低於拉操作計數

此外,我能夠看到pubsub實際上多次發送相同的消息,一個根據我們的日誌,這也證實'拉'是成功的,但'ack'可能不成功。

因此,我認爲我們可以假設我們的系統能夠迅速提取,但從GCP的角度來看並不能很好地得到答覆。

我檢查了沒有及時發送ACK的可能性,但我不認爲是這樣,因爲我顯示了下面的流程。

在有問題的訂閱中,郵件正在積累幾個小時。對我們來說,這是一個嚴重的問題。

實施細則

我們使用拉法出於某種原因,我們很不願意轉用push方法,除非有很好的理由。對於每一次訂閱,我們都有一個消息抽籤的例程,這個例程爲每條消息產生一名工作人員。更具體地說,

// in a dedicated message-pumping goroutine 
sub, _ := CreateSubscription(..., 0 /* ack-deadline */,) 
iter, _ := sub.Pull(...) 
for { 
    // omitted: wait if we have too many workers 
    msg, _ := iter.Next() 
    go func(msg Message) { 
    // omitted: handle the message and measure the latency; it turned out the latency is almost within 1 second 
    msg.Done(true) 
    }(msg) 
} 

對於負載平衡,訂閱也被同一集羣中的其他窗格拉動。因此,對於一個訂閱(如在Google Pubsub主題/訂閱中),我們有多個訂閱對象(如Go綁定的訂閱結構中),每個訂閱對象僅在一個窗格中使用。而且,每個訂閱對象都會創建一個迭代器。我相信這個設置沒有錯,但如果我錯了,請糾正我。

正如此代碼所示,我們確認ACK。 (我們的服務器不慌,所以周邊的msg.Done()沒有得到路徑)。

嘗試

奇怪的是有問題的訂購不是很忙。對於另一個訂閱,我們通常不會遇到任何問題,這些訂閱在同一個窗格中接收的消息多得多。所以,我開始懷疑拉動操作的max-prefetch選項是否會影響。它似乎解決了一段時間的問題,但問題再次出現。按照Google支持部門的建議,我也增加了豆莢數量,這有效地增加了工作人員數量。這並沒有太大的幫助。由於我們不會發布很多消息給有問題的(大約1條消息/秒),並且我們有很多(可能太多)工作者,所以我不認爲我們的服務器超載。

有人可以對此有所瞭解嗎?

+1

我在上週開始使用Node.js庫時遇到了同樣的問題。 ack不起作用,然後我必須等待消息被重新發送 – andresk

回答

2

在我的情況下,由於某種原因,Ack不會返回的症狀會定期發生,超時與gRPC的調用沒有設置,並且'acker'的groutine被阻塞。

screen shot

所以我從pubsub.NewClient傳遞GRPC方案解決了這個問題。

import (
    "cloud.google.com/go/pubsub" 
    "google.golang.org/api/option" 
    "google.golang.org/grpc" 
) 

// ... 

scChan := make(chan grpc.ServiceConfig) 
go func() { 
    sc := grpc.ServiceConfig{ 
     Methods: map[string]grpc.MethodConfig{ 
      "/google.pubsub.v1.Subscriber/Acknowledge": { 
       Timeout: 5 * time.Second, 
      }, 
     }, 
    } 
    scChan <- sc 
}() 

c, err := pubsub.NewClient(ctx, project, option.WithGRPCDialOption(grpc.WithServiceConfig(scChan))) 

而且您可以通過指定grpc.EnableTracing = true來調查原因。

grpc.EnableTracing = true 

c, err := pubsub.NewClient(ctx, project) 
if err != nil { 
    return nil, errors.Wrap(err, "pubsub.NewClient") 
} 

go func(){ 
    http.ListenAndServe(":8080", nil) 
}() 

gRPC的跟蹤信息可以通過golang.org/x/net/trace確認。

+1

非常感謝。這是迄今爲止我得到的最有幫助的答案。我會嘗試。 – linjus