2

我想實現一個ASP.NET核心API,它不響應HTTP請求,但在啓動時開始監聽Google Cloud Pub/Sub消息,並且在其整個生命週期中一直保持無限期地監聽。如何在ASP.NET Core應用程序中持續收聽Pub/Sub消息?

使用官方Pub/Sub SDK實現此功能的首選方式是什麼?

我能想到的方法有兩種:

方法1:只需使用一個SimpleSubscriber,並在Startup.Configure開始聽消息:

public void Configure(IApplicationBuilder app) 
{ 
    var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName); 
    var receivedMessages = new List<PubsubMessage>(); 

    simpleSubscriber.StartAsync((msg, cancellationToken) => 
    { 
     // Process the message here. 

     return Task.FromResult(SimpleSubscriber.Reply.Ack); 
    }); 

    ... 
} 

方法2:專門使用一個庫創建用於定期運行作業,例如Quartz,Hangfire或FluentScheduler,並且每次觸發作業時,都會使用SubscriberClient來提取新消息。

哪一個是首選方法?第一個似乎更簡單,但我不確定它是否真的可靠。

+0

@Flater這是一個在Kubernetes中運行的ASP.NET Core應用程序。我希望該應用具有一些REST端點,**和**不斷監聽一些發佈/訂閱消息。 (我知道我可以將這兩件事分成兩個部分,但如果可能的話,爲了方便起見,我想保留它。) –

+1

我會從最簡單的方法開始,然後根據需要移動到圖書館。在你的示例中,我只添加了用於在'靜態字段中的某處保留'simpleSubscriber'的代碼,以保護對象免受GC – Shrike

+1

的影響。就我*知道*而言,第一種方法應該沒問題 - 但我正在與同事誰知道更多。 –

回答

2

第一種方法肯定是如何打算使用它。

但是,看到該文檔爲StartAsync

開始接收郵件。當 StopAsync(CancellationToken)被調用時返回的Task完成,或者發生不可恢復的 故障。此方法不能被稱爲不止一次SimpleSubscriber實例。

所以你確實需要處理意外StartAsync關閉不可恢復的錯誤。最簡單的做法是使用外部循環,但考慮到這些錯誤被認爲是不可恢復的,這可能與調用需要在成功之前進行更改有關。

的代碼可能是這樣的:

while (true) 
{ 
    // Each SimpleSubscriber instance must only be used once. 
    var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName); 
    try 
    { 
     await simpleSubscriber.StartAsync((msg, cancellationToken) => 
     { 
      // Process the message here. 
      return Task.FromResult(SimpleSubscriber.Reply.Ack); 
     }); 
    } 
    catch (Exception e) 
    { 
     // Handle the unrecoverable error somehow... 
    } 
} 

如果如預期這不起作用,請let us know

+0

感謝您的信息!由於不可恢復的錯誤,你的意思是像堆棧溢出或內存不足,所以它不會因爲暫時的網絡故障或類似的事情而停止,對嗎?順便說一句。如何實現監聽,內部是SimpleSubscriber輪詢還是Pub/Sub支持類似WebSocket連接? –

+1

@MarkVincze SimpleSubscriber源代碼位於:https://github.com/GoogleCloudPlatform/google-cloud-dotnet/blob/master/apis/Google.Cloud.PubSub。V1/Google.Cloud.PubSub.V1/SimpleSubscriber.cs,這樣您就可以自己查看;)它使用長時間運行的流式RPC調用,該調用在第613行開始。 – Chris

+1

@MarkVincze「不可恢復」意味着不可恢復的RPC錯誤。 *可恢復* RPC錯誤列表位於以下位置:https://github.com/GoogleCloudPlatform/google-cloud-dotnet/blob/master/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/Extensions .cs#L61所有其他都是不可恢復的。 – Chris

相關問題