2016-12-14 50 views
2

我想並行化一個循環(使用tbb),其中包含一些昂貴但可矢量化的迭代(隨機擴展)。我的想法是緩衝這些並刷新緩衝區,只要它達到矢量大小。這樣的緩衝區必須是線程本地的。例如,並行循環結束時使用TBB刷新線程本地緩衝區

// dummy for testing 
void do_vectorized_work(size_t k, size_t*indices) 
{} 
// dummy for testing 
bool requires_expensive_work(size_t k) 
{ return (k&7)==0; } 

struct buffer 
{ 
    size_t K=0, B[vector_size]; 
    void load(size_t i) 
    { 
    B[K++]=i; 
    if(K==vector_size) 
     flush(); 
    } 
    void flush() 
    { 
    do_vectorized_work(K,B); 
    K=0; 
    } 
}; 

void do_work_in_parallel(size_t N) 
{ 
    tbb::enumerable_thread_specific<buffer> tl_buffer; 

    tbb::parallel_for(size_t(0),N,[&](size_t i) 
    { 
    if(requires_expensive_work(i)) 
     tl_buffer.local().load(i); 
    }); 
} 

然而,這留下緩衝區非空的,所以我還是要最後一次刷新他們每個人的

for(auto&b:tl_buffer) 
    b.flush(); 

但這是串行!當然,我也可以嘗試這樣做並行

using tl_range = typename tbb::enumerable_thread_specific<buffer>::range_type; 
tbb::parallel_for(tl_buffer.range(),[](tl_range const&range) 
{ 
    for(auto r:range) 
    r->flush(); 
}); 

但我不知道這是有效的(因爲只有儘可能多的緩衝區有線程)。我想知道是否有可能避免事件發生後的最後沖洗。即是否可以使用tbb::task s(替換tbb::parallel_for),以便每個線程的最終任務是刷新其緩衝區?

回答

1

它發生,我認爲這可以通過減少解決。

struct buffer 
{ 
    std::size_t K=0, B[vector_size]; 
    void load(std::size_t i) 
    { 
    B[K++]=i; 
    if(K==vector_size) flush(); 
    } 
    void flush() 
    { 
    do_vectorized_work(K,B); 
    K=0; 
    } 
    buffer(buffer const&, tbb::split) 
    {} 
    void operator()(tbb::block_range<std::size_t> const&range) 
    { for(i:range) load(i); } 
    bool empty() 
    { return K==0; } 
    std::size_t pop() 
    { return K? B[--K] : 0; } 
    void join(buffer&rhs) 
    { while(!rhs.empty()) load(rhs.pop()); } 
}; 

void do_work_in_parallel(std::size_t N) 
{ 
    buffer buff; 
    tbb::parallel_reduce(tbb::block_range<std::size_t>(0,N,vector_size),buff); 
    if(!buff.empty()) 
    buff.flush(); 
} 
2

不,工作線程沒有關於這個特定任務是否是給定工作的最後一項任務的完整信息(這是工作竊取如何工作的)。因此,不可能在parallel_for或調度程序本身上實現這樣的功能。因此,我建議你採用你描述的這兩種方法。

你可以對此做兩件事。

  • 使其異步。即排隊一個任務,這將使所有的東西都被刷新。這將有助於從主線程的熱路徑中刪除此代碼。如果在完成此任務時需要設置任何依賴項,請注意。
  • 使用tbb::task_scheduler_observer爲了初始化線程特定的數據,並在線程關閉或沒有工作時保持一段時間時懶惰地釋放它。後者需要使用local observer feature,這尚未正式支持,但已經持續了幾年。

例子:

#define TBB_PREVIEW_LOCAL_OBSERVER 1 
#include <tbb/tbb.h> 
#include <assert.h> 

typedef void * buffer_t; 
const static int bufsz = 1024; 
class thread_buffer_allocator: public tbb::task_scheduler_observer { 
    tbb::enumerable_thread_specific<buffer_t> _buf; 
public: 
    thread_buffer_allocator() 
    : tbb::task_scheduler_observer(/*local=*/ true) { 
    observe(true); // activate the observer 
    } 
    ~thread_buffer_allocator() { 
    observe(false); // deactivate the observer 
    for(auto &b : _buf) { 
     printf("destructor: cleared: %p\n", b); 
     free(b); 
    } 
    } 
    /*override*/ void on_scheduler_entry(bool worker) { 
    assert(_buf.local() == nullptr); 
    _buf.local() = malloc(bufsz); 
    printf("on entry: %p\n", _buf.local()); 
    } 
    /*override*/ void on_scheduler_exit(bool worker) { 
    printf("on exit\n"); 
    if(_buf.local()) { 
     printf("on exit: cleared %p\n", _buf.local()); 
     free(_buf.local()); 
     _buf.local() = nullptr; 
    } 
    } 
}; 

int main() { 
    thread_buffer_allocator buffers_scope; 
    tbb::parallel_for(0, 1024*1024*1024, [&](auto i){ 
    usleep(i%3); 
    }); 
    return 0; 
} 
+0

感謝您的支持。我不認爲異步方法比我在OP中描述的嘗試更好。使用'tbb :: task_scheduler_observer'的方法聽起來很有趣。你可以使用代碼片段概述這將如何工作? – Walter

+0

@Walter更新。雖然我只在線上編譯器上嘗試過,但是它並沒有與當地觀察員進行最近的TBB測試:http://coliru.stacked-crooked.com/a/11728cd935579cfe – Anton