2012-10-01 12 views
2

我有一種涉及多線程的棘手問題。我所做的是使用一個線程池(ExecutorService),負責打開連接並將它們放入LinkedBlockingQueue從URL存儲原始HTML內容,然後從內存中獲取InputStream(不使用連接)

到目前爲止,我已經使用:

//run method in "getter threads" 
public void run() { 

    try { 

    URL url = new URL(url_s); //url_s is given as a constructor argument 

    //if I am correct then url.openStream will wait until we have the content 
    InputStream stream = url.openStream(); 

    Request req = new Request(); //a class with two variables: 
    req.html_stream = new InputSource(stream); 
    req.source = stream; 

    //this is a class variable (LinkedBlockingQueue<Request>) 
    blocking_queue.put(req); 

    } catch (Exception ex) { 
    logger.info("Getter thread died from an exeption",ex); 
    return; 
    } 
} 

然後我有消費者線程(java.lang.Thread的),其採用這些InputSource S和InputStream S和作用:

public void run() { 
    while(running) { 
     try { 
      logger.info("waiting for data to eat"); 
      Request req = blocking_queue.take(); 
      if(req.html_stream != null) 
      eat_data(req); 
     } catch (Exception ex) { 
      logger.error(ex); 
      return; 
     } 
    } 
} 

凡eat_data電話一個採用InputSource的外部庫。庫使用單例實例來處理,所以我不能把這一步放在「getter」線程中。

當我測試這個少量數據的代碼時,它工作正常,但是當我提供了幾千個URL時,我開始有實際問題。找到確切的錯誤並不容易,但我懷疑連接在消費者線程到達之前超時,有時甚至導致死鎖。

我這樣實現它,因爲它很容易從url.openStream()轉到InputSource,但我意識到我真的必須在本地存儲數據才能工作。

如何從url.openStream()獲取某個對象,我可以將其存儲在我的LinkedBlockingQueue(內存中的所有數據)中,以便我的消費者線程有時間處理它時可以轉換爲InputSoruce?

+3

就個人而言,我不會打開連接,直到您將它們。我將URL傳遞給'Request'對象,並在需要時打開連接。這也意味着你可以控制連接的關閉(你打開它,你負責關閉連接)。恕我直言 – MadProgrammer

+0

@MadProgrammer但這會消除需要等待來自連接的答覆?這就是我做多線程的原因。使用「getter」線程以便處理器線程不需要等待IO(例如http響應)。 – user1443778

+0

不,這樣的線程的原因是減少了否則會消耗的活動連接(和資源)的數量。只有在需要時纔打開連接,完成後關閉連接。在Windows上,您遇到問題的速度很快,因爲它可以打開的活動連接的數量非常嚴格,正如您所指出的,無論如何您都會遇到超時。我個人使用'Request'來打開,讀取,存儲和關閉URL中的內容。然後,我將它放在一個處理器線程的隊列中,以便它可以自由地進行處理。 – MadProgrammer

回答

2

您可以將URL的內容複製到ByteArrayOutputStream並關閉URL流。然後將ByteArrayInputStream存儲在隊列中。

僞代碼:

InputStream in = null; 
try { 
    in = url.openStream(); 
    ByteArrayOutputStream buffer = new ByteArrayOutputStream(); 
    IOUtils.copy(in, buffer); 

    ByteArrayInputStream bin = new ByteArrayInputStream(buffer.toByteArray()); 
    queue.put(bin); 
} 

參考文獻:

  1. java.io.ByteArrayInputStream
  2. java.io.ByteArrayOutputStream
  3. org.apache.commons.io.IOUtils.IOUtils