2010-03-17 66 views
6

我嘗試使用txredis(非阻塞扭曲API for redis)爲持久消息隊列嘗試設置與我正在工作的scrapy項目。我發現雖然客戶端沒有被阻塞,但是它變得比原來慢得多,因爲反應器迴路中應該發生的一件事情被分成幾千個步驟。Twisted:爲什麼將延遲迴調傳遞給延遲線程會讓線程突然間阻塞?

因此,我嘗試使用redis-py(常規阻塞扭曲API)並將調用封裝在延遲線程中。它工作的很好,但是當我打電話給redis時,我想執行一個內部延遲,因爲我想設置連接池以嘗試進一步提高速度。

下面是我從扭曲的文檔採取延期線程一些示例代碼解釋,說明我的使用情況:

#!/usr/bin/env python 
from twisted.internet import reactor,threads 
from twisted.internet.task import LoopingCall 
import time 

def main_loop(): 
    print 'doing stuff in main loop.. do not block me!' 


def aBlockingRedisCall(): 
    print 'doing lookup... this may take a while' 
    time.sleep(10) 
    return 'results from redis' 

def result(res): 
    print res 

def main(): 
    lc = LoopingCall(main_loop) 
    lc.start(2) 
    d = threads.deferToThread(aBlockingRedisCall) 
    d.addCallback(result) 
    reactor.run() 

if __name__=='__main__': 
    main() 

這裏是我改變了連接池,使代碼遞延線程阻止:

#!/usr/bin/env python 
from twisted.internet import reactor,defer 
from twisted.internet.task import LoopingCall 
import time 

def main_loop(): 
    print 'doing stuff in main loop.. do not block me!' 

def aBlockingRedisCall(x): 
    if x<5: #all connections are busy, try later 
     print '%s is less than 5, get a redis client later' % x 
     x+=1 
     d = defer.Deferred() 
     d.addCallback(aBlockingRedisCall) 
     reactor.callLater(1.0,d.callback,x) 
     return d 

    else: 
     print 'got a redis client; doing lookup.. this may take a while' 
     time.sleep(10) # this is now blocking.. any ideas? 
     d = defer.Deferred() 
     d.addCallback(gotFinalResult) 
     d.callback(x) 
     return d 

def gotFinalResult(x): 
    return 'final result is %s' % x 

def result(res): 
    print res 

def aBlockingMethod(): 
    print 'going to sleep...' 
    time.sleep(10) 
    print 'woke up' 

def main(): 
    lc = LoopingCall(main_loop) 
    lc.start(2) 


    d = defer.Deferred() 
    d.addCallback(aBlockingRedisCall) 
    d.addCallback(result) 
    reactor.callInThread(d.callback, 1) 
    reactor.run() 

if __name__=='__main__': 
    main() 

所以我的問題是,沒有人知道爲什麼我的改變導致的遞延線程會阻止和/或任何人都可以提出一個更好的解決方案?

回答

12

好視twisted docs說:

Deferreds不使代碼 每當你使用的是阻塞的代碼,如sleep奇蹟般地沒有阻止

,你不得不推遲它到一個新的線程。

#!/usr/bin/env python 
from twisted.internet import reactor,defer, threads 
from twisted.internet.task import LoopingCall 
import time 

def main_loop(): 
    print 'doing stuff in main loop.. do not block me!' 

def aBlockingRedisCall(x): 
    if x<5: #all connections are busy, try later 
     print '%s is less than 5, get a redis client later' % x 
     x+=1 
     d = defer.Deferred() 
     d.addCallback(aBlockingRedisCall) 
     reactor.callLater(1.0,d.callback,x) 
     return d 

    else: 
     print 'got a redis client; doing lookup.. this may take a while' 
     def getstuff(x): 
      time.sleep(3) 
      return "stuff is %s" % x 

     # getstuff is blocking, so you need to push it to a new thread 
     d = threads.deferToThread(getstuff, x) 
     d.addCallback(gotFinalResult) 
     return d 

def gotFinalResult(x): 
    return 'final result is %s' % x 

def result(res): 
    print res 

def aBlockingMethod(): 
    print 'going to sleep...' 
    time.sleep(10) 
    print 'woke up' 

def main(): 
    lc = LoopingCall(main_loop) 
    lc.start(2) 


    d = defer.Deferred() 
    d.addCallback(aBlockingRedisCall) 
    d.addCallback(result) 
    reactor.callInThread(d.callback, 1) 
    reactor.run() 

if __name__=='__main__': 
    main() 

如果Redis的API是不是很複雜,可能更自然的使用twisted.web重寫它,而不是隻調用了很多線程阻塞API。 http://github.com/deldotdr/txRedis

+0

真棒的感謝! – surtyaar 2010-03-18 00:36:33

0

在一個相關的說明,您或許可以通過使用專門爲扭轉創建Redis的客戶端,比如這一個收穫不少Redis 2.x的新協議和特性。你應該明確地嘗試一下。它被稱爲txredisapi。

對於持久消息隊列,我推薦RestMQ。基於redis的消息隊列系統建立在cyclone和txredisapi之上。

http://github.com/gleicon/restmq

乾杯

+1

OP在第一行說他嘗試使用txRedis。 – pr1001 2010-07-22 20:11:26

1

還有一個上最新的Redis客戶端扭曲它已經支持: