2009-08-12 105 views
1

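Reading from an HttpResponseStream fails

I'm running into a problem where reading from an HttpResponseStream fails because the StreamReader I'm wrapping it in reads faster than the response stream receives the actual response. I'm retrieving a reasonably small file (around 60k), but the parser that turns the response into an actual object fails because it hits an unexpected character (code 65535). From experience I know this is the character you get when you read from a StreamReader and no further characters are available.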

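For the record, I know the content being returned is valid and will parse correctly, because the failure occurs at a different position in the file each time I run the code. It is the parser.Load() line in the code below that fails.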

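Is there a way to ensure I've read all the content before attempting to parse it, short of copying the response stream into a MemoryStream or a string and then processing that?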

    /// <summary> 
    /// Makes a Query where the expected Result is an RDF Graph ie. CONSTRUCT and DESCRIBE Queries 
    /// </summary> 
    /// <param name="sparqlQuery">SPARQL Query String</param> 
    /// <returns>RDF Graph</returns> 
    public Graph QueryWithResultGraph(String sparqlQuery) 
    { 
     try 
     { 
      //Build the Query URI 
      StringBuilder queryUri = new StringBuilder(); 
      queryUri.Append(this._endpoint.ToString()); 
      queryUri.Append("?query="); 
      queryUri.Append(Uri.EscapeDataString(sparqlQuery)); 

      if (!this._defaultGraphUri.Equals(String.Empty)) 
      { 
       queryUri.Append("&default-graph-uri="); 
       //EscapeDataString also escapes reserved characters such as & and # inside the parameter value 
       queryUri.Append(Uri.EscapeDataString(this._defaultGraphUri)); 
      } 

      //Make the Query via HTTP 
      HttpWebResponse httpResponse = this.DoQuery(new Uri(queryUri.ToString()),false); 

      //Set up an Empty Graph ready 
      Graph g = new Graph(); 
      g.BaseURI = this._endpoint; 

      //Parse into a Graph based on Content Type 
      String ctype = httpResponse.ContentType; 
      IRDFReader parser = MIMETypesHelper.GetParser(ctype); 
      parser.Load(g, new StreamReader(httpResponse.GetResponseStream())); 

      return g; 
     } 
     catch (UriFormatException uriEx) 
     { 
      //URI Format Invalid 
      throw new Exception("The format of the URI was invalid", uriEx); 
     } 
     catch (WebException webEx) 
     { 
      //Some sort of HTTP Error occurred 
      throw new Exception("A HTTP Error occurred", webEx); 
     } 
     catch (RDFException) 
     { 
      //Some problem with the RDF or Parsing thereof 
      throw; 
     } 
     catch (Exception) 
     { 
      //Other Exception 
      throw; 
     } 
    } 

    /// <summary> 
    /// Internal Helper Method which executes the HTTP Requests against the SPARQL Endpoint 
    /// </summary> 
    /// <param name="target">URI to make Request to</param> 
    /// <param name="sparqlOnly">Indicates if only SPARQL Result Sets should be accepted</param> 
    /// <returns>HTTP Response</returns> 
    private HttpWebResponse DoQuery(Uri target, bool sparqlOnly) 
    { 
     //Expect errors in this function to be handled by the calling function 

     //Set-up the Request 
     HttpWebRequest httpRequest; 
     HttpWebResponse httpResponse; 
     httpRequest = (HttpWebRequest)WebRequest.Create(target); 

     //Use HTTP GET/POST according to user set preference 
     if (!sparqlOnly) 
     { 
      httpRequest.Accept = MIMETypesHelper.HTTPAcceptHeader(); 
      //For the time being drop the application/json as this doesn't play nice with Virtuoso 
      httpRequest.Accept = httpRequest.Accept.Replace("," + MIMETypesHelper.JSON[0], String.Empty); 
     } 
     else 
     { 
      httpRequest.Accept = MIMETypesHelper.HTTPSPARQLAcceptHeader(); 
     } 
     httpRequest.Method = this._httpMode; 
     httpRequest.Timeout = this._timeout; 

     //HTTP Debugging 
     if (Options.HTTPDebugging) 
     { 
      Tools.HTTPDebugRequest(httpRequest); 
     } 

     httpResponse = (HttpWebResponse)httpRequest.GetResponse(); 

     //HTTP Debugging 
     if (Options.HTTPDebugging) 
     { 
      Tools.HTTPDebugResponse(httpResponse); 
     } 

     return httpResponse; 
    } 

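Edit

To clarify what I've already said: this is not a bug in the parser, it is an issue with the StreamReader reading faster than the response stream provides data. I can work around it by doing the following, but I'd like suggestions for a better or more elegant solution: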

      //Parse into a Graph based on Content Type 
      String ctype = httpResponse.ContentType; 
      IRDFReader parser = MIMETypesHelper.GetParser(ctype); 
      Stream response = httpResponse.GetResponseStream(); 
      MemoryStream temp = new MemoryStream(); 
      Tools.StreamCopy(response, temp); 
      response.Close(); 
      temp.Seek(0, SeekOrigin.Begin); 
      parser.Load(g, new StreamReader(temp)); 
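
Tools.StreamCopy is not shown in the question. A minimal sketch of what such a helper might look like, assuming it simply pumps bytes from one stream to the other until the source reports end of stream (the class name StreamCopySketch and the 4096-byte buffer are illustrative assumptions, not the actual implementation):

```csharp
using System;
using System.IO;

public static class StreamCopySketch
{
    // Hypothetical stand-in for Tools.StreamCopy: copies everything from input to output.
    public static void StreamCopy(Stream input, Stream output)
    {
        if (input == null) throw new ArgumentNullException("input");
        if (output == null) throw new ArgumentNullException("output");

        byte[] buffer = new byte[4096];
        int read;
        // Stream.Read may return fewer bytes than requested; only 0 signals end of stream
        while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
        {
            output.Write(buffer, 0, read);
        }
    }
}
```

On .NET 4.0 and later, the built-in response.CopyTo(temp) should do the same job without a custom helper.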

Edit 2

A BlockingStreamReader class, as per Eamon's suggestion:

/// <summary> 
/// A wrapper to a Stream which does all its Read() and Peek() calls using ReadBlock() to handle slow underlying streams (eg Network Streams) 
/// </summary> 
public sealed class BlockingStreamReader : StreamReader 
{ 
    private bool _peeked = false; 
    private int _peekChar = -1; 

    public BlockingStreamReader(StreamReader reader) : base(reader.BaseStream) { } 

    public BlockingStreamReader(Stream stream) : base(stream) { } 

    public override int Read() 
    { 
     if (this._peeked) 
     { 
      this._peeked = false; 
      return this._peekChar; 
     } 
     else 
     { 
      if (this.EndOfStream) return -1; 

      char[] cs = new char[1]; 
      //ReadBlock() blocks until a character arrives; 0 means the input has ended 
      if (base.ReadBlock(cs, 0, 1) == 0) return -1; 

      return cs[0]; 
     } 
    } 

    public override int Peek() 
    { 
     if (this._peeked) 
     { 
      return this._peekChar; 
     } 
     else 
     { 
      if (this.EndOfStream) return -1; 

      char[] cs = new char[1]; 
      //Only cache the character if ReadBlock() actually read one 
      if (base.ReadBlock(cs, 0, 1) == 0) return -1; 

      this._peeked = true; 
      this._peekChar = cs[0]; 
      return this._peekChar; 
     } 
    } 

    public new bool EndOfStream 
    { 
     get 
     { 
      return (base.EndOfStream && !this._peeked); 
     } 
    } 
} 

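Edit 3

Here is a much improved solution that can wrap any TextReader and provides an EndOfStream property. It uses an internal buffer which is filled by calling ReadBlock() on the wrapped TextReader. All of the reader's Read() methods are defined in terms of this buffer, and the buffer size is configurable: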

/// <summary> 
/// The BlockingTextReader is an implementation of a <see cref="TextReader">TextReader</see> designed to wrap other readers which may or may not have high latency. 
/// </summary> 
/// <remarks> 
/// <para> 
/// This is designed to avoid premature detection of end of input when the input has high latency and the consumer tries to read from the input faster than it can return data. All methods are defined by using an internal buffer which is filled using the <see cref="TextReader.ReadBlock">ReadBlock()</see> method of the underlying <see cref="TextReader">TextReader</see> 
/// </para> 
/// </remarks> 
public sealed class BlockingTextReader : TextReader 
{ 
    private char[] _buffer; 
    private int _pos = -1; 
    private int _bufferAmount = -1; 
    private bool _finished = false; 
    private TextReader _reader; 

    public const int DefaultBufferSize = 1024; 

    public BlockingTextReader(TextReader reader, int bufferSize) 
    { 
     if (reader == null) throw new ArgumentNullException("reader", "Cannot read from a null TextReader"); 
     if (bufferSize < 1) throw new ArgumentException("bufferSize must be >= 1", "bufferSize"); 
     this._reader = reader; 
     this._buffer = new char[bufferSize]; 
    } 

    public BlockingTextReader(TextReader reader) 
     : this(reader, DefaultBufferSize) { } 

    public BlockingTextReader(Stream input, int bufferSize) 
     : this(new StreamReader(input), bufferSize) { } 

    public BlockingTextReader(Stream input) 
     : this(new StreamReader(input)) { } 

    private void FillBuffer() 
    { 
     this._pos = -1; 
     if (this._finished) 
     { 
      this._bufferAmount = 0; 
     } 
     else 
     { 
      this._bufferAmount = this._reader.ReadBlock(this._buffer, 0, this._buffer.Length); 
      if (this._bufferAmount == 0 || this._bufferAmount < this._buffer.Length) this._finished = true; 
     } 
    } 

    public override int ReadBlock(char[] buffer, int index, int count) 
    { 
     if (count == 0) return 0; 
     if (buffer == null) throw new ArgumentNullException("buffer"); 
     if (index < 0) throw new ArgumentOutOfRangeException("index", "Index must be >= 0"); 
     if (count < 0) throw new ArgumentOutOfRangeException("count", "Count must be >= 0"); 
     if ((buffer.Length - index) < count) throw new ArgumentException("Buffer too small"); 

     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return 0; 
      } 
      else 
      { 
       return 0; 
      } 
     } 

     this._pos = Math.Max(0, this._pos); 
     if (count <= this._bufferAmount - this._pos) 
     { 
      //If we have enough buffered to fulfil the request just copy the relevant part across 
      Array.Copy(this._buffer, this._pos, buffer, index, count); 
      this._pos += count; 
      return count; 
     } 
     else 
     { 
      int copied = 0; 
      while (copied < count) 
      { 
       int available = this._bufferAmount - this._pos; 
       if (count < copied + available) 
       { 
        //We can finish fulfilling this request this round 
        int toCopy = Math.Min(available, count - copied); 
        Array.Copy(this._buffer, this._pos, buffer, index + copied, toCopy); 
        copied += toCopy; 
        this._pos += toCopy; 
        return copied; 
       } 
       else 
       { 
        //Copy everything we currently have available 
        Array.Copy(this._buffer, this._pos, buffer, index + copied, available); 
        copied += available; 
        this._pos = this._bufferAmount; 

        if (!this._finished) 
        { 
         //If we haven't reached the end of the input refill our buffer and continue 
         this.FillBuffer(); 
         if (this.EndOfStream) return copied; 
         this._pos = 0; 
        } 
        else 
        { 
         //Otherwise we have reached the end of the input so just return what we've managed to copy 
         return copied; 
        } 
       } 
      } 
      return copied; 
     } 
    } 

    public override int Read(char[] buffer, int index, int count) 
    { 
     return this.ReadBlock(buffer, index, count); 
    } 

    public override int Read() 
    { 
     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return -1; 
      } 
      else 
      { 
       return -1; 
      } 
     } 

     this._pos++; 
     return (int)this._buffer[this._pos]; 
    } 

    public override int Peek() 
    { 
     if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1) 
     { 
      if (!this._finished) 
      { 
       this.FillBuffer(); 
       if (this.EndOfStream) return -1; 
      } 
      else 
      { 
       return -1; 
      } 
     } 

     return (int)this._buffer[this._pos + 1]; 
    } 

    public bool EndOfStream 
    { 
     get 
     { 
      return this._finished && (this._pos >= this._bufferAmount - 1); 
     } 
    } 

    public override void Close() 
    { 
     this._reader.Close(); 
    } 

    protected override void Dispose(bool disposing) 
    { 
     this.Close(); 
     this._reader.Dispose(); 
     base.Dispose(disposing); 
    } 
} 
+0

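So, nine years after it was released, you happen to be the first person in the world to discover that StreamReader reads faster than the Stream it is supposed to read from? – 2009-08-12 09:33:17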

+0

No, I was just wondering whether anyone has a more elegant solution than the one above – RobV 2009-08-12 09:40:36

+0

A solution to what? StreamReader does not read faster than the Stream. – 2009-08-12 09:49:28

Answers

1

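Without knowing the specifics of the parser you're using, I can only guess at the bug, but there is a fairly easy-to-make mistake that the .NET framework I/O libraries almost encourage you to make...

Are you aware of the fact that Streams and TextReaders may read fewer bytes/characters than requested?

In particular, the documentation for TextReader.Read(char[] buffer, int index, int count) says:

Return Value

Type: System.Int32

The number of characters that have been read. The number will be less than or equal to count, depending on whether the data is available within the stream. This method returns zero if called when no more characters are left to read.

Emphasis mine.

For example, if you call reader.Read(buffer, 0, 100), you cannot assume that 100 characters have been read.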
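
A minimal sketch of that behaviour, under stated assumptions: TrickleStream and ShortReadDemo are hypothetical names invented for this illustration, and the stream deliberately hands out at most 10 bytes per call to imitate a response that is still arriving. With such a source, Read() may come back with fewer than the 100 characters requested, while ReadBlock() keeps reading until it has all 100 or the input ends:

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical test stream: hands out at most maxChunk bytes per Read call,
// mimicking a network stream that has only part of the response available.
public sealed class TrickleStream : Stream
{
    private readonly byte[] _data;
    private readonly int _maxChunk;
    private int _pos;

    public TrickleStream(byte[] data, int maxChunk)
    {
        _data = data;
        _maxChunk = maxChunk;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int n = Math.Min(Math.Min(count, _maxChunk), _data.Length - _pos);
        Array.Copy(_data, _pos, buffer, offset, n);
        _pos += n;
        return n; // 0 only once all data has been handed out
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

public static class ShortReadDemo
{
    // 500 ASCII characters delivered 10 bytes at a time
    private static TextReader SlowReader()
    {
        byte[] data = Encoding.ASCII.GetBytes(new string('x', 500));
        return new StreamReader(new TrickleStream(data, 10), Encoding.ASCII);
    }

    // A single Read() call: may return fewer than 100 although more data is coming
    public static int ReadCount()
    {
        using (TextReader reader = SlowReader())
        {
            return reader.Read(new char[100], 0, 100);
        }
    }

    // A single ReadBlock() call: keeps reading until 100 characters arrive or the input ends
    public static int ReadBlockCount()
    {
        using (TextReader reader = SlowReader())
        {
            return reader.ReadBlock(new char[100], 0, 100);
        }
    }
}
```

Comparing ShortReadDemo.ReadCount() with ShortReadDemo.ReadBlockCount() shows the difference; swapping the TrickleStream for a MemoryStream over the same bytes should make both calls return the full 100.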

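Edit: It's quite likely that the parser does assume this, and that would explain the behaviour you observed. If you cache the stream completely in a MemoryStream, there will always be enough characters to satisfy the request. If you don't, the parser will receive fewer characters than requested at unpredictable times, whenever the underlying stream is "slow".

Edit 2: You can fix your bug by replacing all instances of TextReader.Read() in the parser with TextReader.ReadBlock().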
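
What ReadBlock() adds over Read() can be approximated by a simple loop. The sketch below is only an illustration of that idea, not the framework's actual source; ReadFullySketch and ReadFully are hypothetical names:

```csharp
using System;
using System.IO;

public static class ReadFullySketch
{
    // Hypothetical helper: keeps calling Read() until count characters have been
    // read or the reader reports end of input, which is roughly what ReadBlock() does.
    public static int ReadFully(TextReader reader, char[] buffer, int index, int count)
    {
        if (reader == null) throw new ArgumentNullException("reader");
        if (buffer == null) throw new ArgumentNullException("buffer");

        int total = 0;
        while (total < count)
        {
            int n = reader.Read(buffer, index + total, count - total);
            if (n == 0) break; // Read() returns 0 only when no more input is left
            total += n;
        }
        return total;
    }
}
```

A return value smaller than count then reliably means the input has ended, rather than that the data has not arrived yet.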

+0

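I'm aware of this. I'm not sure whether it counts as a bug in StreamReader; it just seems to be how it behaves when the underlying stream may be slow. The parser is not the problem: if I use the second code snippet (added to the original question), which reads the whole stream before parsing, it parses fine – RobV 2009-08-12 09:25:13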

+0

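This _is_ with high probability a bug in the parser. By design, a stream reader returns fewer characters than requested if the underlying stream is "slow". Using a memory stream as the underlying stream causes the StreamReader to always return the full number of characters, which works around the bug in the parser. – 2009-08-13 07:54:21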

+0

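The parser uses an underlying tokenizer which reads character by character using the Read() method, so you may well be right. I'll test with ReadBlock() and accept your answer if it proves to solve the problem – RobV 2009-08-13 13:09:10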

0

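To support a blocking read scenario, rather than subclassing StreamReader you can subclass TextReader. This avoids the issues with EndOfStream, and it means you can make any reader blocking, not just StreamReaders: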

public sealed class BlockingReader : TextReader 
{ 
    bool hasPeeked; 
    int peekChar; 
    readonly TextReader reader; 

    public BlockingReader(TextReader reader) { this.reader = reader; } 

    public override int Read() 
    { 
     if (!hasPeeked) 
      return reader.Read(); 
     hasPeeked = false; 
     return peekChar; 
    } 

    public override int Peek() 
    { 
     if (!hasPeeked) 
     { 
      peekChar = reader.Read(); 
      hasPeeked = true; 
     } 
     return peekChar; 
    } 

    public override int Read(char[] buffer, int index, int count) 
    { 
     if (buffer == null) 
      throw new ArgumentNullException("buffer"); 
     if (index < 0) 
      throw new ArgumentOutOfRangeException("index"); 
     if (count < 0) 
      throw new ArgumentOutOfRangeException("count"); 
     if ((buffer.Length - index) < count) 
      throw new ArgumentException("Buffer too small"); 

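     int peekCharsRead = 0; 
     if (hasPeeked && count > 0) 
     { 
      //A peeked -1 means the end of the input has already been reached 
      if (peekChar == -1) 
       return 0; 
      buffer[index] = (char)peekChar; 
      hasPeeked = false; 
      index++; 
      count--; 
      peekCharsRead++; 
     } 

     return peekCharsRead + reader.ReadBlock(buffer, index, count); 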
    } 

    protected override void Dispose(bool disposing) 
    { 
     try 
     { 
      if (disposing) 
       reader.Dispose(); 
     } 
     finally 
     { 
      base.Dispose(disposing); 
     } 
    } 
}