2015-08-17 40 views
3

我有一個python 3.4.3,postgreSQL 9.4,aiopg-0.7.0。一個多線程應用程序的例子,從這個網站被採取。如何使用游泳池?線程在選擇操作時掛起。如何在多線程應用程序中使用aiopg池?

import time 
import asyncio 
import aiopg 
import functools 
from threading import Thread, current_thread, Event 
from concurrent.futures import Future 

class B(Thread): 
    def __init__(self, start_event): 
     Thread.__init__(self) 
     self.loop = None 
     self.tid = None 
     self.event = start_event 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.tid = current_thread() 
     self.loop.call_soon(self.event.set) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
     can cancel, not a handle""" 
     def _async_add(func, fut): 
      try: 
       ret = func() 
       fut.set_result(ret) 
      except Exception as e: 
       fut.set_exception(e) 

     f = functools.partial(asyncio.async, coro, loop=self.loop) 
     if current_thread() == self.tid: 
      return f() # We can call directly if we're not going between threads. 
     else: 
      # We're in a non-event loop thread so we use a Future 
      # to get the task from the event loop thread once 
      # it's ready. 
      fut = Future() 
      self.loop.call_soon_threadsafe(_async_add, f, fut) 
      return fut.result() 

    def cancel_task(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 


@asyncio.coroutine 
def test(pool, name_task): 
    while True: 
     print(name_task, 'running') 
     with (yield from pool.cursor()) as cur: 
      print(name_task, " select. ") 
      yield from cur.execute("SELECT count(*) FROM test") 
      count = yield from cur.fetchone() 
      print(name_task, ' Result: ', count) 
     yield from asyncio.sleep(3) 

@asyncio.coroutine 
def connect_db(): 
    dsn = 'dbname=%s user=%s password=%s host=%s' % ('testdb', 'user', 'passw', '127.0.0.1') 
    pool = yield from aiopg.create_pool(dsn) 
    print('create pool type =', type(pool)) 
    # future.set_result(pool) 
    return (pool) 

event = Event() 
b = B(event) 
b.start() 
event.wait() # Let the loop's thread signal us, rather than sleeping 
loop_db = asyncio.get_event_loop() 
pool = loop_db.run_until_complete(connect_db()) 
time.sleep(2) 
t = b.add_task(test(pool, 'Task1')) # This is a real task 
t = b.add_task(test(pool, 'Task2')) 

while True: 
    time.sleep(10) 

b.stop() 

在不返回結果 '從cur.execute產量( 「SELECT COUNT(*)FROM測試」)'

+0

可能會有所幫助:http://pylover.dobisel.com/posts/aiopg-aiopg_sa-and-aiopg8000/ – pylover

回答

4

長話短說:你不能共享來自不同事件循環aiopg池對象。

每個aiopg.Pool都耦合到事件循環。如果您不明確指定loop參數,則會從asyncio.get_event_loop()調用中獲取。

因此,你的例子中你有一個從主線程耦合到事件循環的池。

當你從單獨的線程執行數據庫查詢時,你試圖通過執行線程循環來完成它,而不是主循環。它不起作用。

相關問題