2016-07-07 178 views
2

我想排隊一系列任務並使用Azure服務結構異步運行它們。我目前正在使用具有輔助角色的CloudMessageQueue。我正在嘗試遷移到Service Fabric。從工人的角色,這是我的代碼:Azure服務結構消息隊列

private void ExecuteTask() 
    { 
     CloudQueueMessage message = null; 

     if (queue == null) 
     { 
      jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Queue for WorkerRole2 is null. Exiting."))); 
      return; 
     } 

     try 
     { 
      message = queue.GetMessage(); 
      if (message != null) 
      { 
       JMATask task = GetTask(message.AsString); 
       string msg = (message == null) ? string.Empty : message.AsString; 
       //jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.JMA, String.Format("Executing task {0}", msg))); 
       queue.DeleteMessage(message); 
       PerformTask(task); 
      } 
     } 
     catch (Exception ex) 
     { 
      string msg = (message == null) ? string.Empty : message.AsString; 
      jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Message {0} Error removing message from queue {1}", msg, ex.ToString()))); 
     } 
    } 

我有一些問題:

  1. 如何運行的異步執行任務的方法?我想同時運行30-40個任務。
  2. 我有一個JMATask的列表。如何將列表添加到隊列中?
  3. 列表是否需要添加到隊列中?

    namespace Stateful1 
    { 
        public class JMATask 
        { 
        public string Name { get; set; } 
        } 
    
    /// <summary> 
    /// An instance of this class is created for each service replica by the Service Fabric runtime. 
    /// </summary> 
    internal sealed class Stateful1 : StatefulService 
    { 
    public Stateful1(StatefulServiceContext context) 
        : base(context) 
    { } 
    
    /// <summary> 
    /// Optional override to create listeners (e.g., HTTP, Service Remoting, WCF, etc.) for this service replica to handle client or user requests. 
    /// </summary> 
    /// <remarks> 
    /// For more information on service communication, see http://aka.ms/servicefabricservicecommunication 
    /// </remarks> 
    /// <returns>A collection of listeners.</returns> 
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners() 
    { 
        return new ServiceReplicaListener[0]; 
    } 
    
    /// <summary> 
    /// This is the main entry point for your service replica. 
    /// This method executes when this replica of your service becomes primary and has write status. 
    /// </summary> 
    /// <param name="cancellationToken">Canceled when Service Fabric needs to shut down this service replica.</param> 
    protected override async Task RunAsync(CancellationToken cancellationToken) 
    { 
        // TODO: Replace the following sample code with your own logic 
        //  or remove this RunAsync override if it's not needed in your service. 
    
        IReliableQueue<JMATask> tasks = await this.StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks"); 
        //var myDictionary = await this.StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary"); 
    
        while (true) 
        { 
         cancellationToken.ThrowIfCancellationRequested(); 
    
         using (var tx = this.StateManager.CreateTransaction()) 
         { 
          var result = await tasks.TryDequeueAsync(tx); 
    
          //how do I execute this method async? 
          PerformTask(result.Value); 
    
          //Create list of JMA Tasks to queue up 
          await tasks.EnqueueAsync(tx, new JMATask()); 
    
          //var result = await myDictionary.TryGetValueAsync(tx, "Counter"); 
    
          //ServiceEventSource.Current.ServiceMessage(this, "Current Counter Value: {0}", 
          // result.HasValue ? result.Value.ToString() : "Value does not exist."); 
    
          //await myDictionary.AddOrUpdateAsync(tx, "Counter", 0, (key, value) => ++value); 
    
          // If an exception is thrown before calling CommitAsync, the transaction aborts, all changes are 
          // discarded, and nothing is saved to the secondary replicas. 
          await tx.CommitAsync(); 
         } 
    
         await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); 
        } 
    } 
    
    private async void PerformTask(JMATask task) 
    { 
        //execute task 
    } 
    

    }

+0

掙扎與類似的問題,這是有益的,謝謝。好奇你結束了什麼,以及它如何爲你工作。 – kenchilada

+1

我使用了另一種技術:https://msdn.microsoft.com/en-us/library/dn568104.aspx –

回答

1

的RunAsync方法不應該有這樣的一行代碼:await tasks.EnqueueAsync(tx, new JMATask());

創建JMA任務列表排隊應該在你的狀態服務的另一種方法,看起來像這樣:

public async Task AddJMATaskAsync(JMATask task) 
    { 
     var tasksQueue = await StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks"); 
     using (var tx = StateManager.CreateTransaction()) 
     { 
      try 
      { 
       await tasksQueue.EnqueueAsync(tx, request); 
       await tx.CommitAsync(); 
      } 
      catch (Exception ex) 
      { 
       tx.Abort(); 
      } 
     } 
    } 

然後你PerformTask方法可以包含對無狀態微服務的調用:

public async Task PerformTask (JMATask task) 
    { 
     //1. resolve stateless microservice URI 
     // statelessSvc 

     //2. call method of the stateless microservice 
     // statelessSvc.PerformTask(task); 
    } 

因此,基本上,有狀態服務將只有queu e並將任務列出。執行實際任務可以通過微服務完成,微服務將可用於羣集中的所有節點。

1

您可以創建任務列表,然後做等待Task.WhenAll(任務列表);

這可能是最簡單的直接答案。

但是,如果每個任務略有不同,您是否考慮爲每項任務創建單獨的微服務?

+0

是的,每個任務都不同。我如何創建個人微服務? –

+0

理想情況下,爲每項任務創建一個服務(有狀態,無狀態)。沒有更深入的理解就很難回答,但這是我的直覺反應。每個服務都應該有一個簡單的單一責任,並且非常擅長做它應該做的事情。 –

+0

我更新了我的帖子。任務將數據從一個系統移到另一個系統。有幾百個不同的任務,但其中許多執行相同類型的操作。 –