2012-03-13 67 views
3

我有一個Java程序需要將大量較大的行插入到SQL Server數據庫中。行數是800k,每個的大小大約是200字節。使用Java線程並行插入到數據庫中

當前它們被分成50個批次,然後每個批次都使用一個語句插入。 (我們已經通過JTDS日誌記錄確認每個批次都使用一次sp_exec調用。)調整批量大小在25到250之間看起來沒有任何顯着影響,50幾乎是最佳的。

我已經嘗試將批次分成(比如說)5個組,並使用線程並行處理每個組。這顯着更快 - 比5個線程快兩倍以上。

我的問題是關於使線程使用健壯。特別是,如果任何批次失敗,將會拋出異常。我想讓這個異常被捕獲並傳遞給調用者,並且我希望在我們傳遞它之前100%確定其他線程已經完成(中止或完成)。因爲在稍後在程序中恢復異常時,我們不希望意外的行繼續到達表中。

這裏是我做了什麼:

/** Method to insert a single batch. */ 
private void insertBatchPostings(Collection<Posting> postings) throws PostingUpdateException 
{ 
    // insert the batch using a single INSERT invokation 
    // throw a PostingUpdateException if anything goes wrong 
} 

private static final int insertionThreads = 5; 

/** Method to insert a collection of batches in parallel, using the above. */ 
protected void insertBatchPostingsThreaded(Collection<Collection<Posting>> batches) throws PostingUpdateException 
{ 
    ExecutorService pool = Executors.newFixedThreadPool(insertionThreads); 
    Collection<Future> futures = new ArrayList<Future>(batches.size()); 

    for (final Collection<Posting> batch : batches) { 
     Callable c = new Callable() { 
      public Object call() throws PostingUpdateException { 
       insertBatchPostings(batch); 
       return null; 
      }    
     }; 
     /* So we submit each batch to the pool, and keep a note of its Future so we can check it later. */ 
     futures.add(pool.submit(c)); 
    } 

    /* Pool is running, indicate that no further work will be submitted to it. */ 
    pool.shutdown(); 

    /* Check all the futures for problems. */ 
    for (Future f : futures) { 
     try { 
      f.get(); 
     } catch (InterruptedException ex) { 
      throw new PostingUpdateException("Interrupted while processing insert results: " + ex.getMessage(), ex); 
     } catch (ExecutionException ex) { 
      pool.shutdownNow(); 
      throw (PostingUpdateException) ex.getCause(); 
     } 
    } 
} 

通過這個返回我要保證所有的線程都處於休眠狀態的時間。

問題

(我想澄清我在問什麼。)

  1. 是上面的代碼完全健壯,在沒有線程插入將繼續insertBatchPostingsThreaded後操作回報?
  2. 是否有更好更簡單的方法使用Java併發功能來實現這一點?我的代碼看起來過於複雜(讓我懷疑錯過邊緣案例)。
  3. 一旦任何一個線程出現故障,最好的方法是讓它失效嗎?

我不是一個自然的Java程序員,所以我希望最終得到的東西不會宣傳這個事實。 :)

+0

Augh。你可以使用泛型來使你的代碼更具可讀性嗎? – 2012-03-13 00:03:12

+0

@Edmund禁用批量插入表索引可提高速度。你必須觸發索引重新計算。 – hidralisk 2012-03-13 00:24:49

+0

@Louis - 我從工作計劃中逐字拷貝它以確保它是準確的;這是一個傳統的應用程序。但我試圖將它翻譯成現代Java。我認爲for循環最讓你感到憤怒,但我也翻譯了集合類型。 – Edmund 2012-03-13 00:29:22

回答

1

番石榴的Futures.successfulAsList採取期貨列表作爲輸入,並返回一個未來「其價值是包含所有成功輸入期貨價值的列表」。您可以在生成的Future上調用get(),然後遍歷您的原始未來列表以檢查是否有任何故障。

+0

我的其他要求(我已經添加到問題中)是,如果有任何失敗,池中的其餘任務可以被取消或中止,以便快速失敗。番石榴有什麼可以幫助嗎? – Edmund 2012-03-13 00:39:00

+0

啊。我沒有看到你想要所有其他線程都失敗。然而,使用'ListenableFuture'來添加回調函數來取消所有其他期貨並不困難...... – 2012-03-13 01:12:29

+0

ListenableFuture也會調用監聽器,監聽器又會調用池中的shutdownNow?看看Java源代碼,似乎shutdownNow努力取消所有排隊的任務,所以它可能已經在我的代碼中做到了,但是如果我可以使用Guava中的某些東西來使代碼更清潔,那麼我就是爲了這一點。 – Edmund 2012-03-13 22:20:39