2013-12-20 67 views
8

假設我有看起來像這樣的Python流的處理代碼:並行化發生器的序列

def F1(stream): 
    for x in stream: 
     yield f1(x) 

def F2(stream): 
    for x in stream: 
     yield f2(x) 

def F3(stream): 
    for x in stream: 
     yield f3(x) 

def F4(stream): 
    for x in stream: 
     yield f4(x) 


for x in F4(F3(F2(F1(range(1000000))))): 
    print(x) 

這大致相當於range 1000000 | F1 | F2 | F3 | F4在UNIX(假設range命令),但在UNIX在每個過程管道並行運行。

有沒有簡單的方法來並行化Python代碼?

+0

注:我見過http://stackoverflow.com/questions/5684992/how-can-i-parallelize-a-pipeline-of-generators-iterators-in-python,但我的實際使用情況比簡單的「映射」更復雜一些:在處理值時,「F1」,「F2」,「F3」和「F4」中的每一個實際上都會積累一些狀態。 –

+0

如果任何人有興趣看到完整的代碼,它是在http://pit-claudel.fr/clement/blog/an-experimental-estimation-of-the-entropy-of-english-in-50-lines-of -python-code/ –

+1

如果他們累積狀態,你如何保證它甚至可以並行化它們? – BrenBarn

回答

3

你需要管道和blackmagic,Python有兩個。

from multiprocessing import Process, Pipe 


def F1(stream): 
    for x in stream: 
     yield str(x)+'a' 

def F2(stream): 
    for x in stream: 
     yield x+'b' 

def F3(stream): 
    for x in stream: 
     yield x+'c' 

def F4(stream): 
    for x in stream: 
     yield x+'d' 



class PIPE_EOF: 
    pass 

class IterableConnection(object): 
    def __init__(self, pipe): 
     self.pipe = pipe 

    def __iter__(self): 
     return self 

    def __next__(self): 
     try: 
      ret = self.pipe.recv() 
      if ret == PIPE_EOF: 
       raise StopIteration 
      return ret 
     except EOFError: 
      raise StopIteration 

    def next(self): 
     return self.__next__() 


def parallel_generator_chain(*args, **kwargs): 
    if 'data' in kwargs: 
     data = kwargs['data'] 
    else: 
     raise RuntimeError('Missing "data" argument.') 

    def decorator(func, _input, _output): 
     def wrapper(*args, **kwargs): 
      for item in func(_input): 
       _output.send(item) 
      _output.send(PIPE_EOF) 
     return wrapper 

    for func in args: 
     in_end, out_end = Pipe(duplex = False) 
     in_end = IterableConnection(in_end) 
     func = decorator(func, data, out_end) 
     p = Process(target = func) 
     p.start() 
     data = in_end 

    for output in data: 
     yield output 



if 'xrange' not in globals(): 
    xrange = range 


if __name__ == '__main__': 
    for x in parallel_generator_chain(xrange, F1, F2, F3, F4, data=100000000): 
     print(x) 

#for x in F4(F3(F2(F1(range(1000000))))): 
# print(x) 
+0

對不起,謝謝你的回覆!我一直試圖讓你的代碼工作,但我沒有管理(Python 3.3)。你的'decorator'函數不返回任何東西,代碼根本不會產生任何輸出;所以我添加了一個返回到裝飾器,但我得到'_pickle.PicklingError:不能pickle :屬性查找builtins.function失敗'。代碼是否正確運行?再次感謝。 –

+0

你說得對。我寫它很像一個概念證明而不是一個工作代碼,所以我沒有對它進行測試。不過,我用一個工作版本編輯了我的帖子。它應該與Python 2和3一起工作。 – smeso

+0

我仍然在Windows上遇到「_pickle.PicklingError:Can not pickle :attribute lookup builtins.function failed」thingy;將在Unix上嘗試它,並保持發佈。 –