2017-06-03 80 views
2

我有一個程序,維護與定期心跳服務器的連接。每過一段時間,服務器都會停止響應心跳,我必須重新連接。我用一個定時器實現了這一點,如果在n秒後沒有聽到任何響應,將會調用重新連接。每次發生這種情況時,我都會泄漏一個線程,隨着時間的推移,我最終會耗盡線程。Python的select.select泄漏線程

現在,爲了便於重複使用,大量簡化了這個插圖,介紹了延遲後重新連接的方式以及總是會導致線程增加的方式。 如何殺死舊線程/套接字/選擇(可能正在等待recv)?

import socket 
import select 
import threading 

class Connection(): 

    def tick(self): 
     print(threading.active_count()) # this increases every 1s! 
     # ... certain conditions not met/it's been too long, then: 
     self.reconnect() 

    def reconnect(self): 
     self.socket.shutdown(socket.SHUT_WR) 
     self.socket.close() 
     self.timer.cancel() 
     self.connect() 

    def connect(self): 
     self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.socket.connect((IP, TCP_PORT)) 
     self.timer = threading.Timer(1, self.tick) 
     self.timer.start() 
     r,_,_ = select.select([self.socket], [], []) 

if __name__ == '__main__': 
    Connection().connect() 

回答

2

我很確定,它不是select()泄漏任何線程。我們假設select()不會返回,即它永遠阻止。

在這種情況下

  • .tick()從計時器線程調用。在計時器線程內調用.reconnect()
  • .reconnect()關閉現有的套接字。這導致select()調用失敗,IOError「錯誤的文件描述符」(這也是您應該確實修復代碼的原因)。
  • .reconnect()嘗試取消當前計時器。 這不做任何事情,因爲定時器已經觸發(我們目前在定時器功能中!)。
  • .reconnect()來電.connect()那個人建立一個新的計時器,在這裏我們再次去。

所以問題是:這種操作模式在哪裏掛在現有的計時器對象上?那麼,您的所有計時器線程都會被select()調用中的IOError終止。這存儲了異常的每個線程引用。

我的猜測是,這可以防止CPython中的引用計數清理觸發,因此計時器線程只會在垃圾回收期間清理。這是不可靠的,因爲不能保證定時器線程能夠及時清理。

如果您在.connect()的開頭添加import gc; gc.collect(),問題(似乎)就會消失。但是,這是一個非解決方案。

爲什麼不使用timeout參數select()來實現類似的結果,而無需使用定時器線程?

r = [] 
while not r: 
    if self.socket: 
     self.socket.shutdown(socket.SHUT_WR) 
     self.socket.close() 

    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
    self.socket.connect((IP, TCP_PORT)) 
    # select returns empty lists on timeout 
    r, _, _ = select.select([self.socket], [], [], 1) 

不要忘記設置self.socket = NoneConnection.__init__()這個工作。