I am developing a REST API that should receive JSON and launch Celery tasks. These Celery tasks use Spark. I am running the API with the Flask framework and Redis as the message broker. When I send a single request there is no error, but if I send a request while another one has not finished yet, an error is very likely to occur. I believe this is because the Celery tasks are started concurrently and Spark will not let me use the sqlContext from several tasks at the same time. Here is a simple example:
First script:
import os
import sys
import redis
import configparser
from celery import Celery
from celery.result import AsyncResult
from flask_cors import CORS, cross_origin
from flask import Flask, request, jsonify, url_for
# import the Celery task defined in the second script
from celery_task import one_task

# configure and initiate the Flask API
flask_app = Flask(__name__)
# allow users from other networks to use the API
CORS(flask_app)
flask_app.debug = True
flask_app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
flask_app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery("my_api", broker='redis://localhost:6379/0')
# pass the Flask configuration to the Celery app
celery.conf.update(flask_app.config)

@flask_app.route('/test', methods=['POST'])
def job_route():
    # an exception raised here means the request keys are not correct
    task = one_task.apply_async()
    response_msg = {"payload": "ok"}
    return jsonify(response_msg)

if __name__ == '__main__':
    # launch the app on port 8000 on the local address
    flask_app.run(host="localhost", port=8000)
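For reference, this is roughly how I call the route (the JSON body below is just a placeholder, the route only needs the POST to arrive):

import requests

# send a POST to the /test route defined above; the Flask app answers {"payload": "ok"}
response = requests.post("http://localhost:8000/test", json={"some": "data"})
print(response.json())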
Second script:
import os
import sys
import celery
import configparser
import time
from celery import Celery
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from flask import request, url_for, jsonify

celery = Celery("my_api", broker='redis://localhost:6379/0')

# the Spark and SQL contexts are created once, when the worker imports this module
sc = SparkContext()
sc.setLogLevel("WARN")
sqlContext = SQLContext(sc)

# dumb celery task
@celery.task(bind=True)
def one_task(self):
    task_id = self.request.id
    for i in range(10):
        l = [('Alice', 1)]
        data = sqlContext.createDataFrame(l, ['name', 'age'])
        # call a useless operation in order to show the error
        data = data.withColumnRenamed("nickname", "name")
        data.show()
I never get exactly the same error twice, so I will not post it here. I think one solution would be to force Celery not to run more than one task at a time. I saw at this link that this can be done with Django (http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html), and I would like to try a similar solution with Flask.
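Here is a rough sketch of what I have in mind, adapting the cookbook recipe to plain Celery with a redis-py lock instead of the Django cache (the lock name "spark_sql_lock" and the timeout values are arbitrary choices of mine, and I have not verified yet that this really fixes the Spark problem):

import redis
from celery import Celery
from pyspark import SparkContext
from pyspark.sql import SQLContext

celery = Celery("my_api", broker='redis://localhost:6379/0')
redis_client = redis.Redis(host='localhost', port=6379, db=0)

sc = SparkContext()
sc.setLogLevel("WARN")
sqlContext = SQLContext(sc)

@celery.task(bind=True)
def one_task(self):
    # only one task at a time can hold the lock, the others block and wait;
    # timeout releases the lock if a worker dies, blocking_timeout caps the wait
    with redis_client.lock("spark_sql_lock", timeout=600, blocking_timeout=600):
        l = [('Alice', 1)]
        data = sqlContext.createDataFrame(l, ['name', 'age'])
        data.show()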
To run the code correctly, 3 terminals must be used. In the first terminal:
redis-server
In the second terminal:
celery -A api_debug worker --loglevel=info
In the third terminal:
python api_debug.py
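Alternatively, I wonder whether simply starting the worker with a single worker process, so that it can only execute one task at a time, would already be enough:

celery -A api_debug worker --loglevel=info --concurrency=1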
Any help would be appreciated.