2012-10-10 68 views
2

讀書時,我是新來的在Java併發編程。的BufferedReader的ReadLine在一個線程

我需要閱讀,分析和處理一個非常快速增長的日誌文件,所以我一定是 快。 我的想法是讀取文件(逐行)並且在匹配相關的行我想 傳遞者行單獨的線程,可以上線做進一步的處理。 我在下面的示例代碼中調用了這些線程「IOThread」。

我的問題是,在IOthread.run()的BufferedReader中的readline顯然從未返回。 什麼是讀取線程內流的工作方式? 有沒有比以下更好的方法?

import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.InputStream; 
import java.io.InputStreamReader; 
import java.io.PipedInputStream; 
import java.io.PipedOutputStream; 

class IOThread extends Thread { 
    private InputStream is; 
    private int t; 

    public IOThread(InputStream is, int t) { 
     this.is = is; 
     this.t = t; 
     System.out.println("iothread<" + t + ">.init"); 
    } 

    public void run() { 
     try { 
      System.out.println("iothread<" + t + ">.run"); 
      String line; 

      BufferedReader streamReader = new BufferedReader(new InputStreamReader(is)); 
      while ((line = streamReader.readLine()) != null) { 
       System.out.println("iothread<" + t + "> got line " + line); 
      } 
      System.out.println("iothread " + t + " end run"); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 

public class Stm { 
    public Stm(String filePath) { 
     System.out.println("start"); 

     try { 
      BufferedReader reader = new BufferedReader(new FileReader(filePath)); 

      PipedOutputStream po1 = new PipedOutputStream(); 
      PipedOutputStream po2 = new PipedOutputStream(); 
      PipedInputStream pi1 = new PipedInputStream(po1); 
      PipedInputStream pi2 = new PipedInputStream(po2); 
      IOThread it1 = new IOThread(pi1,1); 
      IOThread it2 = new IOThread(pi2,2); 

      it1.start(); 
      it2.start(); 
//   it1.join(); 
//   it2.join(); 

      String line; 
      while ((line = reader.readLine()) != null) { 
       System.out.println("got line " + line); 

       if (line.contains("aaa")) { 
        System.out.println("passing to thread 1: " + line); 
        po1.write(line.getBytes()); 
       } else if (line.contains("bbb")) { 
        System.out.println("passing to thread 2: " + line); 
        po2.write(line.getBytes()); 
       } 
      } 
      reader.close(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    public static void main(String[] args) { 
     new Stm(args[0]); 
    } 

} 

一個例子的輸入文件是:

line 1 
line 2 
line 3 aaa ... 
line 4 
line 5 bbb ... 
line 6 aaa ... 
line 7 
line 8 bbb ... 
line 9 bbb ... 
line 10 

呼叫上面的代碼與輸入文件作爲參數的文件名。

+0

可能是'PipedInputStream'期待更多的字節被讀取......你能關閉()''PipedOutputStream'來測試嗎? – SJuan76

+0

哦,無論如何,我與Sanjay ...使用來自內部進程通信的輸入流和輸出流是凌亂的。將受影響的行傳遞給線程不是更好嗎? – SJuan76

+1

我在'while((line = streamReader.readLine())!= null)'line in run()方法中得到'java.io.IOException:寫入結束死亡異常。 –

回答

2

恕我直言,你已完成了倒退。爲「處理」內容創建多個線程,而不是從文件中讀取數據。當從文件讀取數據時,您無論如何都是瓶頸,因此擁有多個線程並沒有什麼區別。最簡單的解決方案是在給定線程中儘可能快地讀取線,並將線存儲在共享隊列中。然後可以通過任意數量的線程訪問該隊列以執行相關處理。

這樣,當I/O或讀取器線程忙於讀取/等待數據時,您實際上可以執行併發處理。如果可能,在閱讀器線程中將「邏輯」保持在最小值。只要閱讀這些行,讓工作線程完成真正的繁重工作(匹配模式,進一步處理等)。只需要一個線程安全的隊列,你應該是猶太教。

編輯:使用BlockingQueue的一些變體,無論是基於數組或基於鏈表。

+0

感謝您的輸入。 我的例子被簡化了。在現實生活中,線程必須做更多的事情,所以我的瓶頸不是閱讀,而是處理。 – pitseeker

+0

@pitseeker:好的,這就是我想要做的。不要創建多個IOThreads。創建一組從共享隊列讀取的工作線程。該隊列將由您的單個IOThread填充。 –

+0

謝謝桑傑 - 我會試試看。 – pitseeker

3

由於以下原因,您在iothread中的讀者始終處於第一次循環的頭部: 您從STM線程傳遞讀取行的內容,但不添加新行字符(\ n)。由於緩衝讀取器等待一個新的行字符(如.readLine()),它會一直等待。你可以修改你的代碼是這樣的:

if (line.contains("aaa")) { 
       System.out.println("passing to thread 1: " + line); 
       byte[] payload = (line+"\n").getBytes(); 
       po1.write(payload); 
      } else if (line.contains("bbb")) { 
       System.out.println("passing to thread 2: " + line); 
       byte[] payload = (line+"\n").getBytes(); 
       po2.write(payload); 
      } 

但是我不得不說,這是不是在所有的一流解決方案,你可以使用一個阻塞隊列或類似的內容提供您IOThreads東西。這樣你就可以避免把你的輸入轉換成字符串轉換爲字符串並返回字符串(不是說擺脫所有的流)。

+0

Yesss !!我正在寫同樣的答案,邁克爾是對的。 –

+1

@Michael我在'while((line = streamReader.readLine())!= null)'方法中得到'java.io.IOException:寫入end dead'異常 – 2012-10-10 11:58:52

+1

@ AK7:請參閱http:///techtavern.wordpress.com/2008/07/16/whats-this-ioexception-write-end-dead/正如我所說,我鼓勵你擺脫流,並使用阻塞隊列和一些工作人員。 – Michael

相關問題