2014-03-28 95 views
0

我正在寫一個程序,需要從一個非常大的文件(400K +行)中讀取行,並將每行中的數據發送到Web服務。我決定嘗試線程化,並且看到我沒有想到的一些行爲,看起來像我的BufferedReader開始重用它在給它調用readline()時已經給出的行。從同一文件/奇怪行爲讀取多個線程

我的程序由兩個類組成。一個「Main」類,它啓動線程並保存對BufferedReader的靜態引用,並具有靜態同步的「readNextLine()」方法,線程可以使用該方法基本上調用BufferedReder上的readLine()。和「Runnable」類調用readNextLine()並使用來自每個readNextLine()調用的數據進行webservice調用。我創建了BufferedReader和readNextLine()靜態,因爲除了將主類的實例傳遞到線程之外,我只能想到線程共享讀者的唯一方法,我不確定哪個更好。

大約5分鐘後,我開始在我的web服務中看到錯誤,表示它正在處理它已經處理過的一行。我能夠驗證線路確實是多次分開發送的。

有沒有人有任何想法,爲什麼BufferedReader似乎給線程它已經讀取的線?我的印象是readline()是連續的,我所需要做的就是確保對readline()的調用是同步的。

我會在下面展示一些主類代碼。 runnable本質上是一個while循環,它調用readNextLine()並處理每一行,直到沒有剩下的行。

主要類:

//showing reader and thread creation 
inputStream = sftp.get(path to file); 
reader = new BufferedReader(new InputStreamReader(inputStream)); 

ExecutorService executor = Executors.newFixedThreadPool(threads); 
     Collection<Future> futures = new ArrayList<Future>(); 
     for(int i=0;i<threads;i++){ 
     MyRunnable runnable = new MyRunnable(i); 
      futures.add(executor.submit(runnable)); 
     } 
     LOGGER.debug("futures.get()"); 
     for(Future f:futures){ 
      f.get(); //use to wait until all threads are done 
     } 

public synchronized static String readNextLine(){ 
    String results = null; 
    try{ 
     if(reader!=null){ 
     results = reader.readLine(); 
     } 
    }catch(Exception e){ 
     LOGGER.error("Error reading from file"); 
    } 

    return results; 
} 
+1

我認爲你需要使用'RandomAccessFile'並讓每個線程讀取從一個不同的偏移量,儘管我會使用單個線程來讀取文件的塊和每個塊讀取,拆分多個線程來聯繫您的web服務與塊的一部分。 –

+0

其實我只是偶然發現了這個,http://docs.oracle.com/javase/7/docs/api/java/nio/channels/AsynchronousFileChannel.html我認爲它可能會做你想要做的事情,如果Java 7是一個選項。 –

回答

0

我測試你說什麼,但我發現你在readNextLine()方法得到一個錯誤的邏輯,怎麼能reader.readLine()被調用的結果是null和if條件是不是null?

現在,我完成了我的演示,而且似乎效果很好,以下是演示,無需重新讀取線發生了:

static BufferedReader reader; 

public static void main(String[] args) throws FileNotFoundException, ExecutionException, InterruptedException { 
    reader = new BufferedReader(new FileReader("test.txt")); 
    ExecutorService service = Executors.newFixedThreadPool(3); 
    List<Future> results = new ArrayList<Future>(); 
    for (int i = 0; i < 3; i++) { 
     results.add(service.submit(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        String line = null; 
        while ((line = readNextLine()) != null) { 
         System.out.println(line); 
        } 
       } catch (IOException e) { 
        e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
       } 
      } 
     })); 
    } 
} 

public synchronized static String readNextLine() throws IOException { 
    return reader.readLine(); 
}