Reading files with multiple threads in Java

I am reading a set of JSON files in a directory, checking the name attribute in each, and populating a HashMap<Name, List<File_path>>. Is there a way to make this faster with threads, so that the files are read in parallel rather than one at a time? And would that cause synchronization problems when populating the HashMap? My current code follows:

    import java.io.File;
    import java.io.FileReader;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    import com.google.gson.JsonArray;
    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;

    Map<String, List<String>> map = new HashMap<String, List<String>>();
    JsonParser parser = new JsonParser();
    File[] files = new File(dir).listFiles();
    for (File tfile : files) {
        Object obj = parser.parse(new FileReader(tfile.getAbsolutePath()));
        JsonObject jsonObject = (JsonObject) obj;
        JsonArray array = (JsonArray) jsonObject.get("array");
        // note: toString() keeps the JSON quotes; getAsString() would give the bare value
        String name = array.get(0).getAsJsonObject().get("name").toString();
        if (map.containsKey(name)) {
            map.get(name).add(tfile.getAbsolutePath());
        } else {
            List<String> paths = new LinkedList<String>();
            paths.add(tfile.getAbsolutePath());
            map.put(name, paths);
        }
    }
Most likely you won't get much joy out of parallelizing this code. The file system is a bottleneck, so unless the parsing takes a long time (in which case you could separate reading from parsing, as sketched below), you'll just end up with more complicated code. How slow is it, and have you profiled the application? – Kayaman
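
If parsing ever did become the bottleneck, separating reading from parsing could look roughly like this. A minimal sketch, assuming a single reader thread feeding one parser thread through a BlockingQueue; the directory name "jsondir" and the parseAndRecord method are placeholders for the code in the question:

    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class ReadThenParse {
        private static final byte[] DONE = new byte[0]; // poison pill

        public static void main(String[] args) throws Exception {
            BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(64);

            // Parser thread: consumes raw file bytes off the queue.
            Thread parser = new Thread(() -> {
                try {
                    for (byte[] raw = queue.take(); raw != DONE; raw = queue.take()) {
                        parseAndRecord(raw);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            parser.start();

            // Reader (main thread): does only the disk I/O.
            try (DirectoryStream<Path> dir = Files.newDirectoryStream(Paths.get("jsondir"))) {
                for (Path file : dir) {
                    queue.put(Files.readAllBytes(file));
                }
            }
            queue.put(DONE);
            parser.join();
        }

        private static void parseAndRecord(byte[] raw) {
            // The Gson parsing and map update from the question would go here.
        }
    }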

First step for any performance problem: profile the code. In this case the odds are it is I/O-bound (assuming the files sit on the same disk channel), and threads may not help. – mpez0

@Kayaman: The parsing part is quick, since the files are very small. No, I haven't profiled the application yet, but I figured it could get slow with a large number of files in the directory, since, as you mention, the file system is a bottleneck. – prem89

Answers

Answer (score 1)

I ran a simple experiment similar to your situation (3,000 small files read across multiple threads) on an ordinary PC with Windows 7 and Java 1.7.

[Chart: file reads with buffering and multithreading, comparison of timings]

As you can see, performance improved considerably (by up to 69%) with just 5 threads.

I also included the buffer size as a parameter in the comparison, but as you can see its impact is not significant (which is down to my files being only 1 to 8 KB each).

One last thing that comes to mind for improving the program: pre-size the HashMap so that its final contents fill only about 70% of its capacity, which keeps it below the default load factor and avoids rehashing.
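
For example, a minimal sketch of that pre-sizing (the expectedFiles count of 3000 is hypothetical; 0.75 is HashMap's default load factor):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    int expectedFiles = 3000; // hypothetical: however many JSON files you expect
    // ceil(expected / loadFactor) as the initial capacity means the map
    // reaches its final size without ever rehashing
    Map<String, List<String>> map =
            new HashMap<String, List<String>>((int) Math.ceil(expectedFiles / 0.75));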

EDIT

Here is my code:

package demo; 

import java.io.BufferedInputStream; 
import java.io.File; 
import java.io.FileInputStream; 
import java.io.FileNotFoundException; 
import java.io.FilenameFilter; 
import java.io.IOException; 
import java.io.InputStream; 
import java.util.ArrayList; 
import java.util.Collection; 
import java.util.regex.Pattern; 

import com.sun.japex.TestCase; 

public abstract class AbstractDirectoryFilesReadDriver extends com.sun.japex.JapexDriverBase 
{ 
    private final int bufferSize; 

    protected AbstractDirectoryFilesReadDriver(int bufferSize) 
    { 
     super(); 
     this.bufferSize=bufferSize; 
    } 

    @Override 
    public void run(TestCase testCase) 
    { 
     int numberOfThreads=testCase.getIntParam("number_threads"); 
     final String mask=testCase.getParam("filename_mask"); 
     File dir=new File(testCase.getParam("dir")); 
     long totalSize=testCase.getLongParam("total_size"); 
     File[] list=dir.listFiles(new FilenameFilter() 
     { 
      @Override 
      public boolean accept(File dir, String name) 
      { 
       return Pattern.matches(mask, name); 
      } 
     }); 

     // Split the list of files between a number of threads. 
     Collection<Collection<File>> collections=splitDirList(list, numberOfThreads); 

     // Start the threads and let every one read its subset of files. 
     Collection<Thread> threads=new ArrayList<Thread>(numberOfThreads); 
     Collection<MyRunnable> runnables=new ArrayList<MyRunnable>(numberOfThreads); 
     for (Collection<File> collection : collections) 
     { 
      MyRunnable runnable=new MyRunnable(collection); 
      runnables.add(runnable); 
      Thread thread=new Thread(runnable); 
      threads.add(thread); 
      thread.start(); 
     } 
     try 
     { 
      for (Thread thread : threads) 
      { 
       thread.join(); 
      } 

      // Check the read size: Ensure that all the files have been fully read: 
      long size=0; 
      for (MyRunnable runnable : runnables) 
      { 
       size+=runnable.getSize(); 
      } 

      System.out.println("numberOfThreads=" + numberOfThreads + ", size=" + size); 
      if (size != totalSize) 
      { 
       throw new RuntimeException("Size check failed: expected size=" + totalSize + ", read size=" + size); 
      } 
     } 
     catch (InterruptedException e) 
     { 
      throw new Error(e); 
     } 
    } 

    private Collection<Collection<File>> splitDirList(File[] list, int numberOfParts) 
    { 
     int n=0; 
     Collection<Collection<File>> collection=new ArrayList<Collection<File>>(numberOfParts); 
     int load=(int)Math.ceil(list.length/(double)numberOfParts); 
     for (int i=0; i < numberOfParts; i++) 
     { 
      Collection<File> part=new ArrayList<File>(load); 
      for (int j=0; j < load && n < list.length; j++) 
      { 
       part.add(list[n++]); 
      } 
      collection.add(part); 
     } 
     return collection; 
    } 

    private long readFiles(Collection<File> files) 
     throws FileNotFoundException, 
     IOException 
    { 
     long size=0; 
     for (File file : files) 
     { 
      size+=readInputStream(createInputStream(file)); 
     } 
     return size; 
    } 

    private InputStream createInputStream(File file) 
     throws FileNotFoundException 
    { 
     InputStream input=new FileInputStream(file); 
     if (this.bufferSize > 0) 
     { 
      input=new BufferedInputStream(input, this.bufferSize); 
     } 
     return input; 
    } 

    /**
     * Reads an InputStream fully, counting the bytes.
     *
     * @param input InputStream.
     * @return Number of bytes read.
     * @exception java.io.IOException If an error occurred while reading.
     */
    private static long readInputStream(java.io.InputStream input)
        throws java.io.IOException
    {
        long size=0;
        byte[] buffer=new byte[4096];
        try
        {
            int n;
            do
            {
                n=input.read(buffer);
                if (n > 0)
                {
                    size+=n;
                }
            }
            while (n >= 0);
        }
        finally
        {
            // close each stream; with thousands of files, leaked handles add up
            input.close();
        }
        return size;
    }

    private class MyRunnable implements Runnable 
    { 
     private final Collection<File> files; 

     public MyRunnable(Collection<File> files) 
     { 
      super(); 
      this.files=files; 
     } 

     private long size; 

     public long getSize() 
     { 
      return this.size; 
     } 

     @Override 
     public void run() 
     { 
      try 
      { 
       this.size=readFiles(this.files); 
      } 
      catch (IOException e) 
      { 
       throw new Error(e); 
      } 
     } 
    } 
} 
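
Each of the three concrete drivers below goes in its own source file (Java allows only one public top-level class per file):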

public class UnbufferedDirectoryFilesReadDriver extends AbstractDirectoryFilesReadDriver 
{ 
    public UnbufferedDirectoryFilesReadDriver() 
    { 
     super(0); 
    } 
} 

public class Buffered4096DirectoryFilesReadDriver extends AbstractDirectoryFilesReadDriver 
{ 
    public Buffered4096DirectoryFilesReadDriver() 
    { 
     super(4096); 
    } 
} 

public class Buffered8192DirectoryFilesReadDriver extends AbstractDirectoryFilesReadDriver 
{ 
    public Buffered8192DirectoryFilesReadDriver() 
    { 
     super(8192); 
    } 
} 

The Japex configuration file:

<testSuite name="DirectoryFilesReadDriver" xmlns="http://www.sun.com/japex/testSuite"> 
    <param name="japex.classPath" value="target/test-classes" /> 
    <param name="japex.chartType" value="barchart"/> 
    <param name="japex.warmupIterations" value="1"/> 
    <param name="japex.runIterations" value="1"/> 
    <param name="japex.runsPerDriver" value="1"/> 
    <param name="number_threads" value="1"/> 
    <param name="japex.resultUnit" value="ms"/> 

    <param name="dir" value="c:\myfiles" /> 
    <param name="filename_mask" value=".*.txt" /> 
    <!-- You must specify here the total expected size of all the files --> 
    <param name="total_size" value="..." /> 

    <driver name="demo.UnbufferedDirectoryFilesReadDriver"> 
     <param name="japex.DriverClass" value="demo.UnbufferedDirectoryFilesReadDriver" /> 
    </driver> 
    <driver name="demo.Buffered4096DirectoryFilesReadDriver"> 
     <param name="japex.DriverClass" value="demo.Buffered4096DirectoryFilesReadDriver" /> 
    </driver> 
    <driver name="demo.Buffered8192DirectoryFilesReadDriver"> 
     <param name="japex.DriverClass" value="demo.Buffered8192DirectoryFilesReadDriver" /> 
    </driver> 

    <testCase name="threads-01"> 
     <param name="number_threads" value="1" /> 
    </testCase> 
    <testCase name="threads-02"> 
     <param name="number_threads" value="2" /> 
    </testCase> 
    <testCase name="threads-05"> 
     <param name="number_threads" value="5" /> 
    </testCase> 
    <testCase name="threads-10"> 
     <param name="number_threads" value="10" /> 
    </testCase> 
    <testCase name="threads-20"> 
     <param name="number_threads" value="20" /> 
    </testCase> 
</testSuite> 

...and the POM:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>dev</groupId> 
    <artifactId>demo-readfiles-multithread</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
    <name>demo-readfiles-multithread</name> 
    <dependencies> 
     <dependency> 
      <groupId>junit</groupId> 
      <artifactId>junit</artifactId> 
      <version>4.8.2</version> 
     </dependency> 
     <dependency> 
      <groupId>com.sun.japex</groupId> 
      <artifactId>japex</artifactId> 
      <version>1.2.3</version> 
      <scope>test</scope> 
     </dependency> 
    </dependencies> 
    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    </properties> 
    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>2.3.2</version> 
       <configuration> 
        <compilerVersion>1.7</compilerVersion> 
        <source>1.7</source> 
        <target>1.7</target> 
       </configuration> 
      </plugin> 
      <plugin> 
       <groupId>com.sun.japex</groupId> 
       <artifactId>japex-maven-plugin</artifactId> 
       <version>1.2.3</version> 
       <executions> 
        <execution> 
         <id>japex</id> 
         <goals> 
          <goal>japex</goal> 
         </goals> 
        </execution> 
       </executions> 
       <configuration> 
        <reportDirectory>${project.build.directory}/japex-reports</reportDirectory> 
        <html>true</html> 
        <japexConfigFiles> 
         <japexConfigFile>${basedir}/scripts/DirectoryFilesReadDriver.japex.xml</japexConfigFile> 
        </japexConfigFiles> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 
</project> 
So threads definitely improved performance! Would you mind sharing the experiment code you wrote with the threads? – prem89

Of course. I've added it to my answer. –

Answer (score 1)
  • Most likely, more threads will not help you here: reading the files from disk (even from an SSD) is much slower than extracting the name.
  • It also requires you to synchronize the writes to the map, or to use Collections.synchronizedMap() (see the sketch below).
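
To illustrate the second point, a minimal sketch of the synchronizedMap option (the reader threads themselves are assumed to exist as in the question):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    // Every call on the wrapper is synchronized on the wrapper itself, so
    // concurrent reader threads cannot corrupt the underlying HashMap.
    Map<String, List<String>> map =
            Collections.synchronizedMap(new HashMap<String, List<String>>());

    // The List values still need their own protection if two threads can
    // append paths for the same name at once:
    // map.put(name, Collections.synchronizedList(new LinkedList<String>()));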
Answer (score 1)

This is exactly the kind of task for ExecutorService and the concurrency framework in general. Use a ConcurrentHashMap for better concurrency, and Java 8's new collection methods to avoid the unnecessary ifs. Something like this should maximize performance:

    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import com.google.gson.JsonArray;
    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;

    Map<String, List<String>> map = new ConcurrentHashMap<String, List<String>>();
    JsonParser parser = new JsonParser();
    ExecutorService executorService = Executors.newCachedThreadPool();
    List<Future<?>> futures = new LinkedList<>();
    File[] files = new File(dir).listFiles();
    for (File tfile : files) {
        // submit() (rather than execute()) returns a Future we can collect
        futures.add(executorService.submit(() -> {
            try {
                Object obj = parser.parse(new FileReader(tfile.getAbsolutePath()));
                JsonObject jsonObject = (JsonObject) obj;
                JsonArray array = (JsonArray) jsonObject.get("array");
                String name = array.get(0).getAsJsonObject().get("name").toString();
                // computeIfAbsent creates the list atomically on first use; the
                // list is synchronized because several threads may append paths
                // for the same name concurrently
                map.computeIfAbsent(name,
                        key -> Collections.synchronizedList(new LinkedList<String>()))
                   .add(tfile.getAbsolutePath());
            } catch (FileNotFoundException e) {
                throw new RuntimeException(e);
            }
        }));
    }
    for (Future<?> future : futures) {
        try {
            future.get(); // propagate any failure from the worker threads
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
    executorService.shutdown();
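
Note that with thousands of files, Executors.newCachedThreadPool() can spawn a new thread for every pending task; a fixed-size pool such as Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()) keeps the thread count, and the contention on the disk, bounded.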