2016-08-12 61 views
1

我在mongoDB中有4000萬個數據。我正在從收集中並行讀取這些數據,並將其處理並轉儲到另一個收集中。Java多線程性能隨線程池大小的增加而變差

作業初始化的示例代碼。

ExecutorService executor = Executors.newFixedThreadPool(10); 
int count = total_number_of_records in reading collection 
int pageSize = 5000; 
int counter = (int) ((count%pageSize==0)?(count/pageSize):(count/pageSize+1)); 
for (int i = 1; i <= counter; i++) { 
     Runnable worker = new FinalParallelDataProcessingStrategyOperator(mongoDatabase,vendor,version,importDate,vendorId,i,securitiesId); 
     executor.execute(worker); 
    } 

每個線程做

public void run() { 
    try { 
     List<SecurityTemp> temps = loadDataInBatch(); 
     populateToNewCollection(temps); 
     populateToAnotherCollection(temps); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 

加載數據是通過使用下面的查詢

mongoDB.getCollection("reading_collection").find(whereClause). 
      .skip(pagesize*(n-1)).limit(pagesize).batchSize(1000).iterator(); 

pagination code reference

機配置分頁以下的事情: CPU分別用1個核2

並行實現的性能與順序相當。 統計數據的子集(319568個記錄)

No. of Threads Execution Time(minutes) 

    1     16 
    3     15 
    8     17 
    10    17 
    15    16 
    20    12 
    50    30 

如何改善這個應用程序的性能?

+0

提高線程的數目不會自動提高性能和線程太多可能會導致開銷的問題。很難說,爲什麼你有1個相同的性能 - 10個線程,也許你的瓶頸是分貝?是一個本地數據庫? – JohnnyAW

+1

也可JVM配置,如果在只有一個核心接入那麼你不會看到任何太大起色一個孤立的環境中運行。 – Gimby

+0

是的,它是本地數據庫 – omkar1707

回答

4

由於您正在從單一來源讀取輸入數據,該部分很可能是IO界限(從應用程序的角度來看),因此並行執行並不會爲您帶來太多收益。相反,我認爲在多個線程上平行執行類似的查詢(僅使用不同的分頁)會對性能產生負面影響:在數據庫上必須多次執行相同的工作,而並行查詢可能會相互影響辦法。

另一個問題是,與讀取輸入相比,處理部分是否佔用大量時間。如果它不使用並行處理,將無助於加快速度。如果是這樣,我建議如下:

  • 使用單個查詢從數據庫中獲取數據
  • 有多個工作線程,從結果集或中間隊列中獲取數據項和處理它們。沒有必要有固定的批次,每個工人在完成前一個工件的處理後就抓取下一個可用物品。

至於線程數:最小處理時間的「最佳點」取決於處理的種類。對於沒有太多IO處理的CPU密集型任務,它很可能會在可用核心數量附近 - 在您的案例中。2.

1

多線程並不會隨着線程數量的增加而提高性能。

IO綁定的應用程序不會從多線程獲得太多收益。

這取決於很多因素。請參閱本有關SE問題:

Is multithreading faster than single thread?

即使少IO束縛,CPU密集型應用程序,不配置的線程數量龐大,以提高性能。

你可以改變你的代碼爲:

ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()); 

或(ForkJoinPool如下[從JDK工程1.8版本上病房)

ExecutorService executor = Executors.newWorkStealingPool() 

Executors API:

public static ExecutorService newWorkStealingPool() 

創建使用人一個工作竊取線程池L有效處理器作爲其目標並行LEVE