2016-10-25 46 views
2

我們公司正在用C++ 11重寫大部分遺留C代碼。 (這也意味着我是一名C程序員學習C++)。我需要關於消息處理程序的建議。C++中高效的消息工廠和處理程序

我們有分佈式系統 - 服務器進程通過TCP向客戶端進程發送打包消息。

在C代碼這是正在做: - 解析基於類型和子類型的消息,其是總是首先2個字段

- call a handler as handler[type](Message *msg) 

- handler creates temporary struct say, tmp_struct to hold the parsed values and .. 

- calls subhandler[type][subtype](tmp_struct) 

沒有每個類型/子類型只有一個處理程序。

移到C++ 11和多線程環境。我的基本想法是 -

1)爲每個類型/子類型組合註冊一個處理器對象。這是
實際上矢量的矢量 - 矢量<矢量>

class MsgProcessor { 

    // Factory function 
    virtual Message *create(); 
    virtual Handler(Message *msg) 
} 

這將通過不同的消息處理器

class AMsgProcessor : public MsgProcessor { 

     Message *create() override(); 
     handler(Message *msg); 
} 

2)被繼承獲取使用查找處理器進入向量的向量。 使用重載的create()工廠函數獲取消息。 這樣我們就可以保留消息中的實際消息和解析值。

3)現在有點破解,這個消息應該發送到其他線程進行繁重的處理。爲了避免再次在向量中查找,在消息中添加了一個指向proc的指針。

class Message { 
    const MsgProcessor *proc; // set to processor, 
           // which we got from the first lookup 
           // to get factory function. 
}; 

所以其他線程,將只是做

Message->proc->Handler(Message *); 

這看起來不錯,但希望是,這將有助於從工廠單獨的消息處理程序。這是爲了這種情況,當多個類型/子類型想要創建相同的消息,但處理方式不同。

我正在尋找這件事,碰上了:

http://www.drdobbs.com/cpp/message-handling-without-dependencies/184429055?pgno=1

它提供了一種完全的消息從處理程序中分離出來。但我想知道我的上述簡單方案是否會被認爲是可接受的設計。這也是實現我想要的錯誤方式嗎?

速度方面的效率是該應用程序中最重要的要求。我們已經在做一些內存Jumbs => 2個向量+虛擬函數調用創建消息。有兩種方法可以到達處理程序,這從緩存的角度來看並不好,我猜。

+0

沒有冒犯我認爲你沒有清楚描述你的用例,然後跳到你的設計如此之快......至少你可能需要解決對於什麼樣的負載性能的期望?你介意寫一個簡短的介紹嗎? – DAG

+0

我不知道你的意思是加載。在我們的系統中,我們必須每秒處理200k條消息。沒有進入實際的產品細節。服務器將短的配置消息發送到客戶端。配置消息帶有一個類型,子類型和數據。有一個很大的沒有。類型/亞型組合 - 約1000個。這些被不同的編號消耗掉。的模塊。所以爲所有模塊註冊和接收消息提供一個乾淨的界面非常重要。 – MGH

+0

你預計何時會調用'create'方法?什麼是返回的'消息*'?這將是一個空的消息,你會填充解析的數據? – Arunmu

回答

1

雖然您的要求不清楚,但我認爲我的設計可能就是您要找的。

檢查出http://coliru.stacked-crooked.com/a/f7f9d5e7d57e6261爲完全成熟的示例。

它具有以下組件:

  1. 爲消息處理器IMessageProcessor的接口類。
  2. 表示消息的基類。 Message
  3. 一種登記類,它本質上是用於存儲對應於(類型,子類型)對中的消息處理器一個單。 Registrator。它將映射存儲在unordered_map中。您也可以調整它以獲得更好的性能。所有暴露的API Registratorstd::mutex保護。
  4. MessageProcessor的具體實現。 AMsgProcessorBMsgProcessor
  5. simulate功能,以顯示它如何融合在一起。

粘貼代碼在這裏還有:

/* 
* http://stackoverflow.com/questions/40230555/efficient-message-factory-and-handler-in-c 
*/ 

#include <iostream> 
#include <vector> 
#include <tuple> 
#include <mutex> 
#include <memory> 
#include <cassert> 
#include <unordered_map> 

class Message; 

class IMessageProcessor 
{ 
public: 
    virtual Message* create() = 0; 
    virtual void handle_message(Message*) = 0; 
    virtual ~IMessageProcessor() {}; 
}; 

/* 
* Base message class 
*/ 
class Message 
{ 
public: 
    virtual void populate() = 0; 
    virtual ~Message() {}; 
}; 

using Type = int; 
using SubType = int; 
using TypeCombo = std::pair<Type, SubType>; 
using IMsgProcUptr = std::unique_ptr<IMessageProcessor>; 

/* 
* Registrator class maintains all the registrations in an 
* unordered_map. 
* This class owns the MessageProcessor instance inside the 
* unordered_map. 
*/ 
class Registrator 
{ 
public: 
    static Registrator* instance(); 

    // Diable other types of construction 
    Registrator(const Registrator&) = delete; 
    void operator=(const Registrator&) = delete; 

public: 
    // TypeCombo assumed to be cheap to copy 
    template <typename ProcT, typename... Args> 
    std::pair<bool, IMsgProcUptr> register_proc(TypeCombo typ, Args&&... args) 
    { 
    auto proc = std::make_unique<ProcT>(std::forward<Args>(args)...); 
    bool ok; 
    { 
     std::lock_guard<std::mutex> _(lock_); 
     std::tie(std::ignore, ok) = registrations_.insert(std::make_pair(typ, std::move(proc))); 
    } 
    return (ok == true) ? std::make_pair(true, nullptr) : 
          // Return the heap allocated instance back 
          // to the caller if the insert failed. 
          // The caller now owns the Processor 
          std::make_pair(false, std::move(proc)); 
    } 

    // Get the processor corresponding to TypeCombo 
    // IMessageProcessor passed is non-owning pointer 
    // i.e the caller SHOULD not delete it or own it 
    std::pair<bool, IMessageProcessor*> processor(TypeCombo typ) 
    { 
    std::lock_guard<std::mutex> _(lock_); 

    auto fitr = registrations_.find(typ); 
    if (fitr == registrations_.end()) { 
     return std::make_pair(false, nullptr); 
    } 
    return std::make_pair(true, fitr->second.get()); 
    } 

    // TypeCombo assumed to be cheap to copy 
    bool is_type_used(TypeCombo typ) 
    { 
    std::lock_guard<std::mutex> _(lock_); 
    return registrations_.find(typ) != registrations_.end(); 
    } 

    bool deregister_proc(TypeCombo typ) 
    { 
    std::lock_guard<std::mutex> _(lock_); 
    return registrations_.erase(typ) == 1; 
    } 

private: 
    Registrator() = default; 

private: 
    std::mutex lock_; 
    /* 
    * Should be replaced with a concurrent map if at all this 
    * data structure is the main contention point (which I find 
    * very unlikely). 
    */ 
    struct HashTypeCombo 
    { 
    public: 
    std::size_t operator()(const TypeCombo& typ) const noexcept 
    { 
     return std::hash<decltype(typ.first)>()(typ.first)^
      std::hash<decltype(typ.second)>()(typ.second); 
    } 
    }; 

    std::unordered_map<TypeCombo, IMsgProcUptr, HashTypeCombo> registrations_; 
}; 

Registrator* Registrator::instance() 
{ 
    static Registrator inst; 
    return &inst; 
    /* 
    * OR some other DCLP based instance creation 
    * if lifetime or creation of static is an issue 
    */ 
} 


// Define some message processors 

class AMsgProcessor final : public IMessageProcessor 
{ 
public: 
    class AMsg final : public Message 
    { 
    public: 
    void populate() override { 
     std::cout << "Working on AMsg\n"; 
    } 

    AMsg() = default; 
    ~AMsg() = default; 
    }; 

    Message* create() override 
    { 
    std::unique_ptr<AMsg> ptr(new AMsg); 
    return ptr.release(); 
    } 

    void handle_message(Message* msg) override 
    { 
    assert (msg); 
    auto my_msg = static_cast<AMsg*>(msg); 

    //.... process my_msg ? 
    //.. probably being called in some other thread 
    // Who owns the msg ?? 
    (void)my_msg; // only for suppressing warning 

    delete my_msg; 

    return; 
    } 

    ~AMsgProcessor(); 
}; 

AMsgProcessor::~AMsgProcessor() 
{ 
} 

class BMsgProcessor final : public IMessageProcessor 
{ 
public: 
    class BMsg final : public Message 
    { 
    public: 
    void populate() override { 
     std::cout << "Working on BMsg\n"; 
    } 

    BMsg() = default; 
    ~BMsg() = default; 
    }; 

    Message* create() override 
    { 
    std::unique_ptr<BMsg> ptr(new BMsg); 
    return ptr.release(); 
    } 

    void handle_message(Message* msg) override 
    { 
    assert (msg); 
    auto my_msg = static_cast<BMsg*>(msg); 

    //.... process my_msg ? 
    //.. probably being called in some other thread 
    //Who owns the msg ?? 
    (void)my_msg; // only for suppressing warning 

    delete my_msg; 

    return; 
    } 

    ~BMsgProcessor(); 
}; 

BMsgProcessor::~BMsgProcessor() 
{ 
} 


TypeCombo read_from_network() 
{ 
    return {1, 2}; 
} 


struct ParsedData { 
}; 

Message* populate_message(Message* msg, ParsedData& pdata) 
{ 
    // Do something with the message 
    // Calling a dummy populate method now 
    msg->populate(); 
    (void)pdata; 
    return msg; 
} 

void simulate() 
{ 
    TypeCombo typ = read_from_network(); 
    bool ok; 
    IMessageProcessor* proc = nullptr; 

    std::tie(ok, proc) = Registrator::instance()->processor(typ); 
    if (!ok) { 
    std::cerr << "FATAL!!!" << std::endl; 
    return; 
    } 

    ParsedData parsed_data; 
    //..... populate parsed_data here .... 

    proc->handle_message(populate_message(proc->create(), parsed_data)); 
    return; 
} 


int main() { 

    /* 
    * TODO: Not making use or checking the return types after calling register 
    * its a must in production code!! 
    */ 
    // Register AMsgProcessor 
    Registrator::instance()->register_proc<AMsgProcessor>(std::make_pair(1, 1)); 
    Registrator::instance()->register_proc<BMsgProcessor>(std::make_pair(1, 2)); 

    simulate(); 

    return 0; 
} 

更新1

混亂的主要來源,在這裏似乎是因爲即使系統的體系結構是未知的。

任何自尊的事件系統架構將類似於下面:

  1. 線程輪詢的插座描述符池。
  2. 用於處理計時器相關事件的線程池。
  3. 比較小的數量(取決於應用程序)的線程做長時間阻塞的工作。

所以,你的情況:

  1. 您將得到線程執行epoll_waitselectpoll的網絡事件。
  2. 完全讀取數據包,並使用Registrator::get_processor呼叫得到處理。 注意get_processor呼叫可以在沒有任何鎖定進行,如果可以保證基本unordered_map並未修改即會作出任何新的插入,一旦我們開始接收事件。
  3. 使用所獲得的處理器,我們可以得到Message,並填充它。
  4. 現在,這是我不確定你想如何的部分。在這一點上,我們有processor上,您可以撥打handle_message無論是從當前線程即這是做epoll_wait或通過發佈任務(處理器和消息)到線程接收隊列其分派到另一個線程的線程。
+0

感謝您的回答。我有一個指向處理器的原因是,我在問題中提到了這個問題,我們需要在不同的線程中調用處理程序。接收和解析後的消息被髮送到其他線程進行處理。處理非常耗時,因爲它涉及大量的PCI寫操作等。我們可能已經設計了所有線程接收消息並處理它,但這需要某種鎖定。因此,我們將其設計爲一條管線。你認爲在消息中保持指針指向proc是否可行? – MGH

+0

這創建了一種循環依賴,處理器應該看到消息和消息inturn應該看到處理器。可能用前向聲明解決。在我最初的代碼中,我讓每個模塊實例化處理器。處理器的基類,例如IMessageProcessor,將自己添加到註冊器中。註冊/取消註冊將通過新建/刪除來完成。這是我不必處理派生類的構造函數參數。你通過使用可變參數模板來解決它。 – MGH

+0

我認爲,我的計劃的缺點是註冊錯誤必須作爲例外而不是返回類型來處理。你對這兩個計劃有評論嗎? – MGH