2012-03-28 28 views
2

我正在使用Apache HTTPClient 4以默認級別訪問連接到Twitter的流式API。它工作得很好的開始,但是檢索數據幾分鐘後撈出與此錯誤:使用apache httpclient增量處理twitter的流api?

2012-03-28 16:17:00,040 DEBUG org.apache.http.impl.conn.SingleClientConnManager: Get connection for route HttpRoute[{tls}->http://myproxy:80->https://stream.twitter.com:443] 
2012-03-28 16:17:00,040 WARN com.cloudera.flume.core.connector.DirectDriver: Exception in source: TestTwitterSource 
java.lang.IllegalStateException: Invalid use of SingleClientConnManager: connection still allocated. 
    at org.apache.http.impl.conn.SingleClientConnManager.getConnection(SingleClientConnManager.java:216) 
Make sure to release the connection before allocating another one. 
    at org.apache.http.impl.conn.SingleClientConnManager$1.getConnection(SingleClientConnManager.java:190) 

我明白爲什麼我面對這個問題。我正嘗試在水槽集羣中使用這個HttpClient作爲水槽來源。代碼如下所示:

public Event next() throws IOException, InterruptedException { 

    try { 

     HttpHost target = new HttpHost("stream.twitter.com", 443, "https"); 
     new BasicHttpContext(); 
     HttpPost httpPost = new HttpPost("/1/statuses/filter.json"); 
     StringEntity postEntity = new StringEntity("track=birthday", 
       "UTF-8"); 
     postEntity.setContentType("application/x-www-form-urlencoded"); 
     httpPost.setEntity(postEntity); 
     HttpResponse response = httpClient.execute(target, httpPost, 
       new BasicHttpContext()); 
     BufferedReader reader = new BufferedReader(new InputStreamReader(
       response.getEntity().getContent())); 
     String line = null; 
     StringBuffer buffer = new StringBuffer(); 
     while ((line = reader.readLine()) != null) { 
      buffer.append(line); 
      if(buffer.length()>30000) break; 
     } 
     return new EventImpl(buffer.toString().getBytes()); 
    } catch (IOException ie) { 
     throw ie; 
    } 

} 

我試圖緩衝的響應流30,000個字符到一個StringBuffer,然後返回該所收到的數據。我顯然沒有關閉連接 - 但我不想關閉它,但我猜。 Twitter的開發者指南談到這個here它讀取:

Some HTTP client libraries only return the response body after the connection has been closed by the server. These clients will not work for accessing the Streaming API. You must use an HTTP client that will return response data incrementally. Most robust HTTP client libraries will provide this functionality. The Apache HttpClient will handle this use case, for example.

它清楚地告訴你的HttpClient將逐步返回響應數據。我已經通過了示例和教程,但是我沒有發現任何接近這樣做的事情。如果你們使用了httpclient(如果不是apache)並逐漸閱讀twitter的streaming api,請告訴我你是如何實現這一壯舉的。那些沒有,請隨時爲答案作出貢獻。 TIA。

UPDATE

我試圖這樣做:1)I移動獲得流句柄水槽源的打開方法。 2)使用一個簡單的Inpustream並將數據讀入一個字節緩衝區。因此,這裏是什麼方法體貌似現在:

 byte[] buffer = new byte[30000]; 

     while (true) { 
      int count = instream.read(buffer); 
      if (count == -1) 
       continue; 
      else 
       break; 
     } 
     return new EventImpl(buffer); 

此作品在一定程度上 - 我得到的鳴叫,他們是很好的被寫入到目標。問題出在instream.read(buffer)返回值。即使流中沒有數據,並且緩衝區具有默認的\ u000000字節和其中的30,000個字節,也會將此值寫入目標。所以目標文件看起來像這樣..「tweets..tweets..tweeets .. \ u0000 \ u0000 \ u0000 u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u d uw uw uw uw uw tweets。我知道計數不會返回-1,因爲這是一個永不結束的流,所以如何判斷緩衝區是否有來自讀命令的新內容?

+0

您是否試圖捕獲#close方法拋出的I/O異常?我相應地更新了我的答案。 – oleg 2012-03-31 09:35:02

+0

另外,\ u0000 \ u0000 ... bytes/null字節不在流中 - 當我實例化一個帶有30k個字符的緩衝區時,這些是默認字節,當流內容小於30k個字符時,剩下的字符是空字節。 – Jay 2012-04-03 08:38:39

回答

0

事實證明,這是一個水槽問題。 Flume經過優化以傳輸大小爲32kb的事件。超過32kb的任何內容,Flume都會退出。 (解決方法是調整事件大小大於32KB)。所以,我已經改變了我的代碼,至少緩存了20,000個字符。它有點作品,但它不是很好的證明。如果緩衝區長度超過32kb,這仍然可能會失敗,但是,在一小時的測試中它仍然沒有失敗 - 我相信這與Twitter不會在其公共流上發送大量數據有關。

while ((line = reader.readLine()) != null) { 
      buffer.append(line); 
      if(buffer.length()>20000) break; 
     } 
0

問題是您的代碼泄漏了連接。請確保無論您關閉內容流還是中止請求。

InputStream instream = response.getEntity().getContent(); 
    try { 
     BufferedReader reader = new BufferedReader(
       new InputStreamReader(instream)); 
     String line = null; 
     StringBuffer buffer = new StringBuffer(); 
     while ((line = reader.readLine()) != null) { 
      buffer.append(line); 
      if (buffer.length()>30000) { 
       httpPost.abort(); 
       // connection will not be re-used 
       break; 
      } 
     } 
     return new EventImpl(buffer.toString().getBytes()); 
    } finally { 
     // if request is not aborted the connection can be re-used 
     try { 
      instream.close(); 
     } catch (IOException ex) { 
      // log or ignore 
     } 
    } 
+0

nope。不工作。 Flume抱怨流已經關閉 - 甚至在開始任何處理之前都排除在外。 – Jay 2012-03-30 02:48:45

+0

#close()方法引發異常,可以安全地忽略。 – oleg 2012-03-30 06:48:52

相關問題