2013-07-05 59 views
4

我想從Queue.Queue或TCP套接字中讀取消息,以先到者爲準。 如何在不使用2個線程的情況下實現? 平臺是CPython的2.7.5在WindowsPython - 如何在同一時間在隊列和套接字上等待

+0

你不能...輕鬆。類似於'select(2)'的東西可以用於兩個阻塞套接字,但不適用於「隊列」。如果可以接受的話,可以得到最接近的就是在循環中使用非阻塞方法。 – Aya

+0

這是個壞消息。任何建議如何解決這個問題? – GabiMe

回答

1

做到在一個單獨的線程,你將不得不使用非阻塞方法,並將它們合併到單個事件循環中。實際上,我使用select代替非阻塞套接字的I/O在這裏,因爲它是,如果你需要從多個插槽讀取稍微乾淨...

import socket 
import select 
import Queue 
import time 

TIMEOUT = 0.1 # 100ms 


def process_queue_item(item): 
    print 'Got queue item: %r' % item 


def process_socket_data(data): 
    print 'Got socket data: %r' % data 


def main(): 

    # Build queue 
    queue = Queue.Queue() 
    for i in range(10): 
     queue.put(i) 
    queue.put(None) # Using None to indicate no more data on queue 
    queue_active = True 

    # Build socket 
    sock = socket.socket() 
    sock.connect(('www.google.com', 80)) 
    sock.send('GET/HTTP/1.0\r\n\r\n') 
    socket_active = True 

    # Main event loop 
    while 1: 

     # If there's nothing to read, bail out 
     if not (socket_active or queue_active): 
      break 

     # By default, sleep at the end of the loop 
     do_sleep = True 

     # Get data from socket without blocking if possible 
     if socket_active: 
      r, w, x = select.select([sock], [], [], TIMEOUT) 
      if r: 
       data = sock.recv(64) 
       if not data: # Hit EOF 
        socket_active = False 
       else: 
        do_sleep = False 
        process_socket_data(data) 

     # Get item from queue without blocking if possible 
     if queue_active: 
      try: 
       item = queue.get_nowait() 
       if item is None: # Hit end of queue 
        queue_active = False 
       else: 
        do_sleep = False 
        process_queue_item(item) 
      except Queue.Empty: 
       pass 

     # If we didn't get anything on this loop, sleep for a bit so we 
     # don't max out CPU time 
     if do_sleep: 
      time.sleep(TIMEOUT) 


if __name__ == '__main__': 
    main() 

輸出看起來像......

Got socket data: 'HTTP/1.0 302 Found\r\nLocation: http://www.google.co.uk/\r\nCache-Co' 
Got queue item: 0 
Got socket data: 'ntrol: private\r\nContent-Type: text/html; charset=UTF-8\r\nSet-Cook' 
Got queue item: 1 
Got socket data: 'ie: PREF=ID=a192ab09b4c13176:FF=0:TM=1373055330:LM=1373055330:S=' 
Got queue item: 2 
etc. 
+1

每秒鐘迭代10次左右,並進行輪詢。不是非常有效,但我想沒有其他辦法。如何使用2個線程並將數據推送到單個隊列?這是更好的方法嗎? – GabiMe

+0

@GabiMe你可以調整'TIMEOUT'值來品嚐。如果您準備使用多個線程,則有很多選項。如果每個線程都可以處理自己收到的數據,那麼我只需要將每個阻塞調用放在它自己的線程中。 – Aya

0

你可以做下面幾行內容:

def check_for_message(queue,socket,sock_accept_size=512): 
    socket.setblocking(0) 
    while True: 
     try: 
      sock_msg=socket.recv(sock_accept_size) 
     except socket.error: 
      """Do stuff if there is no message""" 
      sock_msg=None 
     try: 
      que_msg=queue.get() 
     except Queue.Empty: 
      """Do stuff if there is no message""" 
      que_msg=None 
     yield (que_msg,sock_msg) 

然後你就可以使用遍歷它:

for que_message,sock_message in check_for_message(que_instance,socket_instance): 
    print que_message,sock_message 
2

有一個很好的技巧來做到這一點here適用於您的問題。

import queue 
import socket 
import os 

class PollableQueue(queue.Queue): 
    def __init__(self): 
     super().__init__() 
     # Create a pair of connected sockets 
     if os.name == 'posix': 
      self._putsocket, self._getsocket = socket.socketpair() 
     else: 
      # Compatibility on non-POSIX systems 
      server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
      server.bind(('127.0.0.1', 0)) 
      server.listen(1) 
      self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
      self._putsocket.connect(server.getsockname()) 
      self._getsocket, _ = server.accept() 
      server.close() 

    def fileno(self): 
     return self._getsocket.fileno() 

    def put(self, item): 
     super().put(item) 
     self._putsocket.send(b'x') 

    def get(self): 
     self._getsocket.recv(1) 
     return super().get() 
相關問題