2017-03-16 80 views
0

我想在 Celery worker 端使用 Jupyter 內核:每個 Celery worker 各自擁有一個 Jupyter 內核。問題是如何在任務內部訪問 Celery worker 實例。

爲了實現它,我覆寫了 Celery 默認的 Worker 類:在 worker 初始化時啓動 Jupyter 內核,並在 worker 關閉時(on_close 方法中)關閉該 Jupyter 內核。

我面臨的當前問題是如何在任務運行時訪問任務內的內核實例?

除了 app.Worker = CustomWorker 之外,是否有更好的方法來覆寫 Celery 應用程序的 Worker 類定義?

以下是使用自定義 worker 的 Celery 配置:

from __future__ import absolute_import, unicode_literals 
from celery import Celery 
from jupyter_client import MultiKernelManager 

# Celery application for the "proj" project: Redis serves as both the
# message broker and the result backend, and the "tasks" module is loaded
# so its tasks get registered.
app = Celery(
    'proj',
    broker='redis://',
    backend='redis://',
    include=['tasks'],
)

# Keep stored task results for 3600 seconds (one hour).
app.conf.update(result_expires=3600)

class CustomWorker(app.Worker):
    """Celery worker that owns a dedicated Jupyter kernel.

    The kernel is started before the regular worker initialization and shut
    down from ``on_close``.

    Attributes:
        km: the ``MultiKernelManager`` that started the kernel.
        kernel_id: id of the kernel started through ``km``.
        kernel_client: client object for that kernel.
    """

    def __init__(self, *args, **kwargs):
        self.km = MultiKernelManager()
        self.kernel_id = self.km.start_kernel()
        print("Custom initializing")
        try:
            # Use the instance attributes: bare ``km`` / ``kernel_id`` are
            # not defined in this scope and would raise NameError.
            self.kernel_client = self.km.get_kernel(self.kernel_id).client()
            super(CustomWorker, self).__init__(*args, **kwargs)
        except BaseException:
            # Do not leave an orphaned kernel behind if setup fails.
            self.km.shutdown_kernel(self.kernel_id)
            raise

    def on_close(self):
        """Shut down this worker's kernel, then run the base close hook."""
        try:
            self.km.shutdown_kernel(self.kernel_id)
        finally:
            # Let Celery finish its own shutdown even if the kernel
            # shutdown raised.
            super(CustomWorker, self).on_close()

# Have the app build its workers from CustomWorker rather than the default
# Worker class.
app.Worker = CustomWorker

if __name__ == "__main__":
    # Hand control over to Celery's command-line entry point for this app.
    app.start()

這裏是tasks.py

from __future__ import absolute_import, unicode_literals 
from celery import app 

from celery import Task 
from tornado import gen 
from jupyter_client import MultiKernelManager 
from zmq.eventloop import ioloop 
from zmq.eventloop.zmqstream import ZMQStream 
# Install pyzmq's IOLoop so ZMQStream callbacks are dispatched by the
# tornado IOLoop used below. NOTE(review): newer pyzmq releases document
# install() as deprecated/unnecessary with tornado >= 5 — confirm versions.
ioloop.install()

# Pending kernel requests: maps a request's msg_id to the Future that is
# resolved with the matching reply.
reply_futures = dict()

# Celery task that forwards arbitrary Python code to the current worker's
# Jupyter kernel and waits for the kernel's reply.
#
# NOTE(review): ``app`` is imported above via ``from celery import app``,
# which yields the ``celery.app`` package rather than the Celery instance
# defined in the config module; ``@app.task`` needs the latter — confirm.
@app.task
def pythontask(code, kernel_client=None):
    """Run *code* on the worker's Jupyter kernel and print the reply.

    Args:
        code: Python source to execute in the kernel.
        kernel_client: jupyter_client client for the current worker's
            kernel. Celery does not supply this by itself; the worker must
            inject it into the task kwargs (for example from an overridden
            ``_process_task``).

    Raises:
        RuntimeError: if no ``kernel_client`` was supplied.
    """
    from functools import partial

    from tornado.concurrent import Future

    if kernel_client is None:
        raise RuntimeError(
            "pythontask requires the worker's kernel_client to be injected "
            "into the task kwargs")

    iopub_msgs = []

    def shell_callback(session, stream, msg_list):
        # Shell-channel replies complete the pending request.
        _idents, msg_parts = session.feed_identities(msg_list)
        reply = session.deserialize(msg_parts)
        parent_id = reply['parent_header'].get('msg_id')
        reply_future = reply_futures.get(parent_id)
        # Guard against resolving the same future twice.
        if reply_future is not None and not reply_future.done():
            reply_future.set_result(reply)

    def iopub_callback(session, stream, msg_list):
        # iopub messages (status, output, ...) carry the same parent msg_id
        # as the request; they are only collected, never used to resolve
        # the future, which would otherwise complete early and then raise
        # on a second set_result.
        _idents, msg_parts = session.feed_identities(msg_list)
        iopub_msgs.append(session.deserialize(msg_parts))

    @gen.coroutine
    def execute(client, source):
        # Send the code exactly once and register the future before
        # yielding, so the reply (delivered on this loop) is never missed.
        msg_id = client.execute(source)
        future = reply_futures[msg_id] = Future()
        try:
            reply = yield future
        finally:
            # Do not let finished requests accumulate in the shared map.
            reply_futures.pop(msg_id, None)
        raise gen.Return((msg_id, reply))

    # Wrap the kernel client's sockets in streams and attach the callbacks.
    shell_stream = ZMQStream(kernel_client.shell_channel.socket)
    iopub_stream = ZMQStream(kernel_client.iopub_channel.socket)
    shell_stream.on_recv_stream(partial(shell_callback, kernel_client.session))
    iopub_stream.on_recv_stream(partial(iopub_callback, kernel_client.session))

    loop = ioloop.IOLoop.current()
    try:
        msg_id, reply = loop.run_sync(partial(execute, kernel_client, code))
    finally:
        # Disable callbacks and automatic receiving even if execution failed.
        shell_stream.on_recv_stream(None)
        iopub_stream.on_recv_stream(None)

    print(reply)
    # iopub messages for this request that arrived before the shell reply.
    print([m for m in iopub_msgs
           if m['parent_header'].get('msg_id') == msg_id])

回答

0

把 worker 實例的資訊(kernel_client)加入請求(request)對象,解決了我的問題。爲此,我覆寫了 worker 類的 _process_task 方法:

def _process_task(self, req):
    """Inject this worker's kernel client into the task kwargs, then run it.

    Override of the Celery worker's ``_process_task``. The request is handed
    to ``self.pool`` as before, but ``kernel_client`` is first added to
    ``req.kwargs`` so the task can receive it as a keyword argument.

    NOTE(review): every task routed through this worker receives the extra
    ``kernel_client`` keyword, so tasks that do not accept it will
    presumably fail with a TypeError. The author reports this only works
    with the solo pool — presumably because the client object cannot be
    pickled for a multi-process pool.
    """
    try:
        req.kwargs['kernel_client'] = self.kernel_client
        print("printing from _process_task {}".format(req.kwargs))
        req.execute_using_pool(self.pool)
    except TaskRevokedError:
        try:
            self._quick_release()  # Issue 877
        except AttributeError:
            # Best effort: not every worker setup provides _quick_release.
            pass
    except Exception as exc:
        logger.critical('Internal error: %r\n%s', exc, traceback.format_exc(), exc_info=True)

以下是我的任務,它通過參數訪問 kernel_client:

@app.task(bind=True)
def pythontask(self, code, kernel_client=None):
    """Send *code* to the worker's Jupyter kernel for execution.

    Args:
        code: Python source to execute in the kernel.
        kernel_client: client for the worker's kernel. It is expected to be
            injected into the task kwargs by the worker (for example via an
            overridden ``_process_task``); ``None`` means it was not.

    Returns:
        The msg_id returned by ``kernel_client.execute``.

    Raises:
        RuntimeError: if ``kernel_client`` was not injected, instead of an
            opaque AttributeError on ``None``.
    """
    if kernel_client is None:
        raise RuntimeError(
            "pythontask requires the worker's kernel_client to be injected "
            "into the task kwargs")

    mid = kernel_client.execute(code)

    print("{}".format(kernel_client))
    print("{}".format(mid))
    return mid

這個做法只有在我以 solo 模式(--pool=solo)啓動 worker 時才有效,否則會拋出一些 pickle(序列化)錯誤——大概是因爲 kernel_client 無法被 pickle 後傳給進程池的子進程。反正使用 solo worker 符合我的需求,所以這個解決方案對我適用。