你需要管道(pipe)和一點「黑魔法」,而 Python 兩者都有。
from multiprocessing import Process, Pipe
def F1(stream):
    """Lazily yield ``str(element) + 'a'`` for every element of *stream*.

    Non-string elements are converted with ``str()`` first, so this stage can
    sit directly on top of a numeric source such as ``range``.
    """
    for element in stream:
        text = str(element)
        yield text + 'a'
def F2(stream):
    """Lazily yield ``element + 'b'`` for every element of *stream*.

    Elements must support ``+`` with a ``str`` (normally they are strings
    produced by an earlier stage).
    """
    for element in stream:
        tagged = element + 'b'
        yield tagged
def F3(stream):
    """Lazily yield ``element + 'c'`` for every element of *stream*.

    Elements must support ``+`` with a ``str`` (normally they are strings
    produced by an earlier stage).
    """
    for element in stream:
        tagged = element + 'c'
        yield tagged
def F4(stream):
    """Lazily yield ``element + 'd'`` for every element of *stream*.

    Elements must support ``+`` with a ``str`` (normally they are strings
    produced by an earlier stage).
    """
    for element in stream:
        tagged = element + 'd'
        yield tagged
class PIPE_EOF:
    """End-of-stream marker sent through a pipe after the last real item.

    The class object itself is the sentinel; it is never instantiated.
    """
class IterableConnection(object):
    """Iterator adapter over the receiving end of a multiprocessing pipe.

    Each ``next()`` call receives one object from the wrapped connection.
    Iteration stops when the ``PIPE_EOF`` sentinel arrives, or when
    ``recv()`` raises ``EOFError`` (nothing left to receive and the sending
    side has been closed).
    """

    def __init__(self, pipe):
        # Presumably a multiprocessing Connection; anything with a compatible
        # ``recv()`` works.  Kept public as ``pipe`` so owners can close it.
        self.pipe = pipe

    def __iter__(self):
        return self

    def __next__(self):
        try:
            item = self.pipe.recv()
        except EOFError:
            # Every writer closed its end without sending PIPE_EOF.
            raise StopIteration
        # Identity test, not ``==``: equality would call the received item's
        # own __eq__, which can raise (e.g. NumPy arrays, whose comparison
        # result has no single truth value) or misfire (an always-truthy
        # __eq__ would end the stream early).
        if item is PIPE_EOF:
            raise StopIteration
        return item

    def next(self):
        """Python 2 iterator protocol; delegates to ``__next__``."""
        return self.__next__()
def _run_stage(func, source, sink):
    """Child-process body: stream ``func(source)`` into ``sink``, then PIPE_EOF.

    This is a module-level function (not a closure) so that it can be pickled
    when multiprocessing uses the "spawn" start method (the default on
    Windows, and on macOS since Python 3.8).
    """
    try:
        for item in func(source):
            sink.send(item)
        sink.send(PIPE_EOF)
    finally:
        # Release this process's write end so the reader sees EOFError if
        # anything fails before PIPE_EOF is sent.
        sink.close()


def parallel_generator_chain(*args, **kwargs):
    """Run a chain of generator functions, each stage in its own process.

    ``parallel_generator_chain(f, g, h, data=d)`` yields the same items as
    ``h(g(f(d)))``, but every stage runs in a separate process and
    consecutive stages are connected by one-way pipes.

    Args:
        *args: Stage callables, applied left to right. Each is called with a
            single iterable and must return an iterable. The callables, the
            first-stage input, and every produced item are sent between
            processes, so they must be picklable.
        data: Required keyword argument; the input passed to the first stage.

    Yields:
        The items produced by the last stage, in order.

    Raises:
        RuntimeError: if ``data`` is missing, or if any stage process exits
            with a non-zero exit code.
    """
    if 'data' in kwargs:
        data = kwargs['data']
    else:
        raise RuntimeError('Missing "data" argument.')

    processes = []
    reader = None  # read end created (and still owned) by this process
    completed = False
    try:
        for func in args:
            read_end, write_end = Pipe(duplex=False)
            p = Process(target=_run_stage, args=(func, data, write_end))
            p.start()
            processes.append(p)
            # The child now holds its own copy of the write end. Closing ours
            # lets the reader get EOFError if the child dies before PIPE_EOF.
            write_end.close()
            if reader is not None:
                # The previous read end now belongs to the stage just started.
                reader.pipe.close()
            reader = IterableConnection(read_end)
            data = reader
        for output in data:
            yield output
        completed = True
    finally:
        if reader is not None:
            reader.pipe.close()
        for p in processes:
            if not completed:
                # The consumer stopped early or an error occurred. Stages may
                # be blocked in send() forever, so stop them before joining.
                p.terminate()
            p.join()

    failed_codes = [p.exitcode for p in processes if p.exitcode != 0]
    if failed_codes:
        raise RuntimeError('%d pipeline stage(s) failed; exit codes: %s'
                           % (len(failed_codes), failed_codes))
# Python 2/3 compatibility: Python 3 has no ``xrange``, so alias it to the
# (lazy) ``range``. Probe the name directly instead of checking globals():
# Python 2's ``xrange`` is a builtin, not a module global, so a globals() test
# would rebind it to Python 2's list-building ``range`` and materialise the
# whole input in memory.
try:
    xrange
except NameError:
    xrange = range
if __name__ == '__main__':
    # Demo: push 0..99999999 through xrange -> F1 -> F2 -> F3 -> F4, one
    # process per stage, printing each result ("0abcd", "1abcd", ...).
    # The single-process equivalent is F4(F3(F2(F1(xrange(N))))).
    results = parallel_generator_chain(xrange, F1, F2, F3, F4, data=100000000)
    for line in results:
        print(line)
注:我看過 http://stackoverflow.com/questions/5684992/how-can-i-parallelize-a-pipeline-of-generators-iterators-in-python 這個問題,但我的實際使用情況比單純的「映射」(map)更複雜一些:在處理資料的過程中,「F1」、「F2」、「F3」和「F4」每一個實際上都會累積一些狀態。 –
如果有人有興趣看完整的代碼,請見 http://pit-claudel.fr/clement/blog/an-experimental-estimation-of-the-entropy-of-english-in-50-lines-of-python-code/ –
如果它們會累積狀態,你如何保證它們能被並行化? – BrenBarn