2016-11-25 50 views
0

我流的流動,其目標是計算簡單「校驗」的內容在一組的.zip文件。Observable.Using和異步流越來越損壞的數據

要做到這一點,我已經設置了觀察到的是:

  1. 發生在一個給定的文件夾
  2. 讀取每個文件的內容中的所有文件(閱讀爲ZipArchive
  3. 每個條目在每個文件中,執行校驗和

爲了說明它的計算中,我已創建這個前充足:

通知AsyncContext.Runhttps://stackoverflow.com/a/9212343/1025407)的使用,使Main方法等待GetChecksum因爲它是一個控制檯應用程序

namespace DisposePoC 
{ 
    using System.Collections.Generic; 
    using System.IO; 
    using System.IO.Compression; 
    using System.Reactive.Linq; 
    using Nito.AsyncEx; 
    using System.Linq; 
    using System.Threading.Tasks; 


    class Program 
    { 
     private static void Main() 
     { 
      AsyncContext.Run(GetChecksums); 
     } 

     private static async Task<IList<byte>> GetChecksums() 
     { 
      var bytes = Directory.EnumerateFiles("FolderWithZips") 
       .ToObservable() 
       .SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable())) 
       .SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length)))); 

      return await bytes.ToList(); 
     } 

     private static ZipArchive CreateZipArchive(string path) 
     { 
      return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read)); 
     } 

     private static async Task<byte> CalculateChecksum(Stream stream, long entryLength) 
     { 
      var bytes = await GetBytesFromStream(stream, entryLength); 
      return bytes.Aggregate((b1, b2) => (byte) (b1^b2)); 
     } 

     private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength) 
     { 
      byte[] bytes = new byte[entryLength]; 
      await stream.ReadAsync(bytes, 0, (int)entryLength); 
      return bytes;    
     } 
    } 
} 

運行應用程序,我得到的各種錯誤:

'System.IO.InvalidDataException':本地文件頭損壞。 'System.NotSupportedException':Stream不支持讀取。 'System.ObjectDisposedException':無法訪問處置的對象。 'System.IO.InvalidDataException':塊長度與其補碼不匹配。

我在做什麼錯?

觀察本身是否存在問題,還是因爲ZipArchive不是線程安全的?如果不是,我如何使代碼工作?

+1

我會對此做一個評論,因爲我目前無法驗證代碼內容,但我懷疑問題是在第一個SelectMany中創建的ZipArchive正在被Using語句處置然後才能閱讀下一行中的入口流 - 實質上,一次性範圍確定是錯誤的。我會將第二個SelectMany的邏輯移到第一個。我還會驗證您的測試數據沒有被破壞,正如第一個例外所示。 – Andrew

+0

我想我明白你的觀點。但是,如果範圍是錯誤的,那麼我如何修改代碼以避免處理每個ZipArchive,直到處理完所有條目?它甚至有可能嗎? – SuperJMN

回答

1

似乎沒有關於您的問題的「Rx」。

如果國防部整個事情勢在必行一套循環的正常工作

private static async Task<IList<byte>> GetChecksums() 
{ 
    var bytes = new List<byte>(); 
    foreach (var path in Directory.EnumerateFiles("FolderWithZips")) 
    { 
     using (var archive = CreateZipArchive(path)) 
     { 
      foreach (var entry in archive.Entries) 
      { 
       using (var stream = entry.Open()) 
       { 
        var checksum = await CalculateChecksum(stream, entry.Length); 
        bytes.Add(checksum); 
       } 
      } 
     } 
    } 

    return bytes; 
} 

所以我會想象你有一組的比賽條件(並行)和/或亂序處理的問題。

+0

我認爲Observable.Using會以正確的順序處理流的處理,所以我不會得到ObjectDisposedExceptions。我是否錯誤地使用了它,或者它與問題的本質有着內在聯繫? (同時從ZipArchive中讀取) – SuperJMN

+1

Observable.Using將在序列終止時處理由提供的工廠創建的資源(處置/錯誤/完成)。但這一切都是學術性的,因爲你迫使Rx成爲一個沒有反應能力的問題。這是我看到的主要問題是外圍問題是通過不向2(不必要的)ToObservable()調用提供IScheduler而引入線程化問題 –

2

Rx可能不是最適合這個的。說實話,你甚至可以做到沒有異步。

Directory.EnumerateFiles("FolderWithZips") 
     .AsParallel() 
     .Select(folder => CalculateChecksum(folder)) 
     .ToList() 
+0

呃,CalculateChecksum只是一個簡化問題的例子。在我的現實生活中,這是一個異步方法,我無法修改(第三方)。它如何修改你的方法? (異步) – SuperJMN