2010-05-11 16 views
4

我有一個系統,可以使用大約30個線程作爲時間將一個很大的taks分解成小任務。隨着每個單獨的線程完成,它將其計算結果保存到數據庫中。我想要實現的是讓每個線程將其結果傳遞給一個新的persisance類,該類將在其自己的線程中運行時執行一種雙緩衝和數據持久性。在多線程程序中緩衝db插入

例如,在100個線程將其數據移到緩衝區後,持久類會將持久類交換緩衝區並將所有100個條目持久存儲到數據庫中。這將允許使用準備好的語句,從而減少程序和數據庫之間的I/O。

有沒有這種類型的多線程雙緩衝的圖案或很好的例子?

+0

工作線程多頻繁吐出結果? – Justin 2010-05-11 19:50:43

回答

4

我見過被稱爲異步數據庫寫入或模式背後寫這個模式。這是分佈式緩存產品(Teracotta,Coherence,GigaSpaces,...)支持的典型模式,因爲您不希望緩存更新也包括將更改寫入底層數據庫。

此模式的複雜性取決於您對數據庫更新丟失的容忍度。由於完成工作和將結果寫入數據庫之間的延遲,由於錯誤,電源故障等原因,您可能會丟失更新(您可以看到圖片)。

我建議某種隊列將完成的結果寫入數據庫,然後分批處理它們(使用您的示例)或經過一段時間後處理它們。也使用時間延遲的原因是應對不能被100整除的結果集。

如果您對彈性/耐用性沒有要求,則可以在同一過程中執行所有操作。但是,如果您無法容忍任何丟失,則可以使用持久JMS隊列替換in-vm隊列(速度更慢但更安全)。

+0

+1用於標記可能存在的問題。 – 2010-05-11 16:16:00

+0

它是一個隔夜批處理過程,所以如果有足夠的內存,如果進程一直等到將所有生成的數據寫入數據庫的最終結果,那就沒問題了。沒有足夠的內存等待結束,所以我打算設置它,以便在數據傳遞一定數量的線程後它會保留到數據庫。作爲steven指出的 – Winter 2010-05-11 19:51:50

1

爲了具有較低的同步開銷,使用一個線程局部(對於每個計算線程)建立結果批次。一旦達到一定數量的結果,將該批次排入阻塞隊列。使用ArrayBlockingQueue來支持你的持久化類,因爲你可能不希望你的內存使用變得無限。您可以讓多個數據庫寫入程序線程獲得一組結果並將它們保存到數據庫中。

class WriteBehindPersister { 
ThreadLocal<List<Result>> internalBuffer; 
static ArrayBlockingQueue<List<Result>> persistQueue; 
static { 
    persistQueue = new ArrayBlockingQueue(10); 
    new WriteThread().start(); 
}  

public WriteBehindPersister() { 
    internalBuffer = new ThreadLocal<List<Result>>(); 
} 

public void persist(Result r) { 
    List<Result> localResult = internalBuffer.get(); 
    localResult.add(r); 
    if (localResult.size() > max) { 
    persistQueue.put(new ArrayList(localResult)); 
    localResult.clear(); 
    } 
} 

class WriteThread extends Thread { 
    public void run() { 
    while (true) { 
    List<Result> batch = persistQueue.take(); 
    beginTransaction(); 
    for (Result r : batch) { 
    batchInsert(r); 
    } 
    endTransaction(); 
    } 
    } 
} 

} 

此外,你可以用一個執行服務(而不是單個寫線程)同時堅持多批次到數據庫,在使用一個以上的數據庫連接的權衡。如果您的驅動程序支持,請確保使用JDBC批處理API。

+0

,您需要決定如何在計算結束時刷新隊列(或者如果在很長一段時間內沒有請求)。這一切都取決於你需要如何'在線'。 – Justin 2010-05-11 17:34:41

+0

每個工作線程都有自己的WriteBehindPersister還是WriteBehindPersister是單身? – Winter 2010-05-19 13:36:45

+0

由於每個線程都有自己的ThreadLocal內部緩衝區,因此該模式作爲單例使用。如果你不想使用threadlocal的東西,你可以爲每個線程實例化一個Persister(使用它自己的緩衝區),並用靜態隊列替換爲共享隊列的注入引用。 – Justin 2010-05-19 17:01:00