2012-02-16 59 views
13

我想在多線程的Lucene中構建我的索引。所以,我開始編寫代碼並編寫下面的代碼。首先我找到文件和每個文件,我創建一個線索來索引它。之後,我加入線程並優化索引。它可以工作,但我不確定...我可以大規模信任它嗎?有什麼辦法可以改善它嗎?使用lucene改進多線程索引

import java.io.File; 
import java.io.FileFilter; 
import java.io.FileReader; 
import java.io.IOException; 
import java.io.File; 
import java.io.FileReader; 
import java.io.BufferedReader; 
import org.apache.lucene.index.IndexWriter; 
import org.apache.lucene.document.Field; 
import org.apache.lucene.document.Document; 
import org.apache.lucene.store.RAMDirectory; 
import org.apache.lucene.analysis.standard.StandardAnalyzer; 
import org.apache.lucene.analysis.StopAnalyzer; 
import org.apache.lucene.index.IndexReader; 
import org.apache.lucene.store.Directory; 
import org.apache.lucene.store.FSDirectory; 
import org.apache.lucene.util.Version; 
import org.apache.lucene.index.TermFreqVector; 

public class mIndexer extends Thread { 

    private File ifile; 
    private static IndexWriter writer; 

    public mIndexer(File f) { 
    ifile = f.getAbsoluteFile(); 
    } 

    public static void main(String args[]) throws Exception { 
    System.out.println("here..."); 

    String indexDir; 
     String dataDir; 
    if (args.length != 2) { 
     dataDir = new String("/home/omid/Ranking/docs/"); 
     indexDir = new String("/home/omid/Ranking/indexes/"); 
    } 
    else { 
     dataDir = args[0]; 
     indexDir = args[1]; 
    } 

    long start = System.currentTimeMillis(); 

    Directory dir = FSDirectory.open(new File(indexDir)); 
    writer = new IndexWriter(dir, 
    new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")), 
    true, 
    IndexWriter.MaxFieldLength.UNLIMITED); 
    int numIndexed = 0; 
    try { 
     numIndexed = index(dataDir, new TextFilesFilter()); 
    } finally { 
     long end = System.currentTimeMillis(); 
     System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds"); 
     writer.optimize(); 
     System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds"); 
     writer.close(); 
    } 
    System.out.println("Enjoy your day/night"); 
    } 

    public static int index(String dataDir, FileFilter filter) throws Exception { 
    File[] dires = new File(dataDir).listFiles(); 
    for (File d: dires) { 
     if (d.isDirectory()) { 
     File[] files = new File(d.getAbsolutePath()).listFiles(); 
     for (File f: files) { 
      if (!f.isDirectory() && 
      !f.isHidden() && 
      f.exists() && 
      f.canRead() && 
      (filter == null || filter.accept(f))) { 
       Thread t = new mIndexer(f); 
       t.start(); 
       t.join(); 
      } 
     } 
     } 
    } 
    return writer.numDocs(); 
    } 

    private static class TextFilesFilter implements FileFilter { 
    public boolean accept(File path) { 
     return path.getName().toLowerCase().endsWith(".txt"); 
    } 
    } 

    protected Document getDocument() throws Exception { 
    Document doc = new Document(); 
    if (ifile.exists()) { 
     doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES)); 
     doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED)); 
     String cat = "WIR"; 
     cat = ifile.getAbsolutePath().substring(0, ifile.getAbsolutePath().length()-ifile.getName().length()-1); 
     cat = cat.substring(cat.lastIndexOf('/')+1, cat.length()); 
     //doc.add(new Field("category", cat.subSequence(0, cat.length()), Field.Store.YES)); 
     //System.out.println(cat.subSequence(0, cat.length())); 
    } 
    return doc; 
    } 

    public void run() { 
    try { 
     System.out.println("Indexing " + ifile.getAbsolutePath()); 
     Document doc = getDocument(); 
     writer.addDocument(doc); 
    } catch (Exception e) { 
     System.out.println(e.toString()); 
    } 

    } 
} 

任何hep被視爲。

回答

13

如果你想並行索引,有兩件事情可以做:

  • 並行調用addDocument,
  • 提高您的合併調度的最大線程數。

你是在正確的路徑上並行調用addDocuments,但每個文檔產生一個線程將不會擴展,因爲你需要索引的文檔數量會增加。您應該使用固定大小的ThreadPoolExecutor。由於此任務主要是CPU密集型(取決於您的分析器和您檢索數據的方式),因此將計算機的CPU數設置爲最大線程數可能是一個好的開始。

關於合併調度程序,您可以增加可用於setMaxThreadCount method of ConcurrentMergeScheduler的最大線程數。請注意,順序讀取/寫入的磁盤比隨機讀取/寫入要好得多,因此,爲合併調度程序設置的線程的最大數量太大會降低索引的速度而不是加快速度。

但在嘗試並行化索引過程之前,您應該試着找出瓶頸在哪裏。如果磁盤速度過慢,則瓶頸可能是沖洗和合並步驟,因此將調用並行化addDocument(主要在於分析文檔並在內存中緩存分析結果)不會提高索引速度在所有。

一些旁註:

  • 有一個在Lucene的開發版本的一些正在進行的工作,以提高索引並行性(沖洗部分尤其是,這blog entry解釋它是如何工作)。

  • Lucene在how to improve indexing speed上有很好的wiki頁面,您可以在其中找到其他方法來提高索引速度。

+0

我真的很感激你你對線程數量的評論是非常有用的,我之前沒有提到過...... – orezvani 2012-02-25 19:17:25

5

我認爲更現代的方法是使用ThreadPoolExecutor並提交Runnable這是做你的索引。您可以等待所有線程使用.awaitTermination或CountdownLatch終止。

我不喜歡讓你的主類擴展線程,只是創建一個可運行的內部類,在構造函數中使用它的傾向。這使得您的代碼更具可讀性,因爲線程正在執行的工作與您的應用程序設置代碼明顯分離。

關於樣式的一些注意事項,我不是讓你的主類拋出異常的大愛好者,這通常意味着你沒有清楚地瞭解你使用的不同的檢查異常情況。 。除非你有一個非常具體的原因,否則通常這是不對的。

+0

預先感謝您。其實我實現了Runnable這是一個不錯的主意,並使用ThreadPoolExecutor解決了jpountz提到的程序中的一個真正的bug。 – orezvani 2012-02-25 19:16:10

+0

「awaitTermination」的缺點是它不會等待所有線程完成,但會在n個時間單位後退出。 :-(一個循環是必要的, – 2012-06-14 19:04:20

+0

同意,這將證明IndexWriter並沒有正常關閉,並且writer_lock仍然存在,即使索引編輯器沒有操縱索引目錄 – JasonHuang 2014-06-30 01:42:01