python asyncio run forever and inter-process communication

What is the most efficient way to achieve the following: a main process collects and dispatches events (an event loop running run_forever) to subprocesses. These subprocesses stay alive and either gather signals from the outside world or consume signals and perform CPU-bound operations. So far I have come up with something like this:

import os 
import time 
import signal 
import asyncio 
from asyncio import PriorityQueue 
from multiprocessing import Process, Pipe 

class Event(object):
    __slots__ = ['type', 'priority', 'payload', 'timestamp']

    def __init__(self, _type=None, _priority=None, _payload=None):
        self.type, self.priority, self.payload, self.timestamp = \
            _type, _priority, _payload, time.time()

    def __str__(self):
        return "%s(%s,%s,%s)" % (self.__class__.__name__, self.type,
                                 self.priority, self.payload)

    def __lt__(self, other):
        # inverted comparison: when priorities tie in the queue,
        # the newer event compares as smaller and pops first
        return (self.priority, self.timestamp) > (other.priority, other.timestamp)

class EventQueue(PriorityQueue):
    def _put(self, event):
        # store (priority, event) tuples so heapq can order them
        super()._put((event.priority, event))

@asyncio.coroutine
def consumeAnyEvent(eq, acon_write_p=None):  ## more args with write_conn pipes
    while True:
        priority, event = yield from eq.get()
        print("consumed", event)
        if event.type == 'sig_a':
            acon_write_p.send(event)
        if event.type == 'sig_b':
            pass
        if event.type == 'sig_c':
            pass
        ## and so on - broadcast events to relevant sub-processes
        yield from asyncio.sleep(0)

@asyncio.coroutine
def produceSignalA(eq, read_p):
    while True:
        yield from asyncio.sleep(0)
        row = read_p.recv()  # note: recv() blocks the event loop until data arrives
        if row:
            yield from eq.put(Event('sig_a', _payload=row))

class someSource(object):
    """db, http, file watch or other io"""
    def fetch(self):
        pass

def someSlowMethod(a=None):
    """cpu-bound operations"""
    pass

def signalAPublisher(pipe):
    read_p, write_p = pipe
    read_p.close()  # the publisher only writes
    s = someSource()
    while True:
        result = s.fetch()
        if result:
            write_p.send(result)

def signalAConsumer(pipe):
    read_p, write_p = pipe
    while True:
        inp = read_p.recv()
        if inp:
            result = someSlowMethod(inp)
            write_p.send(result)

def main():
    ## main process is responsible for handling events:
    ## collecting from all signal publisher subprocesses
    ## and broadcasting to all interested consumer subprocesses
    eventQueue = EventQueue()
    apub_read_p, apub_write_p = Pipe()
    acon_read_p, acon_write_p = Pipe()
    ## more pipes for Signal B, ... Signal Z
    signalAPublisher_p = Process(target=signalAPublisher, args=((apub_read_p, apub_write_p),))
    signalAConsumer_p = Process(target=signalAConsumer, args=((acon_read_p, acon_write_p),))
    signalAPublisher_p.start()
    signalAConsumer_p.start()
    ## and so on for Signal B, Signal C, ... Signal Z
    loop = asyncio.get_event_loop()
    try:
        tasks = asyncio.gather(
            loop.create_task(produceSignalA(eventQueue, apub_read_p)),
            loop.create_task(consumeAnyEvent(eventQueue, acon_write_p))
        )
        loop.run_forever()
    except KeyboardInterrupt:
        print("Caught keyboard interrupt. Canceling tasks...")
        tasks.cancel()
    finally:
        loop.close()
        os.kill(signalAPublisher_p.pid, signal.SIGTERM)
        os.kill(signalAConsumer_p.pid, signal.SIGTERM)
        ## kill for Signal B, ... Signal Z

if __name__ == '__main__':
    main()

But I have a feeling that the above is not efficient or elegant enough, and that I am missing something. Any ideas or suggestions?


http://zeromq.org/ – Padraic Cunningham


@PadraicCunningham Thanks for the pointer - off to study it! How painful is it to integrate into Python? Ideally I would prefer not to go beyond the standard Python library. – Nicholas
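
For context, a minimal pyzmq pub/sub sketch (pyzmq is a third-party binding, not in the standard library; the tcp://127.0.0.1:5556 endpoint and the sig_a topic are made-up names for illustration):

import zmq

def publisher():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind("tcp://127.0.0.1:5556")
    # each message is a topic frame followed by a payload frame
    sock.send_multipart([b"sig_a", b"payload"])

def subscriber():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.connect("tcp://127.0.0.1:5556")
    sock.setsockopt(zmq.SUBSCRIBE, b"sig_a")  # prefix filter on the topic frame
    topic, payload = sock.recv_multipart()    # blocks until a matching message

Note that PUB/SUB joins are not synchronized: a subscriber that connects after a message was sent will miss it, so real publishers typically send in a loop.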

Answer


As a start, try distributing any CPU-bound routines with run_in_executor() backed by a ProcessPoolExecutor; beyond that, just use regular async def/async for/await with asyncio, without any queues.

import asyncio
import random
import time
from concurrent.futures import ProcessPoolExecutor

async def coro_a(n):
    print("> a", n)
    await asyncio.sleep(random.uniform(0.1, 1))
    # None selects the default executor, set to a ProcessPoolExecutor below
    result = await asyncio.gather(coro_b(n),
                                  loop.run_in_executor(None, slow_method_c, n))
    print("< a", n, result)


async def coro_b(n): 
    print("> b", n) 
    await asyncio.sleep(random.uniform(0.1, 1)) 
    result = await loop.run_in_executor(None, slow_method_d, n) 
    print("< b", n, result) 
    return ("B", result) 


def slow_method_c(n): 
    print("> c", n) 
    time.sleep(random.uniform(0.5, 5)) 
    print("< c", n) 
    return ("C", n) 


def slow_method_d(n): 
    print("> d", n) 
    time.sleep(random.uniform(0.5, 5)) 
    print("< d", n) 
    return ("D", n) 


async def main_producer():
    tasks = []
    for i in range(10):
        tasks.append(asyncio.ensure_future(coro_a(i + 1)))
        await asyncio.sleep(1)
    await asyncio.wait(tasks)


loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())  # CPU-bound calls run in worker processes
loop.run_until_complete(main_producer())
loop.close()
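
If some signal sources must stay blocking, like the Pipe.recv() calls in the question, the same run_in_executor() mechanism can wrap them so the loop never stalls. A minimal sketch of that idea (pipe_reader and the one-worker thread pool are illustrative, not part of the answer above):

import asyncio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pipe

read_p, write_p = Pipe(duplex=False)

async def pipe_reader(loop):
    # recv() blocks, so hand it to a dedicated thread instead of
    # calling it inside a coroutine as the question's code does
    with ThreadPoolExecutor(max_workers=1) as pool:
        while True:
            row = await loop.run_in_executor(pool, read_p.recv)
            print("got", row)

A thread (rather than a process) is enough here because recv() waits on I/O and releases the GIL.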