2016-08-01 55 views
1

我之前使用過Python,但僅適用於Flask應用程序,但我以前從未使用過芹菜。在閱讀文檔並設置好所有東西(並且它的工作原理是我已經用多個工人測試過)之後,我試圖運行一個SQL查詢,並且對於從查詢返回的每一行都將它發送給一個芹菜工人。從MySQL查詢返回的每一行運行Celery任務?

下面是一個非常基本的代碼示例。

from celery import Celery 
import MySQLdb 

app = Celery('tasks', broker='redis://localhost:6379/0') 
@app.task 
def print_domain(): 
    db = MySQLdb.connect(host="localhost", user="DB_USER", passwd="DB_PASS", db="DB_NAME") 
    cur = db.cursor() 
    cur.execute("SELECT * FROM myTable") 

    for row in cur.fetchall(): 
     print_query_result(row[0]) 

    db.close() 

def print_query_result(result): 
    print result 

基本上它選擇'myTable'表中的所有內容,並且每返回一行就打印出來。如果我使用Python調用代碼,它會正常工作並打印MySQL表中的所有數據。當我使用.delay()函數將其發送給工作人員進行處理時,僅將其發送給一個工作人員,並僅輸出數據庫中的最上一行。

我一直在嘗試閱讀子任務,但我不知道如果我正朝着正確的方向前進。

總之,我想要這樣的事情發生,但我沒有從哪裏開始。有沒有人有任何想法?

  • SQL查詢表
  • 選擇所有的行發送的每一行/導致一名工人來處理一些代碼
  • 返回碼結果返回到數據庫
  • 隊列中拿起一個項目(如任何)

在此先感謝。

編輯1:

我已經更新了我的代碼使用SQLAlchemy代替,但結果仍返回就像我的舊的查詢這是罰款。

from celery import Celery 
from models import DBDomains 

app = Celery('tasks', broker='redis://localhost:6379/0') 
@app.task 
def print_domain(): 
    query = DBDomains.query.all() 
    for i in query: 
     print i.domain 
     print_query_result.s() 

@app.task 
def print_query_result(): 
    print "Received job" 

print_domain.delay() 

運行.py文件返回時工人:

[2016-08-02 02:08:40,881: INFO/MainProcess] Received task: tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] 
[2016-08-02 02:08:41,036: WARNING/Worker-3] result1 
[2016-08-02 02:08:41,037: WARNING/Worker-3] result2 
[2016-08-02 02:08:41,039: INFO/MainProcess] Task tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] succeeded in 0.154022816569s: None 

正如你所看到的,工作人員會從我查詢表「RESULT1」和「結果2」,但隨後它不似乎只是在打印「接收作業」的子任務中執行命令。

更新:它看起來像子任務必須有一個.delay()作爲每個芹菜文檔,所以我的代碼看起來像這樣,併成功地分配在工作人員現在的工作。

from celery import Celery 
from models import DBDomains 

app = Celery('tasks', broker='redis://localhost:6379/0') 
@app.task 
def print_domain(): 
    query = DBDomains.query.all() 
    for i in query: 
     subtask = print_query_result.s(i.domain) 
     subtask.delay() 


@app.task 
def print_query_result(domain): 
    print domain 

print_domain.delay() 
+0

所以你想要一個任務,將在數據庫上進行查詢,併爲每個返回的行將排隊另一個任務?或者,對於執行查詢並將新任務分配爲常規函數的頂層而言,是否可以接受? –

+0

是的,基本上它將是一個任務,使得查詢,然後爲每個結果產生另一個任務/隊列項目以供工作人員處理。原因是我會處理數以千計(如果不是數萬)的數據行,每隔30秒左右就會查詢一次,所以我的想法是,越多的工作人員運行數據的速度越快。它還留下了擴展的空間,爲更多的工作人員提供更多的數據,我必須處理。我也在考慮線程,但是芹菜似乎更容易,你可以擴大遠程工作人員。 – mphowarth

回答

1

每當您從任務中調用任務時,必須使用subtasks。幸運的是,語法很簡單。

from celery import Celery 

app = Celery('tasks', broker='redis://127.0.0.1:6379/0') 


@app.task 
def print_domain(): 
    for x in range(20): 
     print_query_result.s(x) 


@app.task 
def print_query_result(result): 
    print(result) 

(替代有效範圍內(20與您的查詢結果x)的。)如果你正在觀看的芹菜輸出,你會看到創建和整個員工分配的任務。

+0

不幸的是,似乎沒有工作,我用我現在的代碼更新了我的OP。即使查詢結果肯定會返回,它甚至看起來不像子任務被正確調用。感謝迄今的援助。 – mphowarth

+0

我已經更新了我的OP,因爲我似乎發現了這個問題。感謝您的幫助,並指引我朝着正確的方向發展! – mphowarth

+0

奇數可能是版本中的差異。我發佈的示例工作正常。 –