2010-09-27 74 views
4

我已經使用Rx爲我的WCF Web服務編寫了簡化的Silverlight客戶端庫,但是我注意到有時候我缺少完成的事件。使用Rx簡化異步Silverlight Web服務請求

public IObservable<XElement> GetReport(string reportName) 
{ 
    return from client in Observable.Return(new WebServiceClient()) 
      from request in Observable.ToAsync<string>(client.GetReportDataAsync)(reportName) 
      from result in Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted").Take(1) 
      from close in this.CloseClient(client) 
      select result.EventArgs.Result; 
} 

我相信這個問題是由以下事實導致的調用Web服務和訂閱完成事件之前返回。我無法弄清楚如何在異步調用之前讓Rx訂閱事件。我試過StartWith,但那需要輸入和輸出類型相同,有什麼想法?

+0

看起來不是很簡單給我嗎? – AnthonyWJones 2010-09-27 12:24:08

+0

相信我,當你使用它。調用它只是service.GetReport(「MyReport1」)。訂閱(this.LoadResults); – 2010-09-27 23:55:26

回答

7

好像最好答案是使用Observable.CreateWithDisposable()

例如

public IObservable<XElement> GetReport(string reportName) 
{ 
    return from client in Observable.Return(new WebServiceClient()) 
      from completed in Observable.CreateWithDisposable<GetReportDataCompletedEventArgs>(observer => 
       { 
        var subscription = Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted") 
         .Take(1) 
         .Select(e => e.EventArgs) 
         .Subscribe(observer); 
        client.GetReportDataAsync(reportName); 
        return subscription; 
       }) 
      from close in this.CloseClient(client) 
      select completed.Result; 
} 

爲了更方便與我重構了CreateWithDisposable成可以用我所有的Web服務調用,包括事件參數類型自動確定事件的名稱使用的公共功能的工作:

private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start) where T : AsyncCompletedEventArgs 
{ 
    if (typeof(T) == typeof(AsyncCompletedEventArgs)) 
    { 
     throw new InvalidOperationException("Event arguments type cannot be used to determine event name, use event name overload instead."); 
    } 

    string completedEventName = typeof(T).Name.TrimEnd("EventArgs"); 
    return CallService<T>(serviceClient, start, completedEventName); 
} 

private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start, string completedEventName) where T : AsyncCompletedEventArgs 
{ 
    return Observable.CreateWithDisposable<T>(observer => 
    { 
     var subscription = Observable.FromEvent<T>(serviceClient, completedEventName).Take(1).Select(e => e.EventArgs).Subscribe(observer); 
     start(); 
     return subscription; 
    }); 
} 

// Example usage: 
public IObservable<XElement> GetReport(string reportName) 
{ 
    return from client in Observable.Return(new WebServiceClient()) 
      from completed in this.CallService<GetReportDataCompletedEventArgs>(client,() => client.GetReportDataAsync(reportName)) 
      from close in this.CloseClient(client) 
      select completed.Result; 
} 

/// <summary> 
/// Asynchronously closes the web service client 
/// </summary> 
/// <param name="client">The web service client to be closed.</param> 
/// <returns>Returns a cold observable sequence of a single success Unit.</returns> 
private IObservable<AsyncCompletedEventArgs> CloseClient(WebServiceClient client) 
{ 
    return this.CallService<AsyncCompletedEventArgs>(client, client.CloseAsync, "CloseCompleted"); 
} 

希望這可以幫助別人!

+0

每次創建和關閉客戶端都有什麼好處嗎? 另外,你有一個CloseClient方法的例子嗎? – Jordan 2010-11-12 20:25:40

+0

我每次關閉並重新打開客戶端以確保不存在線程問題。底層的頻道無論如何都是由WCF緩存的,所以幾乎沒有這樣做的成本。對不起,我忘了包含CloseClient,現在我將添加它。 – 2010-11-15 01:21:17

1

我需要使用通用WebClient.DownloadStringAsync所以在這裏我的版本。

首先,包裹事件:

public static IObservable<IEvent<DownloadStringCompletedEventArgs>> 
    GetDownloadStringObservableEvent(this WebClient wc) 
{ 
    return Observable.FromEvent<DownloadStringCompletedEventArgs>(
     wc, "DownloadStringCompleted"); 
} 

然後創建擴展方法:

public static IObservable<string> GetDownloadString(this WebClient wc, Uri uri) 
{ 
    return Observable.CreateWithDisposable<string>(
     observer => { 
      // Several downloads may be going on simultaneously. The token allows 
      // us to establish that we're retrieving the right one. 
      Guid token = Guid.NewGuid(); 
      var stringDownloaded = wc.GetDownloadStringObservableEvent() 
        .Where(evt => ((Guid)evt.EventArgs.UserState) == token) 
        .Take(1);  //implicitly unhooks handler after event is received 
      bool errorOccurred = false; 
      IDisposable unsubscribe = 
       stringDownloaded.Subscribe(
        // OnNext action 
        ev => { 
         // Propagate the exception if one is reported. 
         if (ev.EventArgs.Error != null) { 
          errorOccurred = true; 
          observer.OnError(ev.EventArgs.Error); 
         } else if (!ev.EventArgs.Cancelled) { 
          observer.OnNext(ev.EventArgs.Result); 
         } 
        }, 
        // OnError action (propagate exception) 
        ex => observer.OnError(ex), 
        // OnCompleted action 
        () => { 
         if (!errorOccurred) { 
          observer.OnCompleted(); 
         } 
        }); 
      try { 
       wc.DownloadStringAsync(uri, token); 
      } catch (Exception ex) { 
       observer.OnError(ex); 
      } 
      return unsubscribe; 
     } 
    ); 
} 

用法很簡單:

wc.GetDownloadString(new Uri("http://myservice")) 
    .Subscribe(resultCallback , errorCallback);