2014-03-13 93 views
0

我有一個應用程序通知我們的SMS數據庫中的每個用戶更新我們的系統。它使用實體框架來選擇每條記錄,然後創建一個新的任務來向該人發送消息。從理論上講,這應該是一個快速的過程。我做錯了什麼,因爲它每一秒或兩秒只能得到一次完整的迴應。使用Task.Factory和Entity Framework的多線程?

我認爲這個問題與我在Task.Factory.StartNew()中設置任務的方式有關。它表現得像是在同步運行,但我希望它能夠異步運行。

如果我完全脫離了我如何使用任務的基礎,請告訴我。我從this post得到了我的靈感。

這裏是我的代碼:

class Program 
{ 
static List<MessageToSend> Messages = new List<MessageToSend>(); 
static Entities oDatabase = new Entities(); 
static SMS.API oAPI = new SMS.API(); 

const string sAuthToken = "*****"; 
const string sNotificationMessage = "*****"; 

static void Main(string[] args) 
{ 
    foreach (var subscriber in oDatabase.SMS_Subscribers.Where(x => x.GlobalOptOut == false)) 
    { 
     MessageToSend oMessage = new MessageToSend(); 
     oMessage.ID = subscriber.ID; 
     oMessage.MobileNumber = subscriber.MobileNumber; 

     var recentlySentMessage = oDatabase.SMS_OutgoingMessages.Where(x => x.Message == sNotificationMessage && x.MobileNumber == oMessage.MobileNumber && x.Sent > new DateTime(2014, 3, 12)).FirstOrDefault(); 
     if (recentlySentMessage != null) 
     { 
      oMessage.Completed = true; 
      continue; 
     } 

     Task t = Task.Factory.StartNew(() => 
     { 
      try{ 
       var keywordID = oDatabase.SMS_SubscribersKeywords.Where(x => x.SubscriberID == oMessage.ID).First().KeywordID; 
       var keyword = oDatabase.SMS_Keywords.Where(x => x.ID == keywordID).First(); 
       oMessage.DemographicID = keyword.DemographicID; 
       oMessage.Keyword = keyword.Keyword; 

       SendNotificationMessage(oMessage); 
      } 
      catch (Exception oEx){ //Write exception to console} 
     }); 

     Thread.Sleep(15); 
    } 

    while (Messages.ToList().Any(x => !x.Completed)){ //wait till all are completed} 
} 

public static void SendNotificationMessage(object message) 
{ 
    MessageToSend oMessage = (MessageToSend)message; 
    try 
    { 
     SMS.APIResponse oResponse = oAPI.SendMessage(sAuthToken, oMessage.DemographicID, oMessage.Keyword, oMessage.MobileNumber, sNotificationMessage); 

     if (oResponse.Success){ //Write success to console } 
     else{ //Write failure to console } 
    } 
    catch (Exception oEx){ //Write Exception to console } 

    oMessage.Completed = true; 
} 
} 

class MessageToSend 
{ 
public long ID { get; set; } 
public long DemographicID {get;set;} 
public string MobileNumber { get; set; } 
public bool Completed { get; set; } 
public string Keyword { get; set; } 

public MessageToSend(){ Completed = false; } 
} 

編輯:在foreach塊的內部現在看起來是這樣的:

 MessageToSend oMessage = new MessageToSend(); 
     oMessage.ID = subscriber.ID; 
     oMessage.MobileNumber = subscriber.MobileNumber; 

     int keywordID = 0; 
     SMSShortcodeMover.SMS_Keywords keyword; 

     var recentlySentMessage = oDatabase.SMS_OutgoingMessages.Where(x => x.Message == sNotificationMessage && x.MobileNumber == oMessage.MobileNumber && x.Sent > new DateTime(2014, 3, 12)).FirstOrDefault(); 
     if (recentlySentMessage != null) 
     { 
      oMessage.Completed = true; 
      continue; 
     } 

     try 
     { 
      keywordID = (int)oDatabase.SMS_SubscribersKeywords.Where(x => x.SubscriberID == oMessage.ID).First().KeywordID; 
      keyword = oDatabase.SMS_Keywords.Where(x => x.ID == keywordID).First(); 
     } catch (Exception oEx){ //write exception to console, then continue; } 

     Task t = Task.Factory.StartNew(() => 
     { 
      oMessage.DemographicID = keyword.DemographicID; 
      oMessage.Keyword = keyword.Keyword; 

      SendNotificationMessage(oMessage); 
     }); 

     Thread.Sleep(15); 
    } 

編輯2: 我再次更新我的代碼,我現在收集所有的我的數據在我進入發送之前。它仍然掛在某個地方,但它現在可以在大約5秒內獲得所有52,000行數據。代碼如下所示:

var query = 
(from subscriber in oDatabase.SMS_Subscribers 
where subscriber.GlobalOptOut == false 
where !(from x in oDatabase.SMS_OutgoingMessages 
     where x.Message == sNotificationMessage 
     where x.MobileNumber == subscriber.MobileNumber 
     where x.Sent > new DateTime(2014, 3, 12) 
     select x).Any() 
join sk in oDatabase.SMS_SubscribersKeywords 
    on subscriber.ID equals sk.SubscriberID 
join k in oDatabase.SMS_Keywords on sk.KeywordID equals k.ID into ks 
from k2 in ks.Take(1) 
select new MessageToSend() 
{ 
    ID = subscriber.ID, 
    MobileNumber = subscriber.MobileNumber, 
    DemographicID = k2.DemographicID, 
    Keyword = k2.Keyword 
}).ToList(); 

foreach(var q in query){ 
    Task t = Task.Factory.StartNew(() => SendNotificationMessage(q)); 
    Tasks.Add(t); 
    Thread.Sleep(80); 
} 

Task.WaitAll(Tasks.ToArray()); 
+0

首先,嘗試在任務外執行oDatabase查詢。另外爲什麼你的代碼有一個'Thread.Sleep(15)'? – Enigmativity

+0

我有一個'Thread.Sleep(15)'來限制循環的速度。我們允許我們的SMS提供商產生多少流量是有限制的。 我最初在任務外部有'oDatabase'查詢,但是我把它們放在裏面去試圖阻止它們阻塞代碼的其餘部分。不過,它似乎沒有幫助。 – ijb109

回答

1

如果我是你,我會盡力執行所有的數據庫調用一次,試圖仙您的消息之前。

嘗試這樣做:

var query = 
    from subscriber in oDatabase.SMS_Subscribers 
    where subscriber.GlobalOptOut == false 
    where !(from x in oDatabase.SMS_OutgoingMessages 
     where x.Message == sNotificationMessage 
     where x.MobileNumber == subscriber.MobileNumber 
     where x.Sent > new DateTime(2014, 3, 12) 
     select x 
    ).Any() 
    join sk in oDatabase.SMS_SubscribersKeywords 
     on subscriber.ID equals sk.SubscriberID 
    join k in oDatabase.SMS_Keywords on sk.KeywordID equals k.ID into ks 
    from k2 in ks.Take(1) 
    select new 
    { 
      ID = subscriber.ID, 
      MobileNumber = subscriber.MobileNumber, 
      DemographicID = k2.DemographicID, 
      Keyword = k2.Keyword 
    }; 

var tasks = 
    from x in query.ToArray() 
    let message = new MessageToSend() 
    { 
     ID = x.ID, 
     MobileNumber = x.MobileNumber, 
     DemographicID = x.DemographicID, 
     Keyword = x.Keyword 
    } 
    select Task.Factory.StartNew(() => SendNotificationMessage(message)); 

Task.WaitAll(tasks.ToArray()); 

我沒有你的數據庫,所以我不能對此進行測試,但如果這是不完全正確這樣的事情應該工作。

+0

在這裏有一些不好的條目,這就是爲什麼我有所有的try/catch塊。這個查詢會避免這些嗎?另外,非常感謝。我永遠不會想到這樣做。 – ijb109

+0

這是一個偉大的方向,我仍然在努力。我有一個錯誤,看起來像這篇文章中的一個,我想弄清楚如何調整選擇在最後:http://stackoverflow.com/questions/7259567/linq-to-entities-does-not -recognize-the-method – ijb109

+0

@ ijb109 - 我修改了查詢以擺脫您提到的錯誤的問題。基本上我從建立你需要的對象中分離出數據庫查詢。注意'查詢。ToArray()'調用強制執行數據庫查詢。現在應該更接近工作了。 – Enigmativity

0

這並不讓我感到吃驚,它的服用1-2秒,每個循環的每次迭代,因爲你有同步執行3所獨立的數據庫調用。數據庫調用在事情的宏偉計劃中非常緩慢。解決這個問題的方法之一是創建一個方法,除了任務代碼之外,它還包含foreach塊中的所有內容,然後使用任務來調用它。只需要小心,任務內部的任何內容都不會阻塞。

I.e.

var tasks = new List<Task>(); 
foreach (var subscriber in oDatabase.SMS_Subscribers.Where(x => x.GlobalOptOut == false)) 
{ 
    tasks.Add(Task.Factory.StartNew(() => SendNotificationTask(subscriber)); 
    Thread.Sleep(15); 
} 
//Might want to to use Task.WhenAll instead of WaitAll. Just need to debug it and see what happens. 
Task.WaitAll(tasks.ToArray()); 



public void SendNotificationTask(SomeType subscriber) 
{ 
    MessageToSend oMessage = new MessageToSend(); 
    oMessage.ID = subscriber.ID; 
    oMessage.MobileNumber = subscriber.MobileNumber; 

    int keywordID = 0; 
    SMSShortcodeMover.SMS_Keywords keyword; 

    ////Database call 1 
    var recentlySentMessage = oDatabase.SMS_OutgoingMessages.Where(x => x.Message == sNotificationMessage && x.MobileNumber == oMessage.MobileNumber && x.Sent > new DateTime(2014, 3, 12)).FirstOrDefault(); 
    if (recentlySentMessage != null) 
    { 
     oMessage.Completed = true; 
    } 
    else 
    { 
     try 
     { 
     ////Database call 2 
     keywordID = (int)oDatabase.SMS_SubscribersKeywords.Where(x => x.SubscriberID == oMessage.ID).First().KeywordID; 

     ////Database call 3 
     keyword = oDatabase.SMS_Keywords.Where(x => x.ID == keywordID).First(); 
    } catch (Exception oEx){ //write exception to console, then continue; } 

    oMessage.DemographicID = keyword.DemographicID; 
    oMessage.Keyword = keyword.Keyword; 

    SendNotificationMessage(oMessage); 

    } 

}