2011-12-03 114 views
5

因此,我正在做的是編寫一個WSGI流服務,它使用包裝在迭代器中的隊列來實現多播推送。以下是對服務的簡化模型:實現WSGI流服務:(如何檢測客戶端斷開連接)

# this is managed by another thread 
def processor_runner(): 
    generator = SerialMessageGenerator() 
    for message in generator: 
     for client in Processor.connections: 
      client.put(message) 

# this is managed by twisted's wsgi implementation 
def main(environ, start_response): 
    queue = Queue() 
    Processor.connections.append(queue) 
    status = '200 OK' 
    response_headers = [ 
     ('Content-Type', 'application/json'), 
     ('Transfer-Encoding', 'chunked') 
    ] 
    start_response(status, response_headers) 
    return iter(queue.get, None) 

這是工作的偉大與扭曲作爲WSGI服務器(作爲一旁,串行發生器是由一個跨進程隊列連接到所述處理器的單獨的進程) 。我的問題是,如何檢測客戶端何時斷開連接並將其從隊列中刪除?我雖然將隊列添加到客戶端套接字(即套接字,隊列)中,然後在執行put之前檢查套接字是否仍然連接。但是,我並不確切知道從環境中獲得什麼。在我共同攻擊某事之前,有沒有人有過這樣的經驗?

更新

這裏是我最後與去解決:

class IterableQueue(Queue): 

def __init__(self): 
    Queue.__init__(self) # Queue is an old style class 
    ShellProcessor.connections.append(self) 

def __iter__(self): 
    return iter(self.get, None) 

def close(self): 
    self.put(None) 
    self.task_done() 
    ShellProcessor.connections.remove(self) 
+0

此外,隨意,如果您有任何經驗上的WSGI流媒體服務的體系結構和性能進行評論。 – Bashwork

回答

1

迭代器上的扭曲調用.close()如果請求完成或中斷時存在。你可以這樣做:

# ... 
start_response(status, response_headers) 
return ResponseIterator(iter(queue.get, None), 
    on_finish=lambda: Processor.connections.remove(queue)) 

其中ResponseIterator可能是:

class ResponseIterator: 

    def __init__(self, iterator, on_finish=None): 
     self.iterator = iterator 
     self.on_finish = on_finish 

    def __iter__(self): 
     return self 

    def next(self): 
     return next(self.iterator) 

    def close(self): 
     if self.on_finish is not None: 
     self.on_finish() 
+0

不錯,我不知道它在迭代器上叫做close。太棒了! – Bashwork

相關問題