2012-03-19 78 views
0

作爲this question的後續行動,我試圖通過使用生成器xrange(int(1e8))來規避以range(int(1e8))爲例的列表構建。 xrange只是一個產生長序列值的過程的例子。 (請認爲它不容易被複制。)更多的背景是,我有一長串的時間戳/值對,我想對它們做一些處理(時間序列)。我儘量避免將這些內容拉到整個內存中,因爲這是很多數據。Python生成器的多個客戶端?

我認爲這很酷,如果我可以將多個處理單元同時應用到由我的生成器生成的這個數據流中。第一個想法是使用itertools.tee(),例如:

from itertools import tee 
g1,g2 = tee(xrange(int(1e8)),2) 
sum(g1), sum(g2) 

但後來我發現,只有第一sum()將使用發電機,而tee()內部再建立一個list(我想避免的。)。

所以我想,我需要一個異步解決方案,即每個sum()每個生成器步驟都會進行更新。 是來記東西的地方

但我既沒有真正使用之前,部分我甚至不能告訴方法是否可以正常工作,或者是有效的/有效的/高效的。

從這一點,我很樂意欣賞觀衆的任何建議!


更新

我想避免callback based solution,因爲它apparantly降低性能顯著(這是它是如何目前實施)。我加入了以下一些剖析(請加註釋,如果測試是不客觀的):

class SinkA: 
    def __init__(self, src): 
    for i in src: pass 

class SinkB: 
    def f(self,i): 
    pass 

class Source: 
    def __iter__(self): 
    for i in xrange(int(1e4)): 
     yield i 

def t1(): 
    src = Source() 
    snk = SinkA(src) 

def t2(): 
    src = Source() 
    snk = SinkB() 
    for i in src: snk.f(i) 

if __name__ == "__main__": 
    from timeit import Timer 
    n = 1000 
    t = Timer("t1()", "from __main__ import t1, t2, SinkA, SinkB, Source") 
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 612.11 usec/pass 
    t = Timer("t2()", "from __main__ import t1, t2, SinkA, SinkB, Source") 
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 1933.39 usec/pass 

更新2

我還能說什麼?我有這個基於回調的解決方案,看起來效率不高。基於生成器的方法似乎很有前途,但我對這種編程經驗太少,特別是當涉及更復雜的協同程序或扭曲的庫時。總而言之,我有多個消費者用於生成大量數據的過程,並且我發現了一些潛在的方法。現在我正在尋找有經驗的用戶可能已經完成類似任務的合格語句。說明哪種方法可能是適當的,以及這些方法如何相互關聯。或者我可能錯過了其他的方法。

+1

你真的不解決這個問題:你希望每個消費者看到完全相同的數據,或沒有? – Marcin 2012-03-19 12:17:27

+0

我猜你用'tee'看到的行爲是因爲你沒有並行運行你的兩個任務。 Python首先執行'sum(g1)',然後'sum(g2)'。嘗試使用循環手動執行你的總和,並查看它是否消耗爲可讀內存。 – 2012-03-19 12:25:49

+0

@CharlesBrunet,那是真的。我想以某種方式抽象掉這個手動循環。要有更好的代碼。 – moooeeeep 2012-03-19 12:37:26

回答

5

作爲一個通用的方法,我會回調更換髮電機的拉模型,和可能,包裹發電機,就像這樣:

def walk(gen, callbacks): 
    for item in gen: 
     for f in callbacks: 
      f(item) 

如果你的處理器是在單獨的線程,你想他們阻止等待,您可以註冊Queue.put(或任何等效的)作爲每個處理器的回調,並獨立輪詢這些隊列。如果您需要,這將允許您使用廣播和工作者模型。

編輯

另一種解決方案是使用coroutines

def source(self, *dests): 
    for i in xrange(int(1e4)): 
     for dest in dests: 
      dest.send(i) 

def sink(): 
    while True: 
     i = yield 

def t3(): 
    snk = sink() 
    snk.next() # activate the coroutine 
    source(snk) 

if __name__ == '__main__': 

    from timeit import Timer 
    n = 1000 
    t = Timer("t3()", "from __main__ import source, sink, t3") 
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 872.99 usec/pass 

看起來不夠快。基本上,協程是倒置的發生器,你從發生器中拉出,推入協程。

+0

這是什麼架構?考慮到拉模型會導致線程阻塞,只要IO層能夠正常工作,那將是最簡單的編程模型。 – Marcin 2012-03-19 12:28:56

+0

如果該人需要阻止,那麼實際上回調可能是'Queue.put'(更新了答案)。 – bereal 2012-03-19 12:31:14

+0

請參閱我的編輯 – moooeeeep 2012-03-19 12:35:19

1

你並沒有真正解決這個問題,但是你希望每個消費者看到完全相同的數據(在這種情況下,tee可能是最好的解決方案)嗎?

如果不是,那麼您可以簡單地讓每個消費者從一個生成器對象讀取。

如果您確實希望他們獲得完全相同的數據,請嘗試tee(使用更多內存)與兩個生成器(更多IO),並查看哪個更快。

至於你的計時,你的數據顯示的只是多個函數調用的開銷,並且你的一個方法避免了中間函數調用。

如果你想提高性能,請嘗試在PyPy上運行它,它有一個熱點優化JIT。

+0

不幸的是PyPy不支持我的其他依賴項。 (事實上​​,問題中沒有記錄。) – moooeeeep 2012-03-19 13:39:26

+0

@moooeeeep你似乎有很多約束,你根本沒有聲明。所有這些答案都會按照你的問題回答你的問題,但是你一直抱怨他們不回答你的問題。 – Marcin 2012-03-19 13:44:08

1

由於發電機在內存中便宜,爲什麼不簡單地使用兩個獨立的發電機?

g1 = xrange(int(1e8)) 
g2 = xrange(int(1e8)) 
sum(g1), sum(g2) 
+0

兩次IO,但。 – Marcin 2012-03-19 12:18:00

+0

'xrange'只是一個產生一長串值的過程的例子。請認爲它不能輕易複製。 – moooeeeep 2012-03-19 12:20:09