2014-02-20 105 views
5

我正試圖在C++中實現未來調用機制。雖然這只是一個測試代碼(有點匆忙),但我打算爲我正在使用的語言的運行時使用類似的東西來實現透明並行。將執行從一個線程移動到另一個線程以執行任務並行和調用

,我幹我的工作的代碼,使其稍微小了一點,但它仍然是很大的:

#include <cstdlib> 
#include <cstdio> 
#include <iostream> 
#include <vector> 
#include <queue> 
#include <future> 
#include <thread> 
#include <functional> 
#include <type_traits> 
#include <utility> 
using namespace std; 
using namespace std::chrono; 

//------------------------------------------------------------------------------ 
// Simple locked printer 

static std::recursive_mutex print_lock; 

inline void print_() { 
    return; 
}; 

template<typename T, typename... Args> 
inline void print_(T t, Args... args) { 
    print_lock.lock(); 
    std::cout << t; 
    print_(args...); 
    print_lock.unlock(); 
}; 
//------------------------------------------------------------------------------ 

template<typename R> 
class PooledTask { 
    public: 
    explicit PooledTask(function<R()>); 

    // Possibly execute the task and return the value 
    R &operator()() { 

     // If we can get the lock, we're not executing 
     if(lock.try_lock()) { 

     // We may already have executed it 
     if(done) 
      goto end; 

     // Otherwise, execute it now 
     try { 
      result = move(task()); 
     } catch(...) { 
      // If an exception is thrown, save it for later 
      eptr = current_exception(); 
      failed = true; 
     }; 

     done = true; 

     goto end; 

     } else { 

     // Wait until the task is completed 
     lock.lock(); 

     end: { 
      lock.unlock(); 

      // Maybe we got an exception! 
      if(failed) 
      rethrow_exception(eptr); 

      // Otherwise, just return the result 
      return result; 
     }; 
     }; 
    }; 

    private: 
    exception_ptr eptr; 
    function<R()> task; 
    bool done; 
    bool failed; 
    mutex lock; 
    R result; 
}; 

extern class TaskPool pool; 

class TaskPool { 
    public: 
    TaskPool() noexcept: TaskPool(thread::hardware_concurrency() - 1) { 
     return; 
    }; 

    TaskPool(const TaskPool &) = delete; 
    TaskPool(TaskPool &&) = delete; 

    template<typename T> 
    void push(PooledTask<T> *task) noexcept { 

     lock_guard<mutex> guard(lock); 

     builders.push([=] { 
     try { 
      (*task)(); 
     } catch(...) { 
      // Ignore it here! The task will save it. :) 
     }; 
     }); 

    }; 

    ~TaskPool() { 
     // TODO: wait for all tasks to finish... 
    }; 
    private: 
    queue<thread *> threads; 
    queue<function<void()>> builders; 
    mutex lock; 

    TaskPool(signed N) noexcept { 
     while(N --> 0) 
     threads.push(new thread([this, N] { 
      for(;;) { 

      pop_task(); 

      }; 
     })); 
    }; 

    void pop_task() noexcept { 

     lock.lock(); 

     if(builders.size()) { 

     auto task = builders.front(); 

     builders.pop(); 

     lock.unlock(); 

     task(); 

     } else 
     lock.unlock(); 
    }; 

} pool; 


template<typename R> 
PooledTask<R>::PooledTask(function<R()> fun): 
    task(fun), 
    done(false), 
    failed(false) 
{ 
    pool.push(this); 
}; 

// Should probably return a std::shared_ptr here... 
template<typename F, typename... Args> 
auto byfuture(F fun, Args&&... args) noexcept -> 
    PooledTask<decltype(fun(args...))> * 
{ 

    using R = decltype(fun(args...)); 

    auto pooled = new PooledTask<R> { 
    bind(fun, forward<Args>(args)...) 
    }; 

    return pooled; 
}; 


//------------------------------------------------------------------------------ 
#include <map> 

// Get the current thread id as a simple number 
static int myid() noexcept { 
    static unsigned N = 0; 
    static map<thread::id, unsigned> hash; 
    static mutex lock; 

    lock_guard<mutex> guard(lock); 

    auto current = this_thread::get_id(); 

    if(!hash[current]) 
    hash[current] = ++N; 

    return hash[current]; 
}; 
//------------------------------------------------------------------------------ 

//------------------------------------------------------------------------------ 
// The fibonacci test implementation 
int future_fib(int x, int parent) { 

    if(x < 3) 
    return 1; 

    print_("future_fib(", x, ")", " on thread ", myid(), \ 
     ", asked by thread ", parent, "\n"); 

    auto f1 = byfuture(future_fib, x - 1, myid()); 
    auto f2 = byfuture(future_fib, x - 2, myid()); 

    auto res = (*f1)() + (*f2)(); 

    delete f1; 
    delete f2; 

    return res; 
}; 
//------------------------------------------------------------------------------ 

int main() { 
    // Force main thread to get id 1 
    myid(); 

    // Get task 
    auto f = byfuture(future_fib, 8, myid()); 

    // Make sure it starts on the task pool 
    this_thread::sleep_for(seconds(1)); 

    // Blocks 
    (*f)(); 

    // Simply wait to be sure all threads are clean 
    this_thread::sleep_for(seconds(2)); 

    // 
    return EXIT_SUCCESS; 
}; 

此程序的結果是這樣的(我有一個四核,所以3個線程池中):

future_fib(8) on thread 2, asked by thread 1 
future_fib(7) on thread 3, asked by thread 2 
future_fib(6) on thread 4, asked by thread 2 
future_fib(6) on thread 3, asked by thread 3 
future_fib(5) on thread 4, asked by thread 4 
future_fib(5) on thread 3, asked by thread 3 
future_fib(4) on thread 4, asked by thread 4 
future_fib(4) on thread 3, asked by thread 3 
future_fib(3) on thread 4, asked by thread 4 
future_fib(3) on thread 3, asked by thread 3 
future_fib(3) on thread 4, asked by thread 4 
future_fib(3) on thread 3, asked by thread 3 
future_fib(4) on thread 4, asked by thread 4 
future_fib(4) on thread 3, asked by thread 3 
future_fib(3) on thread 4, asked by thread 4 
future_fib(3) on thread 3, asked by thread 3 
future_fib(5) on thread 3, asked by thread 3 
future_fib(4) on thread 3, asked by thread 3 
future_fib(3) on thread 3, asked by thread 3 
future_fib(3) on thread 3, asked by thread 3 

此實現自己都慢比正常的斐波那契功能。

所以這裏的問題:當池中運行fib(8),它會創建將在接下來的線程上運行兩個任務,但是,當它到達auto res = (*f1)() + (*f2)();,兩個任務都已經在運行,所以它會阻塞f1(上運行線程3)。

爲了提高速度,我需要做的是爲線程2而不是在f1上進行阻塞,以假定線程3正在執行的任務,讓它準備好接受另一個任務,所以沒有線程會睡覺做計算。

這篇文章在這裏http://bartoszmilewski.com/2011/10/10/async-tasks-in-c11-not-quite-there-yet/說有必要做我想做的事,但沒有指定如何。

我的疑問是:我怎麼可能做到這一點?

有沒有其他的選擇做我想要的?

+1

[Threading Building Blocks(TBB)library](https://www.threadingbuildingblocks.org/)怎麼樣?它提供了帶有線程池的併發任務系統。 – yohjp

+1

看看C++ 1z的'.then()'方案嗎? 'return pooled_fib(x-2).then([x](auto && r1){auto r2 = pooled_fib(x-1); return r1.get()+ r2.get();});'或者somesuch。 – Yakk

回答

1

我想你可能有機會與resumable functions currently proposed for C++ standartization。該提案尚未獲得批准,但Visual Studio 15 CTP實現了該提案,因此您可以嘗試製作原型(如果可以使用MSVC編譯器)。

Gor的Nishanov(最新建議論文的作者之一)描述了計算斐波那契數的一個非常相似的示例「父偷調度」開始在他的談話CppCon 23:47:https://www.youtube.com/watch?v=KUhSjfSbINE

但是請注意, ,我找不到spawnable<T>實施的任何資源/樣本,因此您可能需要聯繫提案作者以獲取詳細信息。

+0

看起來真不錯!我會嘗試與他聯繫。非常感謝你! :) – paulotorrens

+0

讀完論文後的確如此:這的確是一樣的想法(我真的試圖通過手工構建一個偷竊時間表),通過閱讀他的方法,我可以看到我做錯了什麼。現在我可以完成我的工作! :) – paulotorrens

0

看你的代碼是完全的東西,會比計算FIB 8

例如切換到內核空間更長的時間來找出線程ID是什麼將在窗口的最可能的口味的時間比工作更長時間在這裏完成。

並行化並不是爲了讓一堆線程競爭共享內存。這是你可以犯的最糟糕的錯誤。

並行化任務時,您將輸出分成不連續的塊,以便並行線程分別寫入自己的內存,避免內存和緩存爭用,從而導致應用程序崩潰。

當你有3個線程觸及3個獨立的內存位置時,永遠不需要使用鎖或其他同步原語。在大多數窗口上還需要內核模式切換。

所以你真正需要知道的唯一事情就是線程全部完成。這可以通過許多Interlocked Exchange方法或OS驅動的事件句柄來實現。

如果您想成爲一名認真的開發人員,請刪除線程ID,刪除鎖定代碼,並開始思考如何在沒有這些問題的情況下處理此問題。

在2車道高速公路上考慮2輛車。一個比另一個更快。你永遠不知道哪輛車在另一輛車前面。問問你自己有沒有辦法在兩條車道上定位這些車,誰在前面誰不在乎誰在移動更快?你應該得出結論,如果每輛車停留在自己的車道上,那麼永遠不會有問題。這是最簡單的並行化。

現在考慮你將在不同的大陸的不同機器上產生這些工作。嘗試交換關於線程和內存的信息是否合理?不,這不對。你很簡單地將問題分解成幾乎完全沒有任何關係的離散功能塊,忘記過度控制,讓信息時代的魔法發生。

我希望這會有所幫助。

+1

這只是一個例子,當然在這種情況下,代碼將充滿比fib更長的事情。您可以替換無鎖隊列的鎖。這並不能解決f1必須隱含等待f2直到operator +可以執行的問題。我們需要的是一種釋放線程的方法,而不是等待一個信號,這是一個鎖定或旋轉等待互鎖比較交換(這是通常如何實現鎖定)或執行到另一個任務(即通過任務竊取) –

+1

附錄:operator +必須在f1和f2之後執行,並且不希望更改代碼以使用延續或通過使用像CPC continuation-passing-C這樣的預處理器來更改構建。你還可以採用其他方法而不鎖定? –

+0

沒錯。當然,我沒有在速度測試中保留帶有線程ID的'print_'。問題在於這個任務只在線程3和4之間共享,因爲2被阻塞,正在等待。我嘗試了今天早上執行的一個任務,但最終我得到了一個'bus error:10',我認爲這是由堆棧溢出引起的(具有諷刺意味)。那麼,我認爲這可能是問題,也許我可以通過使用自旋鎖而不是互斥來獲得一些改進。也許即使我使用全部3個線程,它仍然會比串行版本慢。只是想嘗試一下,因爲我需要自動並行。 – paulotorrens

相關問題