2015-04-30 70 views
1

我真的不確定如何解決這個問題,所以我會先解釋它。 我需要運行一些線程,每個線程通過TCPSocket連接到某個應用程序,到目前爲止沒有問題。該應用程序非常耗時,這就是爲什麼我希望它在多個線程上並行運行,並且每個線程都與它進行通信的原因。一旦計算完成,我想將結果發送到另一個線程收集結果。爲此 我寫了一個Worker類:​​Qt多線程QThreads保持TCP連接並重用

class Worker : public QObject { 
    Q_OBJECT 

public: 
    Worker(); 
    Worker(int port); 
    ~Worker(); 
    QTcpSocket* sock; 
    void insert(); 

public slots: 
    void connect(); 
    void process(const int &id, const QString &param, const int &arity); 

signals: 
    void ready(); 
    void finished(const int &id, const int &consistent, const QString &result); 
    void error(QString err); 
}; 

現在superThread應該工作過一個巨大的文件,需要傳播它周圍的線程,然後接收和處理結果。我的做法到目前爲止)連接在主(如下另一superThread:

QThread* superThread = new QThread(); 
supWorker* super = new supWorker(); 
for (int i = 0; i < nrWorkers; i++){ 
    Worker* worker = new Worker(portRange+i); 
    QThread* workerThread = new QThread(); 
    QThread::connect(workerThread, SIGNAL(started()), worker, SLOT(connect())); 
    worker->moveToThread(workerThread); 
    workerThread->start(); 
    QThread::connect(super, SIGNAL(process(int, QString, int)), worker, SLOT(process(int,QString,int))); 
    QThread::connect(worker, SIGNAL(finished(int, int, QString)), super, SLOT(handleResult(int, int, QString))); 
} 

問題這種方式顯然是我只能將信號發送到所有連接線。我想要superThread做的只是將參數發送到其中一個線程。我不知道我如何處理連接,以便只有一個工作線程能夠接收它?

任何幫助或建築理念非常感謝,在此先感謝。

+0

Sry,但我認爲這是廣泛的SO的方式。 – Bowdzone

+0

Sry,編輯希望它現在更具體一點。 – yonobi

回答

2

不知道我是否有100%的想法,但爲什麼不將工作線程數組傳遞給超線程,保留一個表示當前活動線程索引的索引,僅將信號連接到該線程,調度需要時的信號,等待完成,斷開信號,提前索引並重復?如果將序列化的信號發送到線程是您真正想要的,這可能會起作用。

編輯

好吧其實我振作togetger使實現所需的工作流程基於Qt的樣品,並把它放在github

#pragma once 

#include <QThread> 
#include <QApplication> 
#include <QMetaType> 
#include <QTimer> 

#include <vector> 
#include <memory> 
#include <cstdio> 
#include <algorithm> 

struct Work 
{ 
    int m_work; 
}; 

struct Result 
{ 
    int m_result; 
    int m_workerIndex; 
}; 

Q_DECLARE_METATYPE(Work); 
Q_DECLARE_METATYPE(Result); 

class Worker : public QThread 
{ 
    Q_OBJECT 

public: 
    Worker(int workerIndex) : m_workerIndex(workerIndex) 
    { 
     moveToThread(this); 
     connect(this, SIGNAL(WorkReceived(Work)), SLOT(DoWork(Work))); 
     printf("[%d]: worker %d initialized\n", reinterpret_cast<int>(currentThreadId()), workerIndex); 
    } 

    void DispatchWork(Work work) 
    { 
     emit WorkReceived(work); 
    } 

public slots: 
    void DoWork(Work work) 
    { 
     printf("[%d]: worker %d received work %d\n", reinterpret_cast<int>(currentThreadId()), m_workerIndex, work.m_work); 
     msleep(100); 
     Result result = { work.m_work * 2, m_workerIndex }; 
     emit WorkDone(result); 
    } 

signals: 
    void WorkReceived(Work work); 
    void WorkDone(Result result); 

private: 
    int m_workerIndex; 
}; 

class Master : public QObject 
{ 
    Q_OBJECT 

public: 
    Master(int workerCount) : m_activeWorker(0), m_workerCount(workerCount) 
    { 
     printf("[%d]: creating master thread\n", reinterpret_cast<int>(QThread::currentThreadId())); 
    } 
    ~Master() 
    { 
     std::for_each(m_workers.begin(), m_workers.end(), [](std::unique_ptr<Worker>& worker) 
     { 
      worker->quit(); 
      worker->wait(); 
     }); 
    } 


public slots: 
    void Initialize() 
    { 
     printf("[%d]: initializing master thread\n", reinterpret_cast<int>(QThread::currentThreadId())); 
     for (int workerIndex = 0; workerIndex < m_workerCount; ++workerIndex) 
     { 
      auto worker = new Worker(workerIndex); 
      m_workers.push_back(std::move(std::unique_ptr<Worker>(worker))); 
      connect(worker, SIGNAL(WorkDone(Result)), SLOT(WorkDone(Result))); 
      worker->start(); 
     } 
     m_timer = new QTimer(); 
     m_timer->setInterval(500); 
     connect(m_timer, SIGNAL(timeout()), SLOT(GenerateWork())); 
     m_timer->start(); 
    } 
    void GenerateWork() 
    { 
     Work work = { m_activeWorker }; 
     printf("[%d]: dispatching work %d to worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), work.m_work, m_activeWorker); 
     m_workers[m_activeWorker]->DispatchWork(work); 
     m_activeWorker = ++m_activeWorker % m_workers.size(); 
    } 
    void WorkDone(Result result) 
    { 
     printf("[%d]: received result %d from worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), result.m_result, result.m_workerIndex); 
    } 
    void Terminate() 
    { 
     m_timer->stop(); 
     delete m_timer; 
    } 

private: 
    int m_workerCount; 
    std::vector<std::unique_ptr<Worker>> m_workers; 
    int m_activeWorker; 
    QTimer* m_timer; 
}; 

QtThreadExample.cpp:

#include "QtThreadExample.hpp" 
#include <QTimer> 

int main(int argc, char** argv) 
{ 
    qRegisterMetaType<Work>("Work"); 
    qRegisterMetaType<Result>("Result"); 
    QApplication application(argc, argv); 
    QThread masterThread; 
    Master master(5); 
    master.moveToThread(&masterThread); 
    master.connect(&masterThread, SIGNAL(started()), SLOT(Initialize())); 
    master.connect(&masterThread, SIGNAL(terminated()), SLOT(Terminate())); 
    masterThread.start(); 
    // Set a timer to terminate the program after 10 seconds 
    QTimer::singleShot(10 * 1000, &application, SLOT(quit())); 
    application.exec(); 
    masterThread.quit(); 
    masterThread.wait(); 
    printf("[%d]: master thread has finished\n", reinterpret_cast<int>(QThread::currentThreadId())); 
    return 0; 
} 

一般的解決方案是不實際發射從主線程本身就是一個信號,而發出從工作線程的信號 - 這樣你得到一個獨特的信號,並且可以在事件循環中異步處理工作,然後發回線程完成的信號。樣本可以並且應該根據您的需要進行重構,但是總的來說,它演示了Qt中的生產者/消費者模式,將這個想法與索引和信號線程一一對應起來。我正在使用一個普通的定時器在主線程中生成工作(Master::m_timer) - 我想你的情況下,你將使用來自套接字,文件或其他東西的信號生成工作。然後我在一個活動工作線程上調用一個方法,該方法向工作線程的事件循環發出一個信號,開始執行工作,然後發出關於完成的信號。這是一個一般的描述,看樣品,嘗試一下,如果你有任何後續問題,讓我知道。

如果您使用的是Qt對象,我猜這樣做很不錯,但是在消費者/生產者模式的傳統意義上,信號/插槽的東西實際上會讓生活變得更難一些,標準的C++ 11管道和std::condition_variable和主線程調用condition_variable::notify_one()和簡單地等待條件變量的工作線程會更容易,但是Qt有很好的包裝所有I/O的東西。所以試試看,然後決定。

下面的例子中的示例輸出,我想線程記錄表明,需要的效果來實現: enter image description here

還要說明一點,因爲QApplication運行事件循環本身,如果你沒有一個GUI實際上可以讓所有I/O對象和主類都存在於主線程中,並從那裏發出信號,因此不需要單獨的主線程。當然,如果你有一個圖形用戶界面,你不會用這個東西來加重它。

+0

感謝您的回覆,併爲這個令人困惑的問題感到抱歉,我編輯了我的帖子,希望我的問題現在更清楚一點。因爲我不知道如何正確地將信號發送到特定的且只有一個工作線程。 – yonobi

+0

@yonobi,正如我所提到的,其中一種可能性是隻連接那些您想要實際接收信號的線程。然後根據需要執行斷開/重新連接。 –

+0

@yonobi,在這裏(https://forum.qt.io/topic/12999/send-signals-to-a-specified-receiver/2)是一個關於此問題的qt論壇話題,正如你所說的,你想要的是一種針對信號插槽習語本身。如果你想調用顯式線程,爲什麼不簡單地創建一個你可以直接調用的方法,並且這會例如從等待中喚醒線程並做一些工作?你需要有一個跨平臺的解決方案嗎? –