2

我有一個服務需要儘快讀取來自Amazon SQS的消息。我們預計交通擁擠,我希望能夠以每秒10K信息的速度讀取。不幸的是,我目前在10條信息/秒左右。顯然,我有工作要做。在C#中執行併發任務

這是我使用的是什麼(轉換爲一個控制檯應用程序,使測試更容易):

private static int _concurrentRequests; 
private static int _maxConcurrentRequests; 

public static void Main(string[] args) { 
    _concurrentRequests = 0; 
    _maxConcurrentRequests = 100; 

    var timer = new Timer(); 
    timer.Elapsed += new ElapsedEventHandler(OnTimedEvent); 
    timer.Interval = 10; 
    timer.Enabled = true; 

    Console.ReadLine(); 
    timer.Dispose(); 
} 

public static void OnTimedEvent(object s, ElapsedEventArgs e) { 
    if (_concurrentRequests < _maxConcurrentRequests) { 
     _concurrentRequests++; 
     ProcessMessages(); 
    } 
} 

public static async Task ProcessMessages() { 
    var manager = new MessageManager(); 
    manager.ProcessMessages(); // this is an async method that reads in the messages from SQS 

    _concurrentRequests--; 
} 

我沒有收到近100所併發請求的任何地方,而它似乎並沒有被解僱每10毫秒有一個OnTimedEvent

我不確定Timer是否是正確的方法。我對這種編碼沒有太多經驗。我很樂意在這一點上嘗試任何事情。

更新

感謝calebboyd,我有點接近實現我的目標。下面是一些非常糟糕的代碼:

private static SemaphoreSlim _locker; 

public static void Main(string[] args) { 
    _manager = new MessageManager(); 

    RunBatchProcessingForeverAsync(); 
} 
private static async Task RunBatchProcessingForeverAsync() { 
    _locker = new SemaphoreSlim(10, 10); 
    while (true) { 
     Thread thread = new Thread(new ParameterizedThreadStart(Process)); 
     thread.Start(); 
    } 
} 

private static async void Process(object args) { 
    _locker.WaitAsync(); 
    try { 
     await _manager.ProcessMessages(); 
    } 
    finally { 
     _locker.Release(); 
    } 

} 

我能夠接近讀取每秒這一消息的可敬的數量,但問題是我ProcessMessages調用從未完成(或者也許會經過一個很長的時間)。我想我可能需要限制我在任何時候運行的線程數。

關於如何改進此代碼的任何建議,以便ProcessMessages有機會完成?

+2

定時器具有每15.6毫秒最多可以觸發的限制。這是.NET實現定時器的一個限制。 – Enigmativity

+0

聲稱是異步的代碼沒有'await' - 這很奇怪。考慮更新你的代碼,看起來更真實。 –

+0

下面是一個非常重要的問題,可幫助確定多線程可能實現的功能 - 如果您在單個線程中串聯運行請求,則每秒可處理多少條消息? – Enigmativity

回答

1

正如@calebboyd建議的,你必須首先讓你的線程異步。現在,如果你去這裏 - Where to use concurrency when calling an API,你會看到一個異步線程足夠用於彙集網絡資源。如果你能夠在單個請求中從亞馬遜獲得多條消息,那麼你的生產者線程(即對亞馬遜進行異步調用的線程)將會很好 - 它可以每秒發送數百個請求。這不會是你的瓶頸。但是,處理收到的數據的延續任務將交給線程池。在這裏你有機會脖子 - 假設每秒有100個響應到達,每個響應包含100條消息(達到10K信息/秒的近似值)。每秒你有100個新的任務,每個將需要你的線程處理100條消息。現在有兩種選擇:(1)這些消息的處理不受CPU限制 - 只需將它們發送到數據庫,或者(2)執行CPU消耗計算,例如科學計算,序列化或一些繁重的業務邏輯。如果(1)是你的情況,那麼瓶頸就會向後推向DB。如果(2),那麼你沒有選擇,只能擴大或縮小,或優化計算。但是你的瓶頸可能不是生產線 - 如果它正確實施(參見上面的鏈接)。

+0

這是我的問題。數據庫現在絕對是瓶頸。實現你的批量插入想法應該會有所幫助。感謝您的幫助! – Irving

0

我會假定異步方法在線程池中排隊,線程池只有儘可能多的線程,因爲您有可用的處理器。您可能會生成100個請求,但它們仍然由8個線程執行。嘗試創建N個線程的數組並將其使用。

+1

調用'ThreadPool.GetMaxThreads'在我的機器上返回1023個可用線程。我有一個處理器,有四個內核和8個超線程。 – Enigmativity

+0

@Enigmativity並不意味着Robert Hudjakov是完全錯誤的 - 根據我的理解,在理論上的純異步程序中,線程池不會創建超過8(在你的情況下)線程來處理所有排隊的任務,即使緩衝區飽和。雖然我可能是錯的。 –

3

因爲您的MessageManager對象上的ProcessMessages方法未被等待,所以我將假設它綁定到它所執行的同一個線程。僅將該函數標記爲async不會將工作傳遞到新線程。有了這個假設,這段代碼實際上並沒有在多個線程中執行。你可以使用下面的代碼在更多的線程池中執行你的代碼。

它可能是管理器對象無法處理併發使用。所以我在Task.Run lambda中創建它。這也可能是昂貴的,因此不切實際。

async Task RunBatchProcessingForeverAsync() { 
    var lock = new SemaphoreSlim(initialCount: 10); 
    while (true) { 
     await lock.WaitAsync(); 
     Task.Run(() => { 
      try { 
       var manager = new MessageManager(); 
       manager.ProcessMessages(); 
      } finally { 
       lock.Release(); 
      } 
     }); 
    } 
} 

我有一段時間沒有寫C#,但這應該同時運行你的方法10次,反覆,永遠。

+3

我從來沒有見過這種模式。我喜歡。 – usr

+0

謝謝!我能夠取得一些進展,但我仍然遇到了障礙。我不得不稍微更新一下你的代碼,以便足夠快。你能看看我的更新嗎? – Irving

+0

你應該可以從main方法調用'RunBatchProcessingForeverAsync()。Wait()'。看起來你可能會創建一個無限數量的線程(在while循環中沒有'await')。你的ProcessMessages是否返回一個可以等待的任務? – calebboyd