2013-06-27 34 views
4

我目前有一個程序在單線程模式下讀取文件(非常大),並創建搜索索引,但在單線程環境下索引太長。如何在多線程模式下讀取文件?

現在我試圖讓它在多線程模式下工作,但不確定實現這一點的最佳方式。

我的主程序創建一個緩衝讀取器,並將實例傳遞給線程,線程使用緩衝讀取器實例讀取文件。

我不認爲這是按預期工作,而是每個線程一次又一次地讀同一行。

有沒有辦法讓線程只讀取不被其他線程讀取的行?我需要分割文件嗎?有沒有一種方法來實現這一點,而不分裂文件?

樣品主程序:

import java.io.BufferedReader; 
import java.io.FileNotFoundException; 
import java.io.FileReader; 
import java.util.ArrayList; 

public class TestMTFile { 
    public static void main(String args[]) { 
     BufferedReader reader = null; 
     ArrayList<Thread> threads = new ArrayList<Thread>(); 
     try { 
      reader = new BufferedReader(new FileReader(
        "test.tsv")); 
     } catch (FileNotFoundException e1) { 
      e1.printStackTrace(); 
     } 
     for (int i = 0; i <= 10; i++) { 
      Runnable task = new ReadFileMT(reader); 
      Thread worker = new Thread(task); 
      // We can set the name of the thread 
      worker.setName(String.valueOf(i)); 
      // Start the thread, never call method run() direct 
      worker.start(); 
      // Remember the thread for later usage 
      threads.add(worker); 
     } 

     int running = 0; 
     int runner1 = 0; 
     int runner2 = 0; 
     do { 
      running = 0; 
      for (Thread thread : threads) { 
       if (thread.isAlive()) { 
        runner1 = running++; 
       } 
      } 
      if (runner2 != runner1) { 
       runner2 = runner1; 
       System.out.println("We have " + runner2 + " running threads. "); 

      } 
     } while (running > 0); 

     if (running == 0) { 
      System.out.println("Ended"); 
     } 
    } 
} 

主題:

import java.io.BufferedReader; 
import java.io.IOException; 

public class ReadFileMT implements Runnable { 
    BufferedReader bReader = null; 

    ReadFileMT(BufferedReader reader) { 
     this.bReader = reader; 
    } 

    public synchronized void run() { 
     String line; 
     try { 
      while ((line = bReader.readLine()) != null) { 

       try { 
        System.out.println(line); 
       } catch (Exception e) { 

       } 
      } 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 
+4

你專注於錯誤的部分。這是你需要多線程的索引,而不是閱讀。 –

+2

我非常懷疑瓶頸是_read_文件。這很可能是索引。 – jtahlborn

回答

7

您的瓶頸很可能是索引,不是的文件讀取。假設您的索引系統支持多個線程,您可能需要一個生產者/消費者設置,其中一個線程讀取文件並將每行壓入BlockingQueue(生產者),以及多個線程從BlockingQueue中提取線並將它們推入索引(消費者)。

6

this thread - 如果你的文件都在同一磁盤上,那麼你不能做的比讀他們更好與單線程,雖然它可能會進程與多個線程的文件,一旦你已經讀入他們mai記憶。

0

首先,我同意@ Zim-Zam它是文件IO,而不是索引,可能是速率確定步驟。 (所以我不同意@jtahlborn)。取決於索引的複雜程度。

其次,在您的代碼中,每個線程都有自己的獨立BufferedReader。因此他們都會讀取整個文件。一種可能的解決方法是使用他們共享的單個BufferedReader。然後你需要同步BufferedReader.readLine()方法(我認爲),因爲javadoc對BufferedReader是否線程安全無語。而且,由於我認爲IO是僵化的,所以這將成爲瓶頸,我懷疑多線程是否會讓你受益匪淺。但試試看,我偶爾錯了。 :-)

p.s.我同意@jtahlmorn生產者/消費者模式比我的BufferedReader想法更好,但這對你來說會更有用。

1

如果您可以使用Java 8,則可以使用Streams API快速輕鬆地完成此操作。文件讀入到一個MappedByteBuffer,它可以打開一個文件最大爲2GB非常quicky,然後讀取的線條勾勒出緩衝液(你需要確保你的JVM有足夠的額外內存來保存文件):

package com.objective.stream; 

import java.io.BufferedReader; 
import java.io.ByteArrayInputStream; 
import java.io.FileInputStream; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.nio.MappedByteBuffer; 
import java.nio.channels.FileChannel; 
import java.nio.file.Path; 
import java.nio.file.Paths; 
import java.util.stream.Stream; 

public class StreamsFileProcessor { 
    private MappedByteBuffer buffer; 

    public static void main(String[] args){ 
     if (args[0] != null){ 
      Path myFile = Paths.get(args[0]); 
      StreamsFileProcessor proc = new StreamsFileProcessor(); 
      try { 
       proc.process(myFile); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    public void process(Path file) throws IOException { 
     readFileIntoBuffer(file); 
     getBufferStream().parallel() 
      .forEach(this::doIndex); 
    } 

    private Stream<String> getBufferStream() throws IOException { 
     try (BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer.array())))){ 
      return reader.lines(); 
     } 
    } 

    private void readFileIntoBuffer(Path file) throws IOException{ 
     try(FileInputStream fis = new FileInputStream(file.toFile())){ 
      FileChannel channel = fis.getChannel(); 
      buffer = channel.map(FileChannel.MapMode.PRIVATE, 0, channel.size()); 
     } 
    } 

    private void doIndex(String s){ 
     // Do whatever I need to do to index the line here 
    } 
}