2014-01-29 47 views
9

背景

我正在研究使用芹菜(3.1.8)處理每個巨大的文本文件(〜30GB)。這些文件格式爲fastq,包含大約118M測序「讀取」,基本上它們是標題,DNA序列和質量字符串的組合。此外,這些序列來自雙端測序運行,所以我同時迭代兩個文件(通過itertools.izip)。我希望能夠做的就是讀取每一對讀取數據,將它們發送到一個隊列,然後讓它們在我們集羣中的一臺機器上處理(不關心它們)以返回已清理的版本如果清潔需要發生(例如,基於質量)。使用芹菜來處理巨大的文本文件

我已經成立了芹菜和RabbitMQ的,和我的工作人員都推出如下:

celery worker -A tasks --autoreload -Q transient 

和配置,如:

from kombu import Queue 

BROKER_URL = 'amqp://[email protected]' 
CELERY_RESULT_BACKEND = 'rpc' 
CELERY_TASK_SERIALIZER = 'pickle' 
CELERY_RESULT_SERIALIZER = 'pickle' 
CELERY_ACCEPT_CONTENT=['pickle', 'json'] 
CELERY_TIMEZONE = 'America/New York' 
CELERY_ENABLE_UTC = True 
CELERYD_PREFETCH_MULTIPLIER = 500 

CELERY_QUEUES = (
    Queue('celery', routing_key='celery'), 
    Queue('transient', routing_key='transient',delivery_mode=1), 
) 

我選擇使用的RPC後端和鹹菜序列化的性能,以及不在 寫入任何內容到'瞬時'隊列中的磁盤(通過delivery_mode)。

芹菜啓動

要設置芹菜框架,我首先發動對64路框中的RabbitMQ服務器(3.2.3,二郎R16B03-1),寫入日誌文件,以快速的/ tmp磁盤。在羣集上的每個節點(大約34個)上啓動工作進程(如上所述),範圍從8路到64路SMP,總共688個內核。所以,我有大量可用的CPU供工作人員用來處理隊列。

作業提交/性能

芹菜一旦啓動並運行,我通過如下一個IPython的筆記本電腦提交作業:

files = [foo, bar] 
f1 = open(files[0]) 
f2 = open(files[1]) 
res = [] 
count = 0 
for r1, r2 in izip(FastqGeneralIterator(f1), FastqGeneralIterator(f2)): 
    count += 1 
    res.append(tasks.process_read_pair.s(r1, r2)) 
     if count == 10000: 
     break 
t.stop() 
g = group(res) 
for task in g.tasks: 
    task.set(queue="transient") 

這需要大約1.5秒的一對萬雙讀取。然後,我呼籲延遲對集團提交給工人,大約需要20秒,如下圖所示:

result = g.delay() 

與RabbitMQ的控制檯監視,我知道我在做的不錯,但幾乎沒有足夠快。

rabbitmq graph

問題

那麼,有沒有什麼辦法可以加快這個嗎?我的意思是,我希望每秒處理至少50000個讀取對,而不是500個。在我的芹菜配置中是否有什麼明顯的缺失?我的工人和兔子日誌本質上是空的。會喜歡一些關於如何提升我的表現的建議。每個人對閱讀的過程相當快,太:

[2014-01-29 13:13:06,352: INFO/Worker-1] tasks.process_read_pair[95ec7f2f-0143-455a-a23b-c032998951b8]: HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 1:N:0:ACAGTG HWI-ST425:143:C04A5ACXX:3:1101:13938:2894 2:N:0:ACAGTG 0.00840497016907 sec 

到目前爲止

所以到這一點,我GOOGLE了所有我能想到的有芹菜,性能,路由,RabbitMQ的,等我已經瀏覽了芹菜網站和文檔。如果我不能獲得更高的性能,我將不得不放棄這種方法,轉而使用另一種解決方案(基本上將工作劃分爲許多較小的物理文件,並使用多處理或其他方式直接在每個計算節點上處理它們)。但是,如果不能在集羣上傳播這些負載,那將是一件令人遺憾的事情。另外,這看起來像是一個精緻優雅的解決方案。

在此先感謝您的幫助!你有

+0

我用的RabbitMQ,發現我的性能瓶頸,從一個封閉的法案開始/重啓每個消息隊列的連接。一旦我開始重新使用連接,發佈率提高了2個數量級。我不熟悉你正在使用的庫,但檢查它是否重用連接 – Basic

+0

是的,芹菜有一個[默認代理池](http://docs.celeryproject.org/en/latest/configuration.html#std :設置 - BROKER_POOL_LIMIT)。我會嘗試增加它,看看會發生什麼。 –

+0

增加了'BROKER_POOL_LIMIT = 1000'並彈跳了我的工人。不幸的是,沒有任何區別。 –

回答

1

一種解決方案是,讀取由

res.append(tasks.process_bytes(zlib.compress(pickle.dumps((r1, r2))), 
             protocol = pickle.HIGHEST_PROTOCOL), 
         level=1)) 

被高度壓縮,以便更換以下

res.append(tasks.process_read_pair.s(r1, r2)) 

,並呼籲另一側的pickle.loads(zlib.decompress(obj))

如果DNA序列足夠長,可以通過大塊因子獲得足夠長的DNA序列,您可以將它們按塊進行分組,然後進行轉儲和壓縮。

如果您還沒有做,另一個可以使用zeroMQ進行運輸。

我不知道什麼process_byte應該

+0

只需使用zlib就會花費我大約100/s。列表基本上不是一回事嗎? 'res.append(tasks.process_read_pair.apply_async(args =(r1,r2),queue =「transient」,compression ='zlib'))' –

+0

也不確定它會如何編寫它。你不需要壓縮r1,r2,而不是壓縮整個任務嗎? –

+0

我不知道你嘗試過壓縮='zlib'你是對的zlib是相同的水平= 9但是你可以指定水平= 1對性能影響不大 –

2

不是一個答案,但太長了評論。

讓我們縮小問題下來了一點......

首先,嘗試跳過所有正常的邏輯/信息準備,只是做與您當前圖書館最緊密的出版循環。看看你得到了多少錢。這將確定它是否與您的非隊列相關的代碼有關。

如果它仍然很慢,請設置一個新的python腳本,但使用amqplib而不是芹菜。我設法在中級桌面上以超過6000/s的速度發佈有用的工作(和json編碼),所以我知道它的性能。這將確定問題是否與芹菜庫有關。 (爲了節省您的時間,我從剪斷我的一個項目中的以下並簡化當希望不破它...)

from amqplib import client_0_8 as amqp 
try: 
    lConnection = amqp.Connection(
     host=###, 
     userid=###, 
     password=###, 
     virtual_host=###, 
     insist=False) 
    lChannel = lConnection.channel() 
    Exchange = ### 

    for i in range(100000): 
     lMessage = amqp.Message("~130 bytes of test data..........................................................................................................") 
     lMessage.properties["delivery_mode"] = 2 
     lChannel.basic_publish(lMessage, exchange=Exchange) 

    lChannel.close() 
    lConnection.close() 

except Exception as e: 
    #Fail 

你上面的兩種方法之間應該能夠追查問題隊列,圖書館或你的代碼之一。

+1

謝謝!良好的測試。使用你的代碼,我交易時速度超過7k/sec,交付模式下降到磁盤時稍微少一點。 [模式2](https://dl.dropboxusercontent.com/u/861789/Screen%20Shot%202014-01-30%20at%2012.36.52%20PM.png),[模式1](https:// dl .dropboxusercontent.com/u/861789/Screen%20Shot%202014-01-30%20at%2012.35.35%20 PM.png) –

+0

好吧,那是朝着正確方向邁出的一步。至少我們知道隊列正在工作......我想下一個最簡單的步驟就是將amqplib放到你的原始代碼中,看看它的速度是多少。如果它很高,我們會稱之爲勝利。如果沒有,它證明問題與隊列無關,而是與你的代碼 - 可能你用來存儲數據的結構是慢枚舉或類似的東西。如果它是你的代碼,[分析是要走的路](http://stackoverflow.com/a/582337/156755)。這不是一個很好的剖析器,但是迄今爲止我在pythong中發現的最好的剖析器。 – Basic

0

再次,不是一個答案,但太長的評論。每Basic's的意見/回答以下,我設置了使用相同的交換和路由爲我的應用程序下面的測試:

from amqplib import client_0_8 as amqp 
try: 
    lConnection = amqp.Connection() 
    lChannel = lConnection.channel() 
    Exchange = 'celery' 

    for i in xrange(1000000): 
     lMessage = amqp.Message("~130 bytes of test data..........................................................................................................") 
     lMessage.properties["delivery_mode"] = 1 
     lChannel.basic_publish(lMessage, exchange=Exchange, routing_key='transient') 

    lChannel.close() 
    lConnection.close() 

except Exception as e: 
    print e 

你可以看到,這是正確的一起搖擺。

test

我想現在是高達找出這之間的差異,這是怎麼回事的

0

我加入AMQP到我的邏輯裏,而且速度快。 FML。

from amqplib import client_0_8 as amqp 
try: 
    import stopwatch 
    lConnection = amqp.Connection() 
    lChannel = lConnection.channel() 
    Exchange = 'celery' 

    t = stopwatch.Timer() 
    files = [foo, bar] 
    f1 = open(files[0]) 
    f2 = open(files[1]) 
    res = [] 
    count = 0 
    for r1, r2 in izip(FastqGeneralIterator(f1), FastqGeneralIterator(f2)): 
     count += 1 
     #res.append(tasks.process_read_pair.s(args=(r1, r2))) 
     #lMessage = amqp.Message("~130 bytes of test data..........................................................................................................") 
     lMessage = amqp.Message(" ".join(r1) + " ".join(r2)) 
     res.append(lMessage) 
     lMessage.properties["delivery_mode"] = 1 
     lChannel.basic_publish(lMessage, exchange=Exchange, routing_key='transient') 
     if count == 1000000: 
      break 
    t.stop() 
    print "added %d tasks in %s" % (count, t) 

    lChannel.close() 
    lConnection.close() 

except Exception as e: 
    print e 

img

所以,我做了一個改變提交一個異步任務芹菜循環,如下:

res.append(tasks.speed.apply_async(args=("FML",), queue="transient")) 

速度的方法就是這樣的:

@app.task() 
def speed(s): 
    return s 

提交我的任務 ag艾因!

img

因此,它不會出現有什麼關係:

  1. 我如何迭代提交到隊列
  2. 我正在提交
  3. 消息

但是,它必須與功能的排隊?!?!我很困惑。

0

再次,不是一個答案,而是更多的觀察。通過簡單地從RPC改變我的後端Redis的,我的三倍以上我吞吐量:

img

2

重用生產實例應該給你一些性能改進:

with app.producer_or_acquire() as producer: 
    task.apply_async(producer=producer) 

而且任務可能是代理對象,如果是必須爲每個調用進行評估:

task = task._get_current_object() 

使用group會自動重新用製片人,通常是你會 在這樣一個循環做:

process_read_pair = tasks.process_read_pair.s 
g = group(
    process_read_pair(r1, r2) 
    for r1, r2 in islice(
     izip(FastGeneralIterator(f1), FastGeneralIterator(f2)), 0, 1000) 
) 
result = g.delay() 

您還可以考慮安裝這是寫在C. 的librabbitmq模塊amqp://交通將自動如有,請使用它(或可以手動使用librabbitmq://指定:直接

pip install librabbitmq 

出版消息使用底層文庫可以是更快 ,因爲它會繞過芹菜路由助理等,但我不會 認爲這太慢了。如果是這樣的話,那麼Celery中肯定有優化的餘地, ,因爲我目前主要集中在優化消費者方面。

還請注意,您可能希望在同一個任務來處理多個DNA雙, 如使用較粗的任務粒度可爲CPU /內存緩存有益等等, ,它往往會飽和並行無論如何,因爲這是一個有限的資源。

注:瞬時隊長應該是durable=False

+0

請注意,在使用獨奏池時沒有結果,但工作人員應該能夠每秒消耗50.000個任務,儘管我在某段時間內沒有進行基準測試,因此性能可能會有所下降。發佈時也可能有助於使用多個連接。 – asksol

+0

謝謝,當然還有更多的東西可以嘗試。我一直在使用C客戶端,這在我的代碼片段中並不明顯。我目前使用redis後端而不是rpc進行測試,並使用批量提交的50k個讀取對進行測試,這使我能夠以5000-ish/sec提交,雖然不是很棒,但肯定比以前更好。 –