2017-03-07 28 views
9

我想使用Rayon的par_iter()來優化我的功能。Rayon中的每個線程的初始化

的單線程的版本是這樣的:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 

    let result = txs.iter().map(|tx| { 

     tx.verify_and_store(store) 

    }).collect(); 

    ... 
} 

每個Store實例必須由一個線程使用,但只能中Store多個實例可以同時使用,這樣我就可以使通過clone -ing這個多線程store

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 

    let result = txs.par_iter().map(|tx| { 

     let mut local_store = store.clone(); 

     tx.verify_and_store(&mut local_store) 

    }).collect(); 

    ... 
} 

然而,這種克隆store迭代,這是WA太慢了。我想每個線程使用一個商店實例。

人造絲這可能嗎?還是應該採取手動線程和工作隊列?

回答

5

它可以使用線程局部變量,以確保local_store沒有在給定的線程創建不止一次。

例如,該編譯(full source):然而

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 
    use std::cell::RefCell; 
    thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None)); 

    let mut result = Vec::new(); 

    txs.par_iter().map(|tx| { 
     STORE.with(|cell| { 
      let mut local_store = cell.borrow_mut(); 
      if local_store.is_none() { 
       *local_store = Some(store.clone()); 
      } 
      tx.verify_and_store(local_store.as_mut().unwrap()) 
     }) 
    }).collect_into(&mut result); 
} 

有兩個問題與此代碼。一,如果store克隆需要做一些事情的時候par_iter()完成後,諸如沖洗自己的緩衝區,它根本不會發生 - 他們Drop纔會被調用時,人造絲的工作線程退出,甚至is not guaranteed

第二個,也是更嚴重的問題,是的store的克隆每個工人線程創建一次。如果Rayon緩存其線程池(並且我相信它),這意味着後續與verify_and_store無關的後續調用將繼續使用store的最後一個已知克隆,這可能與當前存儲無關。

這可以通過代碼有點複雜予以糾正:

  • 商店在Mutex<Option<...>>,而不是Option克隆變量,使他們可以通過調用par_iter()線程訪問。這將導致每次訪問互斥鎖,但該鎖將是無爭議的,因此便宜。

  • 周圍使用互斥的Arc爲了收集到向量中創建商店克隆的引用。此向量用於在迭代完成後將商店重置爲None以清理商店。

  • 將整個調用放在一個不相關的互斥體中,這樣兩個並行調用verify_and_store最終不會看到對方的存儲體克隆。 (如果在迭代之前創建並安裝了新的線程池,這可能是可以避免的。)希望這個序列化不會影響verify_and_store的性能,因爲每個調用都將使用整個線程池。

結果是不是很漂亮,但它編譯,僅使用安全代碼,似乎工作:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 
    use std::sync::{Arc, Mutex}; 
    type SharedStore = Arc<Mutex<Option<Store>>>; 

    lazy_static! { 
     static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new()); 
     static ref NO_REENTRY: Mutex<()> = Mutex::new(()); 
    } 
    thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None))); 

    let mut result = Vec::new(); 
    let _no_reentry = NO_REENTRY.lock(); 

    txs.par_iter().map({ 
     |tx| { 
      STORE.with(|arc_mtx| { 
       let mut local_store = arc_mtx.lock().unwrap(); 
       if local_store.is_none() { 
        *local_store = Some(store.clone()); 
        STORE_CLONES.lock().unwrap().push(arc_mtx.clone()); 
       } 
       tx.verify_and_store(local_store.as_mut().unwrap()) 
      }) 
     } 
    }).collect_into(&mut result); 

    let mut store_clones = STORE_CLONES.lock().unwrap(); 
    for store in store_clones.drain(..) { 
     store.lock().unwrap().take(); 
    } 
} 
+1

豈不可惜那裏似乎不是是什麼範圍限定在這個調用(雖然這對於一個體面的案例子集顯然是有用的)。 –

+0

@ChrisEmerson是的,我對這個答案感到擔憂的是,我無法想象使用安全代碼來清理創建的存儲(或者在一切完成時運行其他任意命令,比如將其刷新到磁盤)的方法。更糟糕的是,下一次調用'verify_and_store'將繼續使用** last **已知的'Store'克隆,這可能與當前的'store'無關。 – user4815162342

+0

謝謝。這可行,但在我個人的情況下,我發現人造絲有'par_chunks'來減少克隆的數量。雖然這可能仍然會導致每個線程有多個克隆,但它沒有@ user4815162342描述的範圍問題。 – Tomas