0

我處理兩個python隊列。
我的問題的簡短描述:
客戶端通過waiting queue(q1),然後他們(客戶端)之後送達。
waiting queue的大小不能大於N(在我的程序中爲10)。
如果waiting queue變滿,則客戶端傳遞給outside queue(q2,大小20)。如果外部隊列已滿,客戶端將被拒絕並且不能提供服務。
每個離開等待隊列的客戶端都允許來自外部隊列的另一個客戶端加入等待隊列。無法將項目從一個隊列排隊到另一個隊列

使用隊列應該是線程安全的。

下面我大致實現了我想要的。但是我遇到了這個問題 - 在執行serve函數期間將客戶端從隊列外(q1)排入等待隊列(q2)。我想我失去了或忘記了一些重要的東西。我認爲這種說法q1.put(client)永久封鎖,但不知道爲什麼。

import time 
import threading 
from random import randrange 
from Queue import Queue, Full as FullQueue 


class Client(object): 
    def __repr__(self): 
     return '<{0}: {1}>'.format(self.__class__.__name__, id(self)) 


def serve(q1, q2): 
    while True: 
     if not q2.empty(): 
      client = q2.get() 
      print '%s leaved outside queue' % client 
      q1.put(client) 
      print '%s is in the waiting queue' % client 
      q2.task_done() 

     client = q1.get() 
     print '%s leaved waiting queue for serving' % client 
     time.sleep(2) # Do something with client 
     q1.task_done() 


def main(): 
    waiting_queue = Queue(10) 
    outside_queue = Queue(20) 

    for _ in range(2): 
     worker = threading.Thread(target=serve, args=(waiting_queue, outside_queue)) 
     worker.setDaemon(True) 
     worker.start() 

    delays = [randrange(1, 5) for _ in range(100)] 

    # Every d seconds 10 clients enter to the waiting queue 
    for d in delays: 
     time.sleep(d) 
     for _ in range(10): 
      client = Client() 
      try: 
       waiting_queue.put_nowait(client) 
      except FullQueue: 
       print 'Waiting queue is full. Please line up in outside queue.' 
       try: 
        outside_queue.put_nowait(client) 
       except FullQueue: 
        print 'Outside queue is full. Please go out.' 

    waiting_queue.join() 
    outside_queue.join() 
    print 'Done' 

回答

0

最後我找到了解決方案。我檢查文檔更周到 If full() returns True it doesn’t guarantee that a subsequent call to get() will not blockhttps://docs.python.org/2/library/queue.html#Queue.Queue.full

這就是爲什麼q1.full()不是幾個線程可靠。在將項插入隊列和檢查隊列已滿後,我添加了互斥鎖:

class Client(object): 
    def __init__(self, ident): 
     self.ident = ident 

    def __repr__(self): 
     return '<{0}: {1}>'.format(self.__class__.__name__, self.ident) 


def serve(q1, q2, mutex): 
    while True: 
     client = q1.get() 
     print '%s leaved waiting queue for serving' % client 
     time.sleep(2) # Do something with client 
     q1.task_done() 

     with mutex: 
      if not q2.empty() and not q1.full(): 
       client = q2.get() 
       print '%s leaved outside queue' % client 
       q1.put(client) 
       print '%s is in the waiting queue' % client 
       q2.task_done() 


def main(): 
    waiting_queue = Queue(10) 
    outside_queue = Queue(20) 

    lock = threading.RLock() 

    for _ in range(2): 
     worker = threading.Thread(target=serve, args=(waiting_queue, outside_queue, lock)) 
     worker.setDaemon(True) 
     worker.start() 

    # Every 1-5 seconds 10 clients enter to the waiting room 
    i = 1 # Used for unique <int> client's id 
    while True: 
     delay = randrange(1, 5) 
     time.sleep(delay) 
     for _ in range(10): 
      client = Client(i) 
      try: 
       lock.acquire() 
       if not waiting_queue.full(): 
        waiting_queue.put(client) 
       else: 
        outside_queue.put_nowait(client) 
      except FullQueue: 
       # print 'Outside queue is full. Please go out.' 
       pass 
      finally: 
       lock.release() 

      i += 1 

    waiting_queue.join() 
    outside_queue.join() 
    print 'Done' 

現在它運行良好。

相關問題