2011-06-23 83 views
3

gtk應用程序中,所有執行都發生在gtk_main函數中。而其他圖形框架工程具有類似的事件循環,如app.exec對於QTclutter_main對於Clutter。但是ZeroMQ基於假設存在插入的while (1) ...循環(例如,請參閱here的示例)。如何在GTK/QT/Clutter應用程序中使用ZeroMQ?

你如何結合這兩種執行策略?

我目前想在用C寫的一個混亂的應用程序使用zeromq,所以我當然會喜歡直接回答是的,但請加答案的其他變體也是如此。

回答

4

這聽起來像ZeroMQ代碼要簡單地被執行了一遍又一遍,儘可能多。最簡單的方法是將ZeroMQ代碼放入空閒函數或超時函數中,並使用非阻塞版本的函數(如果存在)。

雜波,你會使用clutter_threads_add_idle()clutter_threads_add_timeout()。對於GTK,您可以使用g_idle_add()g_timeout_add()

更困難但可能更好的方法是使用g_thread_create()爲ZeroMQ代碼創建一個單獨的線程,並且只使用while(1)構造和阻塞函數。如果你這樣做了,你還必須找到一些線程相互通信的方式--GLib的互斥鎖和異步隊列通常都可以。

+2

會在空閒定時器,結果不投票zeromq在100%的CPU使用率? –

+2

在一個空閒函數中,可能是的,至少當GTK主循環沒有做其他事情時。在超時功能中,不。 – ptomato

2

我發現有一個叫Zeromqt QT集成庫。望着源,一體化的核心是:

ZmqSocket::ZmqSocket(int type, QObject *parent) : QObject(parent) 
{ 
    ... 
    notifier_ = new QSocketNotifier(fd, QSocketNotifier::Read, this); 
    connect(notifier_, SIGNAL(activated(int)), this, SLOT(activity())); 
} 

... 

void ZmqSocket::activity() 
{ 
    uint32_t flags; 
    size_t size = sizeof(flags); 
    if(!getOpt(ZMQ_EVENTS, &flags, &size)) { 
     qWarning("Error reading ZMQ_EVENTS in ZMQSocket::activity"); 
     return; 
    } 
    if(flags & ZMQ_POLLIN) { 
     emit readyRead(); 
    } 
    if(flags & ZMQ_POLLOUT) { 
     emit readyWrite(); 
    } 
    ... 
} 

因此,它是依靠QT的集成插座處理和雜波不會有類似的東西。

+1

您可能還想看看[nzmqt](https://github.com/jonnydee/nzmqt) - ZeroMQ的另一個Qt綁定。你會發現一個基於輪詢的實現。特別要看看班級[PollingZMQSocket(line 429 ++)](https://github.com/jonnydee/nzmqt/blob/7f3c54c2d3055769df157d01319a333d61cba1f1/include/nzmqt/nzmqt.hpp#L497)。也許你可以爲Clutter做些類似的事情。 –

2

你可以得到0MQ插座(ZMQ_FD選項)文件描述符和整合,以你的事件循環。我認爲gtk有一些處理套接字的機制。

10

到ZMQ和GTK或雜波相結合的正確方法是將ZMQ隊列的文件描述符連接到主事件循環。在FD可以通過使用

int fd; 
size_t sizeof_fd = sizeof(fd); 
if(zmq_getsockopt(socket, ZMQ_FD, &fd, &sizeof_fd)) 
     perror("retrieving zmq fd"); 

它連接到主循環使用io_add_watch此事進行檢索:

GIOChannel* channel = g_io_channel_unix_new(fd);  
g_io_add_watch(channel, G_IO_IN|G_IO_ERR|G_IO_HUP, callback_func, NULL); 

在回調函數,就必須先檢查是否有真的東西在閱讀之前閱讀。否則,該功能可能會阻止等待IO。

gboolean callback_func(GIOChannel *source, GIOCondition condition,gpointer data) 
{ 
    uint32_t status; 
    size_t sizeof_status = sizeof(status); 

    while (1){ 
     if (zmq_getsockopt(socket, ZMQ_EVENTS, &status, &sizeof_status)) { 
      perror("retrieving event status"); 
      return 0; // this just removes the callback, but probably 
         // different error handling should be implemented 
     } 
     if (status & ZMQ_POLLIN == 0) { 
      break; 
     } 

     // retrieve one message here 
    } 
    return 1; // keep the callback active 
} 

請注意:這是沒有實際測試過,我做了一個翻譯用Python +雜波,這是我用,但我敢肯定它會工作。 僅供參考,下面是完整的Python + Clutter代碼,其實際工作。

import sys 
from gi.repository import Clutter, GObject 
import zmq 

def Stage(): 
    "A Stage with a red spinning rectangle" 
    stage = Clutter.Stage() 

    stage.set_size(400, 400) 
    rect = Clutter.Rectangle() 
    color = Clutter.Color() 
    color.from_string('red') 
    rect.set_color(color) 
    rect.set_size(100, 100) 
    rect.set_position(150, 150) 

    timeline = Clutter.Timeline.new(3000) 
    timeline.set_loop(True) 

    alpha = Clutter.Alpha.new_full(timeline, Clutter.AnimationMode.EASE_IN_OUT_SINE) 
    rotate_behaviour = Clutter.BehaviourRotate.new(
     alpha, 
     Clutter.RotateAxis.Z_AXIS, 
     Clutter.RotateDirection.CW, 
     0.0, 359.0) 
    rotate_behaviour.apply(rect) 
    timeline.start() 
    stage.add_actor(rect) 

    stage.show_all() 
    stage.connect('destroy', lambda stage: Clutter.main_quit()) 
    return stage, rotate_behaviour 

def Socket(address): 
    ctx = zmq.Context() 
    sock = ctx.socket(zmq.SUB) 
    sock.setsockopt(zmq.SUBSCRIBE, "") 
    sock.connect(address) 
    return sock 

def zmq_callback(queue, condition, sock): 
    print 'zmq_callback', queue, condition, sock 

    while sock.getsockopt(zmq.EVENTS) & zmq.POLLIN: 
     observed = sock.recv() 
     print observed 

    return True 

def main(): 
    res, args = Clutter.init(sys.argv) 
    if res != Clutter.InitError.SUCCESS: 
     return 1 

    stage, rotate_behaviour = Stage() 

    sock = Socket(sys.argv[2]) 
    zmq_fd = sock.getsockopt(zmq.FD) 
    GObject.io_add_watch(zmq_fd, 
         GObject.IO_IN|GObject.IO_ERR|GObject.IO_HUP, 
         zmq_callback, sock) 

    return Clutter.main() 

if __name__ == '__main__': 
    sys.exit(main()) 
+0

請注意,對於io_add_watch回調而言,重要的是必須返回True。沒有這個,回調只會被調用一次。 –

1

這是Python中的一個例子,使用PyQt4。它來源於一個工作應用程序。

import zmq 
from PyQt4 import QtCore, QtGui 

class QZmqSocketNotifier(QtCore.QSocketNotifier): 
    """ Provides Qt event notifier for ZMQ socket events """ 
    def __init__(self, zmq_sock, event_type, parent=None): 
     """ 
     Parameters: 
     ---------- 
     zmq_sock : zmq.Socket 
      The ZMQ socket to listen on. Must already be connected or bound to a socket address. 
     event_type : QtSocketNotifier.Type 
      Event type to listen for, as described in documentation for QtSocketNotifier. 
     """ 
     super(QZmqSocketNotifier, self).__init__(zmq_sock.getsockopt(zmq.FD), event_type, parent) 

class Server(QtGui.QFrame): 

def __init__(self, topics, port, mainwindow, parent=None): 
    super(Server, self).__init__(parent) 

    self._PORT = port 

    # Create notifier to handle ZMQ socket events coming from client 
    self._zmq_context = zmq.Context() 
    self._zmq_sock = self._zmq_context.socket(zmq.SUB) 
    self._zmq_sock.bind("tcp://*:" + self._PORT) 
    for topic in topics: 
     self._zmq_sock.setsockopt(zmq.SUBSCRIBE, topic) 
    self._zmq_notifier = QZmqSocketNotifier(self._zmq_sock, QtCore.QSocketNotifier.Read) 

    # connect signals and slots 
    self._zmq_notifier.activated.connect(self._onZmqMsgRecv) 
    mainwindow.quit.connect(self._onQuit) 

@QtCore.pyqtSlot() 
def _onZmqMsgRecv(): 
    self._test_info_notifier.setEnabled(False) 
    # Verify that there's data in the stream 
    sock_status = self._zmq_sock.getsockopt(zmq.EVENTS) 
    if sock_status == zmq.POLLIN: 
     msg = self._zmq_sock.recv_multipart() 
     topic = msg[0] 
     callback = self._topic_map[ topic ] 
     callback(msg) 
    self._zmq_notifier.setEnabled(True) 
    self._zmq_sock.getsockopt(zmq.EVENTS) 

def _onQuit(self): 
    self._zmq_notifier.activated.disconnect(self._onZmqMsgRecv) 
    self._zmq_notifier.setEnabled(False) 
    del self._zmq_notifier 
    self._zmq_context.destroy(0) 

禁用,然後重新啓用通知在_on_ZmqMsgRecv是每對QSocketNotifier的文檔。

對於getsockopt的最終調用是有必要的。否則,通知程序在第一個事件之後停止工作。實際上我會爲此發佈一個新問題。有誰知道爲什麼這是必要的?

請注意,如果你不破壞ZMQ上下文之前發出通知,你可能會當您退出應用程序獲得類似這樣的錯誤:

QSocketNotifier: Invalid socket 16 and type 'Read', disabling... 
相關問題