2014-09-29 50 views
0

我正在開發Python中的一個小型irc客戶端(版本2.7)。我希望利用多從我目前連接到所有的服務器閱讀,但我遇到了一個問題,使用多處理從多個套接字獲取信息

import socket 
import multiprocessing as mp 
import types 
import copy_reg 
import pickle 


def _pickle_method(method): 
    func_name = method.im_func.__name__ 
    obj = method.im_self 
    cls = method.im_class 
    return _unpickle_method, (func_name, obj, cls) 

def _unpickle_method(func_name, obj, cls): 
    for cls in cls.mro(): 
     try: 
      func = cls.__dict__[func_name] 
     except KeyError: 
      pass 
     else: 
      break 
    return func.__get__(obj, cls) 

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method) 

class a(object): 

    def __init__(self): 
     sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     sock1.connect((socket.gethostbyname("example.com"), 6667)) 
     self.servers = {} 
     self.servers["example.com"] = sock1 

    def method(self, hostname): 
     self.servers[hostname].send("JOIN DAN\r\n") 
     print "1" 

    def oth_method(self): 
     pool = mp.Pool() 
     ## pickle.dumps(self.method) 
     pool.map(self.method, self.servers.keys()) 
     pool.close() 
     pool.join() 

if __name__ == "__main__": 
    b = a() 
    b.oth_method() 

當它擊中線pool.map(self.method, self.servers.keys())我得到的錯誤

TypeError: expected string or Unicode object, NoneType found 

從我讀過的內容來看,當我嘗試醃製不可挑剔的東西時會發生什麼。爲了解決這個問題,我首先製作了_pickle_method_unpickle_method,如here所述。然後我意識到我(最初)試圖通過pool.map()套接字列表(非常不可用),因此我將其更改爲主機名列表,因爲可以對字符串進行酸洗。不過,我仍然遇到這個錯誤。

然後我試着直接撥打pickle.dumps()self.method,self.servers.keys()self.servers.keys()[0]。正如預期的那樣,它的工作罰款後兩者,但是從我第一次拿到

TypeError: a class that defines __slots__ without defining __getstate__ cannot be pickled. 

一些更多的研究,導致我this question,這似乎表明,這個問題與使用插座(和gnibbler's answer這個問題似乎證實了這一點)。

有沒有一種方法可以實際爲此使用多處理?從我已經(非常簡要地)閱讀pathos.multiprocessing可能是我需要的,但我真的很想堅持標準庫,如果可能的話。

我也沒有設置使用多處理 - 如果多線程會更好地工作,並避免這個問題,那麼我比這些解決方案更開放。

+0

你是否真的試圖將一個套接字傳遞給子進程,還是隻是你試圖避免的偶然發生的事情?對於前者,你需要遷移套接字,這必須在比Python酸洗更低的層次上完成,並且對於每個平臺都是不同的,因爲在封面之下,套接字只是文件描述符的包裝,而你需要操作系統使相同的文件描述符意味着您的子進程中使用相同的套接字。 – abarnert 2014-09-29 08:20:39

+0

同時,您是否有理由在第一時間使用多處理而不是多線程呢?「從一堆服務器中讀取」就像你可以得到I/O約束的範例一樣,這正是線程的優點。 – abarnert 2014-09-29 08:21:50

+0

不,我將子進程的字符串鍵傳遞給引用套接字的字典。子進程然後使用字符串鍵來訪問套接字,做套接字的東西,然後返回。我使用多處理而不是多線程的原因是因爲我是多新的東西,而且我讀到python中的線程速度很慢。話雖如此,我對多線程解決方案非常開放 – Dannnno 2014-09-29 08:22:41

回答

3

您的根本問題是您無法將套接字傳遞給子進程。簡單的解決方案是使用線程代替。

更詳細地:


酸洗綁定方法需要酸洗三兩件事:函數名,對象和類。 (我認爲multiprocessing會自動爲你做這件事,但是你手動做,這很好。)要醃製對象,你必須醃製它的成員,在你的情況下,它包括一個字典,其值是套接字。

您不能在Python 2.x中使用默認酸洗協議來使用套接字。鏈接問題的答案解釋了原因,並提供了簡單的解決方法:不要使用默認的酸洗協議。但socket還有一個額外的問題;它只是一個圍繞在C擴展模塊中定義的類型的包裝,它在酸洗時有它自己的問題。你也許能夠解決這個問題...

但是,這仍然沒有幫助。在封面之下,C擴展類本身就是一個文件描述符的包裝。文件描述符只是一個數字。您的操作系統將文件描述符映射爲打開每個進程的套接字(以及文件和管道等);一個進程中的文件#4不是另一個進程中的文件#4。因此,您需要實際將套接字的文件描述符遷移到操作系統級別的子級。這不是一件簡單的事情,它在每個平臺上都有所不同。而且,當然,除了遷移文件描述符之外,還必須傳遞足夠的信息來重新構造socket對象。所有這些都是可行的。甚至可能會有一個圖書館爲你包裝它。但這並不容易。


另一種可能性是在啓動任何孩子之前打開所有的套接字,並將它們設置爲由孩子們繼承。但是,即使你可以重新設計你的代碼來完成這些事情,這隻能在POSIX系統上運行,而不能在Windows上運行。


更簡單的可能性是隻使用線程而不是進程。如果你正在做CPU綁定的工作,線程在Python中有問題(CPython,你幾乎可以肯定使用的實現),因爲全局解釋器鎖可以防止兩個線程同時解釋代碼。但是當你的線程花費所有時間在socket.recv和類似的I/O調用上等待時,使用線程沒有問題。他們避免了酸洗數據和遷移套接字等所有開銷和複雜性。

您可能會注意到threading模塊沒有很好的Pool類,例如multiprocessing。但令人驚訝的是, stdlib中的一個線程池類 - 它只是在multiprocessing中。您可以訪問它作爲multiprocessing.dummy.Pool

如果你願意超越stdlib,Python 3的concurrent.futures模塊有一個名爲futures的backport,你可以在PyPI上安裝它。它包括一個ThreadPoolExecutor這是一個稍微更高層次的抽象,可能更容易使用。但Pool也應該在這裏適合你,並且你已經編寫了代碼。

+0

謝謝,這是有道理的。我會在早上看看它,看看我能不能讓它工作 – Dannnno 2014-09-29 08:41:27

1

如果想嘗試跳出標準庫,那麼對於pathos.multiprocessing下面的代碼(你提到)不應該拋出酸洗錯誤,因爲dill串行知道如何序列套接字和文件句柄。然而

>>> import socket 
>>> import pathos.multiprocessing as mp 
>>> import types 
>>> import dill as pickle 
>>> 
>>> class a(object): 
... def __init__(self): 
...  sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
...  sock1.connect((socket.gethostbyname("example.com"), 6667)) 
...  self.servers = {} 
...  self.servers["example.com"] = sock1 
... def method(self, hostname): 
...  self.servers[hostname].send("JOIN DAN\r\n") 
...  print "1" 
... def oth_method(self): 
...  pool = mp.ProcessingPool() 
...  pool.map(self.method, self.servers.keys()) 
...  pool.close() 
...  pool.join() 
... 
>>> b = a() 
>>> b.oth_method() 

的一個問題是,你需要用系列化multiprocessing,而且在許多情況下,插座會序列,使反序列化的套接字被關閉。原因主要是因爲文件描述符未按預期複製,所以通過引用複製。使用dill,您可以自定義文件句柄的序列化,以便傳輸內容而不是使用引用...但是,這對於套接字(至少在此刻)沒有很好的轉換。

我是dillpathos作家,我有,你可能不想與multiprocessing(至少不是存儲地圖服務器,插座)要做到這一點@abarnert同意。如果您想使用multiprocessing's線程接口,並且您發現遇到任何序列化問題,pathos.multiprocessing確實有mp.ThreadingPool()而不是mp.ProcessingPool(),因此您可以訪問multiprocessing.dummy.Pool的包裝,但仍可獲得pathos提供的附加功能(例如多阻塞或異步管道和映射的參數池等)。

+0

我很好奇你是如何序列化文件句柄的。它不像'unix_sock.sendmsg(sock)'和'sock.share(pid)'完全難以編寫,但將它們包裝在進程池接口中似乎有點讓人頭疼。 (另外,在Windows上處理套接字和文件是不同的事情......)但是這可能是在這裏繼續的話題,所以我會去下載你的模塊並讀取它。 :) – abarnert 2014-09-29 19:44:41

+0

@abarnert:沒有什麼聰明的插座了。有幾個選項(最近添加在https://github.com/uqfoundation/dill上,而不是在當前版本中)用於處理序列化文件......並且它需要在Windows上進行測試。我知道Windows和任何其他操作系統之間的區別。 'multiprocessing'的分支非常簡單,我只需替換序列化程序並在頂部添加一個小小的說服層,例如,允許'map'採用多個參數。 – 2014-09-29 22:42:48