我之前使用過Python,但僅適用於Flask應用程序,但我以前從未使用過芹菜。在閱讀文檔並設置好所有東西(並且它的工作原理是我已經用多個工人測試過)之後,我試圖運行一個SQL查詢,並且對於從查詢返回的每一行都將它發送給一個芹菜工人。從MySQL查詢返回的每一行運行Celery任務?
下面是一個非常基本的代碼示例。
from celery import Celery
import MySQLdb
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
db = MySQLdb.connect(host="localhost", user="DB_USER", passwd="DB_PASS", db="DB_NAME")
cur = db.cursor()
cur.execute("SELECT * FROM myTable")
for row in cur.fetchall():
print_query_result(row[0])
db.close()
def print_query_result(result):
print result
基本上它選擇'myTable'表中的所有內容,並且每返回一行就打印出來。如果我使用Python調用代碼,它會正常工作並打印MySQL表中的所有數據。當我使用.delay()函數將其發送給工作人員進行處理時,僅將其發送給一個工作人員,並僅輸出數據庫中的最上一行。
我一直在嘗試閱讀子任務,但我不知道如果我正朝着正確的方向前進。
總之,我想要這樣的事情發生,但我沒有從哪裏開始。有沒有人有任何想法?
- SQL查詢表
- 選擇所有的行發送的每一行/導致一名工人來處理一些代碼
- 返回碼結果返回到數據庫
- 隊列中拿起一個項目(如任何)
在此先感謝。
編輯1:
我已經更新了我的代碼使用SQLAlchemy代替,但結果仍返回就像我的舊的查詢這是罰款。
from celery import Celery
from models import DBDomains
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
query = DBDomains.query.all()
for i in query:
print i.domain
print_query_result.s()
@app.task
def print_query_result():
print "Received job"
print_domain.delay()
運行.py文件返回時工人:
[2016-08-02 02:08:40,881: INFO/MainProcess] Received task: tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de]
[2016-08-02 02:08:41,036: WARNING/Worker-3] result1
[2016-08-02 02:08:41,037: WARNING/Worker-3] result2
[2016-08-02 02:08:41,039: INFO/MainProcess] Task tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] succeeded in 0.154022816569s: None
正如你所看到的,工作人員會從我查詢表「RESULT1」和「結果2」,但隨後它不似乎只是在打印「接收作業」的子任務中執行命令。
更新:它看起來像子任務必須有一個.delay()作爲每個芹菜文檔,所以我的代碼看起來像這樣,併成功地分配在工作人員現在的工作。
from celery import Celery
from models import DBDomains
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
query = DBDomains.query.all()
for i in query:
subtask = print_query_result.s(i.domain)
subtask.delay()
@app.task
def print_query_result(domain):
print domain
print_domain.delay()
所以你想要一個任務,將在數據庫上進行查詢,併爲每個返回的行將排隊另一個任務?或者,對於執行查詢並將新任務分配爲常規函數的頂層而言,是否可以接受? –
是的,基本上它將是一個任務,使得查詢,然後爲每個結果產生另一個任務/隊列項目以供工作人員處理。原因是我會處理數以千計(如果不是數萬)的數據行,每隔30秒左右就會查詢一次,所以我的想法是,越多的工作人員運行數據的速度越快。它還留下了擴展的空間,爲更多的工作人員提供更多的數據,我必須處理。我也在考慮線程,但是芹菜似乎更容易,你可以擴大遠程工作人員。 – mphowarth