2016-10-26 76 views
2

我試圖將數百萬數據行插入到數據庫中。我正在嘗試爲此使用ThreadPoolExecutor。我正在爲每個9000記錄創建一個批處理,並將批處理髮送到每個線程。在這裏,我將ThreadPool Size固定爲20.在大小增加後,它失敗了。如何檢查ThreadPoolExecutor中有多少線程可用,以及如何等待線程池有空閒線程。使用多線程將百萬數據插入到數據庫中

聽到是我的代碼,請幫助,如果我錯了。

int threadCount=10; 
     ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadCount); 
     int i=0; 

     StringBuffer sb=new StringBuffer(); 
     sb.append("BEGIN BATCH"); 
     sb.append(System.lineSeparator()); 
     int cnt =metaData.getColumnCount(); 
     while(rs.next()) 
     {  
      String query ="INSERT INTO "+table+" ("+columnslist.get(1)+")VALUES("+i; 
      for (int j=1 ; j <= cnt ; j++) 
      { 
       if(metaData.getColumnTypeName(j).contains("int") || metaData.getColumnTypeName(j).contains("number")) 
       { 
         query +=","+ rs.getInt(j); 
       } 
       else if(metaData.getColumnTypeName(j).contains("varchar") || metaData.getColumnTypeName(j).contains("date") || metaData.getColumnTypeName(j).contains("getTimestamp")) 
       { 
         query +=",'"+parseColumnData(rs.getString(j))+"'"; 
       } 
       else 
       { 
         query +=",'"+parseColumnData(rs.getString(j))+"'"; 
       } 
      } 
       query +=");"; 
       sb.append(query);sb.append(System.lineSeparator()); 
       if(i%9000==0) 
       { 
        sb.append("APPLY BATCH"); 
        System.out.println(threadPool.getActiveCount()); 

        Thread t = new Thread(new ExcecuteTask(sb.toString(),session)); 
        threadPool.execute(t);    
        sb.setLength(0); 
        sb.append("BEGIN BATCH"); 
        sb.append(System.lineSeparator()); 

       } 
       i++; 
      } 
      sb.append("APPLY BATCH"); 

      Thread t = new Thread(new ExcecuteTask(sb.toString(),session)); 
      threadPool.execute(t); 
      sb.setLength(0); 

      threadPool.shutdown(); 
      while (threadPool.getTaskCount() != threadPool.getCompletedTaskCount()) 
      { 
      } 

      System.out.println(table+" Loaded sucessfully"); 





public class ExcecuteTask implements Runnable 
{ 
     private String sb; 
     private Session session; 

     public ExcecuteTask(String s,Session session) 
     { 
      sb = s; 
      this.session=session; 
     } 
     public void run() 
     { 
      session.executeAsync(sb.toString()); 
     } 
} 
+0

您正在使用'FixedThreadPool',但在我看來您正在尋找一個具有可擴展線程數的'CachedThreadPool'。 – Pievis

+0

你做錯了。查看'Connection.prepareStatement'獲取編譯語句(作爲'PreparedStatement'實例)。使用'PreparedStatement.setObject'爲要插入的行設置值,然後用'Statement.addBatch'將該行添加到批處理中。使用'Statement.executeBatch'來執行批處理(即在你的案例中插入行)。使用'Statement.clearBatch'進行另一輪插入。爲SQL語句連接字符串是許多原因的禁止(SQL注入是其中之一)。使用'+ ='連接字符串是字符串連接的不可否認的... –

+0

在此處閱讀有關SQL注入防護的更多信息:[SQL注入預防備忘](https://www.owasp.org/index.php/SQL_Injection_Prevention_Cheat_Sheet ) –

回答

1

您可以通過調用它的getActiveCount方法發現在ThreadPoolExecutor活動線程的大致數量。但是,你不應該需要。

Java documentation for Executors.newFixedThreadPool

創建一個可重用不受限制的隊列操作線程的固定數目的線程池。在任何時候,最多nThreads線程都將被激活處理任務。如果在所有線程處於活動狀態時提交其他任務,則它們將在隊列中等待,直到線程可用。如果任何線程在關閉之前的執行期間由於失敗而終止,則如果需要執行後續任務,則將取代它。池中的線程將一直存在,直到它被明確關閉。

因此,您應該能夠不斷向線程池提交任務,並在線程可用時接收並運行它們。

我還注意到,在將它們提交給線程池之前,您正在將您的任務包裝在Thread對象中,而這不是必需的。

相關問題