我有一個服務需要儘快讀取來自Amazon SQS的消息。我們預計交通擁擠,我希望能夠以每秒10K信息的速度讀取。不幸的是,我目前在10條信息/秒左右。顯然,我有工作要做。在C#中執行併發任務
這是我使用的是什麼(轉換爲一個控制檯應用程序,使測試更容易):
private static int _concurrentRequests;
private static int _maxConcurrentRequests;
public static void Main(string[] args) {
_concurrentRequests = 0;
_maxConcurrentRequests = 100;
var timer = new Timer();
timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
timer.Interval = 10;
timer.Enabled = true;
Console.ReadLine();
timer.Dispose();
}
public static void OnTimedEvent(object s, ElapsedEventArgs e) {
if (_concurrentRequests < _maxConcurrentRequests) {
_concurrentRequests++;
ProcessMessages();
}
}
public static async Task ProcessMessages() {
var manager = new MessageManager();
manager.ProcessMessages(); // this is an async method that reads in the messages from SQS
_concurrentRequests--;
}
我沒有收到近100所併發請求的任何地方,而它似乎並沒有被解僱每10毫秒有一個OnTimedEvent
。
我不確定Timer
是否是正確的方法。我對這種編碼沒有太多經驗。我很樂意在這一點上嘗試任何事情。
更新
感謝calebboyd,我有點接近實現我的目標。下面是一些非常糟糕的代碼:
private static SemaphoreSlim _locker;
public static void Main(string[] args) {
_manager = new MessageManager();
RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
_locker = new SemaphoreSlim(10, 10);
while (true) {
Thread thread = new Thread(new ParameterizedThreadStart(Process));
thread.Start();
}
}
private static async void Process(object args) {
_locker.WaitAsync();
try {
await _manager.ProcessMessages();
}
finally {
_locker.Release();
}
}
我能夠接近讀取每秒這一消息的可敬的數量,但問題是我ProcessMessages
調用從未完成(或者也許會經過一個很長的時間)。我想我可能需要限制我在任何時候運行的線程數。
關於如何改進此代碼的任何建議,以便ProcessMessages
有機會完成?
定時器具有每15.6毫秒最多可以觸發的限制。這是.NET實現定時器的一個限制。 – Enigmativity
聲稱是異步的代碼沒有'await' - 這很奇怪。考慮更新你的代碼,看起來更真實。 –
下面是一個非常重要的問題,可幫助確定多線程可能實現的功能 - 如果您在單個線程中串聯運行請求,則每秒可處理多少條消息? – Enigmativity