我流的流動,其目標是計算簡單「校驗」的內容在一組的.zip文件。Observable.Using和異步流越來越損壞的數據
要做到這一點,我已經設置了觀察到的是:
- 發生在一個給定的文件夾
- 讀取每個文件的內容中的所有文件(閱讀爲
ZipArchive
) - 每個條目在每個文件中,執行校驗和
爲了說明它的計算中,我已創建這個前充足:
通知AsyncContext.Run
(https://stackoverflow.com/a/9212343/1025407)的使用,使Main
方法等待GetChecksum
因爲它是一個控制檯應用程序
namespace DisposePoC
{
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Linq;
using System.Threading.Tasks;
class Program
{
private static void Main()
{
AsyncContext.Run(GetChecksums);
}
private static async Task<IList<byte>> GetChecksums()
{
var bytes = Directory.EnumerateFiles("FolderWithZips")
.ToObservable()
.SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
.SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));
return await bytes.ToList();
}
private static ZipArchive CreateZipArchive(string path)
{
return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
}
private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
{
var bytes = await GetBytesFromStream(stream, entryLength);
return bytes.Aggregate((b1, b2) => (byte) (b1^b2));
}
private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
{
byte[] bytes = new byte[entryLength];
await stream.ReadAsync(bytes, 0, (int)entryLength);
return bytes;
}
}
}
運行應用程序,我得到的各種錯誤:
'System.IO.InvalidDataException':本地文件頭損壞。 'System.NotSupportedException':Stream不支持讀取。 'System.ObjectDisposedException':無法訪問處置的對象。 'System.IO.InvalidDataException':塊長度與其補碼不匹配。
我在做什麼錯?
觀察本身是否存在問題,還是因爲ZipArchive
不是線程安全的?如果不是,我如何使代碼工作?
我會對此做一個評論,因爲我目前無法驗證代碼內容,但我懷疑問題是在第一個SelectMany中創建的ZipArchive正在被Using語句處置然後才能閱讀下一行中的入口流 - 實質上,一次性範圍確定是錯誤的。我會將第二個SelectMany的邏輯移到第一個。我還會驗證您的測試數據沒有被破壞,正如第一個例外所示。 – Andrew
我想我明白你的觀點。但是,如果範圍是錯誤的,那麼我如何修改代碼以避免處理每個ZipArchive,直到處理完所有條目?它甚至有可能嗎? – SuperJMN