2016-09-16 35 views
2

我對併發問題很陌生,因此對我毫無所知。我想創建一個使用Scala Map作爲後備存儲的線程安全容器。我寧願只公開它的方法的一個子集,而不是將用戶暴露給底層的Map。包裝一個不可變的集合並在Scala中維護線程安全?

例子可能看起來像下面...

class MyContainer[A] { 

    def add(thing: A): Unit = { 
    backingStore = backingStore + (thing.uuid -> thing) 
    } 

    def filter(p: A => Boolean): Option[Iterable[A]] = { 
    val filteredThings = backingStore.values.filter(p) 
    if (filteredThings.isEmpty) None else Some(filteredThings) 
    } 

    def remove(uuid: UUID): Option[A] = backingStore.get(uuid) match { 
    case optionalThing @ Some(thing) => 
     backingStore = backingStore - uuid; optionalThing 
    case None => None 
    } 

    @ volatile private[this] var backingStore = immutable.HashMap.empty[UUID, A] 

} 

...我懷疑,即使底層的後備存儲是不變的,它的參考是volatile,容器不是線程安全的。

假設我有兩個單獨的線程可以訪問上述容器的實例。線程1過濾底層集合並獲得一些結果;同時線程2刪除一個項目。線程1的結果可能包含對線程2刪除的項目的引用?可能還有其他問題。

我正確的說,上面的實現不是線程安全的嗎?使用Scala使上述線程安全的最習慣方法是什麼?

編輯:如果可能,我寧願避免阻塞和同步。如果必須使用阻塞/同步,那麼是否需要volatile參考?那不可改變的收藏的重點是什麼?難道我不能使用可變集合嗎?

回答

2

您正在使用寫入時複製的方法,因此您的併發讀取和寫入問題是它們不是嚴格排序的,但這不是一個真正的問題:它只是一個時間問題,因爲如果A正在寫入而B在閱讀時,A不能保證A是否會看到B的編輯。

你真正的問題是當你有C和D同時寫入時:然後他們都可以讀取相同的起始地圖,更新自己的副本,然後只寫自己的編輯。誰先寫誰會改變他們的改變。考慮一個包含(A,B)的起始映射,並且線程C和D分別添加條目'C'和'D',而線程E和F讀取映射;所有這一切都在同時發生。一個可能的reuslt是:

Ç讀取地圖(A,B)
d讀取地圖(A,B)
Ç寫入地圖(A,B,C)
È讀取映射圖(A,B,C )
d寫入映射圖(A,B,d)
˚F讀取映射圖(A,B,d)

的 'C' 條目trnasiently出現,然後永遠失去。

可靠地對寫入進行排序的唯一方法是確保從不同時輸入寫入。或者使用同步locke nforce單個條目寫入塊或通過使用單個Akka actor來執行更新來確保其被序列化。

如果您關心讀取與寫入的排序,您還需要同步讀取,但如果您有多個線程訪問它,那麼這不太可能是真正的問題。

+0

@ladams如果我理解正確的話,我需要sychronize添加/刪除(即寫)方法;但對於像過濾器這樣的讀取方法,我不需要同步(除非我真的關心讀取和寫入的順序)。 – davidrpugh

+0

@ladams如果我只同步'add'和'remove'方法,那麼我仍然需要將對後備存儲的引用註釋爲@volatile是否正確? – davidrpugh

+0

是的,您需要同步添加/刪除方法。 將引用標記爲volatile有助於更新可見性的及時性:如果您不這樣做,讀者線程可能會在其他線程更新後保留對舊版本後備存儲的引用,時間不確定它可能會多次。 爲確保其他線程儘快可見,您需要做兩件事:讓寫入器從處理器緩存寫入內存(發生在同步塊的退出時),讓讀者將其讀入緩存中,軍隊。 – Iadams

0

我正確的說,上面的實現不是線程安全的嗎?

是的。它不是線程安全的。但它確實有正確的內存visibility semantics

爲了簡單起見,你可以把它線程安全的:

class MyContainer[A <: {def uuid: UUID}] { 

    def add(thing: A): Unit = this.synchronized{ 
    backingStore = backingStore + (thing.uuid -> thing) 
    } 

    def filter(p: A => Boolean): Option[Iterable[A]] = this.synchronized{ 
    val filteredThings = backingStore.values.filter(p) 
    if (filteredThings.isEmpty) None else Some(filteredThings) 
    } 

    def remove(uuid: UUID): Option[A] = this.synchronized{ 
    backingStore.get(uuid) match { 
     case optionalThing @ Some(thing) => 
     backingStore = backingStore - uuid; optionalThing 
     case None => None 
    } 
    } 

    import scala.collection.immutable.HashMap 
    private[this] var backingStore = HashMap.empty[UUID, A] 
} 
+0

這個語法是什麼意思:'[A <:{def uuid:UUID}]'?從來沒有見過這個。 – Samar

+0

@Samar它意味着一個[結構類型](https://twitter.github.io/scala_school/advanced-types.html#structural)包含一個名爲'uuid'的方法是A.的上限。 –

+0

開明:)感謝Yuval! – Samar

相關問題