2012-06-08 54 views
0

我試圖從Silverlight客戶端直接向Amazon S3上傳多個文件。用戶從標準文件打開對話框中選擇文件,並且我想鏈接上傳,以便它們一次一個地串行發生。這可能發生在應用程序中的多個位置,所以我試圖將它包裝在一個很好的實用工具類中,該工具類接受所選文件的IEnumerable,在上傳文件時公開文件的IObservable,以便UI可以相應地響應每個文件完成。使用Rx連鎖多個2步文件上傳

這是由於雙方Silverlight和AmazonS3的所有安全性要求相當複雜。我會盡量簡單地解釋我的整個環境,但是我已經用一個小的控制檯應用程序來重現問題,我將在下面發佈代碼。

我有一個第三方工具,Silverlight的一個公開標準的基於事件的異步方法處理上傳到S3。我爲每個上傳的文件創建一個該實用程序的實例。它會創建一個未簽名的請求字符串,然後發送到我的服務器以使用我的私鑰進行簽名。該簽名請求通過也使用基於事件的異步方法的服務代理類發生。一旦我有了簽名的請求,我將它添加到上傳器實例並啓動上傳。

我使用的毗連試過,但我最終只有第一個文件經歷的過程。當我使用合併時,所有文件都完好無損,但以並行方式而不是串行方式。當我使用合併(2)時,所有文件開始第一步,但只有2個完成。

顯然我錯過了與Rx有關的東西,因爲它不像我期望的那樣行事。

namespace RxConcat 
{ 
    using System; 
    using System.Collections.Generic; 
    using System.Linq; 
    using System.Reactive.Linq; 
    using System.Timers; 

    public class SignCompletedEventArgs : EventArgs 
    { 
     public string SignedRequest { get; set; } 
    } 

    public class ChainUploader 
    { 
     public IObservable<string> StartUploading(IEnumerable<string> files) 
     { 
      return files.Select(
        file => from signArgs in this.Sign(file + "_request") 
          from uploadArgs in this.Upload(file, signArgs.EventArgs.SignedRequest) 
          select file).Concat(); 
     } 

     private IObservable<System.Reactive.EventPattern<SignCompletedEventArgs>> Sign(string request) 
     { 
      Console.WriteLine("Signing request '" + request + "'"); 
      var signer = new Signer(); 
      var source = Observable.FromEventPattern<SignCompletedEventArgs>(ev => signer.SignCompleted += ev, ev => signer.SignCompleted -= ev); 
      signer.SignAsync(request); 
      return source; 
     } 

     private IObservable<System.Reactive.EventPattern<EventArgs>> Upload(string file, string signedRequest) 
     { 
      Console.WriteLine("Uploading file '" + file + "'"); 
      var uploader = new Uploader(); 
      var source = Observable.FromEventPattern<EventArgs>(ev => uploader.UploadCompleted += ev, ev => uploader.UploadCompleted -= ev); 
      uploader.UploadAsync(file, signedRequest); 
      return source; 
     } 
    } 

    public class Signer 
    { 
     public event EventHandler<SignCompletedEventArgs> SignCompleted; 

     public void SignAsync(string request) 
     { 
      var timer = new Timer(1000); 
      timer.Elapsed += (sender, args) => 
      { 
       timer.Stop(); 
       if (this.SignCompleted == null) 
       { 
        return; 
       } 

       this.SignCompleted(this, new SignCompletedEventArgs { SignedRequest = request + "signed" }); 
      }; 
      timer.Start(); 
     } 
    } 

    public class Uploader 
    { 
     public event EventHandler<EventArgs> UploadCompleted; 

     public void UploadAsync(string file, string signedRequest) 
     { 
      var timer = new Timer(1000); 
      timer.Elapsed += (sender, args) => 
      { 
       timer.Stop(); 
       if (this.UploadCompleted == null) 
       { 
        return; 
       } 

       this.UploadCompleted(this, new EventArgs()); 
      }; 
      timer.Start(); 
     } 
    } 

    internal class Program 
    { 
     private static void Main(string[] args) 
     { 
      var files = new[] { "foo", "bar", "baz" }; 
      var uploader = new ChainUploader(); 
      var token = uploader.StartUploading(files).Subscribe(file => Console.WriteLine("Upload completed for '" + file + "'")); 
      Console.ReadLine(); 
     } 
    } 
} 

回答

1

處理每個文件的2步上傳的基本可觀察性永遠不會「完成」,從而阻止鏈中的下一個啓動。在調用Concat()之前向該observable添加一個Limit(1)並且它將正常工作。

return files.Select(file => (from signArgs in this.Sign(file + "_request") 
          from uploadArgs in this.Upload(file, signArgs.EventArgs.SignedRequest) 
          select file).Take(1)).Concat();