
I recently upgraded my Celery installation to 4.0. After a few days of wrestling with the upgrade process, I finally got it working... sort of. Some tasks return their results, but the final one never does: no result comes back from Redis after upgrading Celery from 3.1 to 4.0.

I have a class, SFF, that takes in and parses a file:

# Constructor with I/O file 
def __init__(self, file): 

    # File data that's gonna get used a lot 
    sffDescriptor = file.fileno() 
    fileName = abspath(file.name) 

    # Get the pointer to the file 
    filePtr = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ) 

    # Get the header info 
    hdr = filePtr.read(HEADER_SIZE) 
    self.header = SFFHeader._make(unpack(HEADER_FMT, hdr)) 

    # Read in the palette maps 
    print self.header.onDemandDataSize 
    print self.header.onLoadDataSize 
    palMapsResult = getPalettes.delay(fileName, self.header.palBankOff - HEADER_SIZE, self.header.onDemandDataSize, self.header.numPals) 

    # Read the sprite list nodes 
    nodesStart = self.header.sprListOff 
    nodesEnd = self.header.palBankOff 
    print nodesEnd - nodesStart 
    sprNodesResult = getSprNodes.delay(fileName, nodesStart, nodesEnd, self.header.numSprites) 

    # Get palette data 
    self.palettes = palMapsResult.get() 

    # Get sprite data 
    spriteNodes = sprNodesResult.get() 

    # TESTING 
    spritesResultSet = ResultSet([]) 
    numSpriteNodes = len(spriteNodes) 
    # Split the nodes into chunks of size 32 elements 
    for x in xrange(0, numSpriteNodes, 32): 
        spritesResult = getSprites.delay(spriteNodes, x, x+32, fileName, self.palettes, self.header.palBankOff, self.header.onDemandDataSizeTotal)
        spritesResultSet.add(spritesResult)
        break # REMEMBER TO REMOVE FOR ENTIRE SFF

    self.sprites = spritesResultSet.join_native() 

It doesn't matter whether I break it up with a ResultSet or make it a single task that returns the whole spritesResult; the outcome is always the same: the Python console I'm using simply hangs on spritesResultSet.join_native() or spritesResult.get() (depending on how I set it up).
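A quick way to make the hang fail fast instead of blocking forever is to wait with a timeout and look at the task state first; a minimal sketch using the same result objects as above (the 10-second value is arbitrary):

print spritesResult.status                  # stays 'PENDING' if the backend never records the result
sprites = spritesResult.get(timeout=10)     # raises celery.exceptions.TimeoutError instead of hanging

# or, for the chunked variant:
sprites = spritesResultSet.join_native(timeout=10)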

Here is the task in question:

@task
def getSprites(nodes, start, end, fileName, palettes, palBankOff, onDemandDataSizeTotal):
    sprites = []

    with open(fileName, "rb") as file:
        sffDescriptor = file.fileno()
        sffData = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ)

        for node in nodes[start:end]:
            sprListNode = dict(SprListNode._make(node)._asdict()) # Need to convert it to a dict since values may change.
            #print node
            #print sprListNode

            # If it's a linked sprite, the data length is 0, so get the linked index.
            if sprListNode['dataLen'] == 0:
                sprListNodeTemp = SprListNode._make(nodes[sprListNode['index']])
                sprListNode['dataLen'] = sprListNodeTemp.dataLen
                sprListNode['dataOffset'] = sprListNodeTemp.dataOffset
                sprListNode['compression'] = sprListNodeTemp.compression

            # What does the offset need to be?
            dataOffset = sprListNode['dataOffset']
            if sprListNode['loadMode'] == 0:
                dataOffset += palBankOff #- HEADER_SIZE
            elif sprListNode['loadMode'] == 1:
                dataOffset += onDemandDataSizeTotal #- HEADER_SIZE

            #print sprListNode

            # Seek to the data location and "read" it in. First 4 bytes are just the image length
            start = dataOffset + 4
            end = dataOffset + sprListNode['dataLen']
            #sffData.seek(start)

            compressedSprite = sffData[start:end]

            # Create the sprite
            sprite = Sprite(sprListNode, palettes[sprListNode['palNo']], np.fromstring(compressedSprite, dtype=np.uint8))
            sprites.append(sprite)

    return json.dumps(sprites, cls=SpriteJSONEncoder)

I know it reaches the return statement, because if I put a print right above it, it shows up in the Celery window. I also know the task itself finishes, because I get the following from the worker:

[2016-11-16 00:03:33,639: INFO/PoolWorker-4] Task framedatabase.tasks.getSprites[285ac9b1-09b4-4cf1-a251-da6212863832] succeeded in 0.137236133218s: '[{"width": 120, "palNo": 30, "group": 9000, "xAxis": 0, "yAxis": 0, "data": ...'
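To double-check whether that result ever lands in the Redis backend, it can be looked up by the task id from the log above; a sketch (assuming the Celery app object lives in framedatabase/celery.py, as shown further down):

from celery.result import AsyncResult
from framedatabase.celery import app   # assumed module path for the app defined below

# Look up the task id reported in the worker log
res = AsyncResult('285ac9b1-09b4-4cf1-a251-da6212863832', app=app)
print res.status    # 'SUCCESS' if the result was stored; 'PENDING' if the backend never saw it
print res.ready()   # True once the task has reached a terminal state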

Here are my Celery settings in settings.py:

# Celery settings 
BROKER_URL='redis://localhost:1717/1' 
CELERY_RESULT_BACKEND='redis://localhost:1717/0' 
CELERY_IGNORE_RESULT=False 
CELERY_IMPORTS = ("framedatabase.tasks",) 
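Worth noting: because celery.py below calls config_from_object with namespace='CELERY', Celery 4 only picks up settings spelled with that prefix on its new-style names. A sketch of the same values in that form (purely illustrative, copied from above):

# Celery 4 / namespace='CELERY' spelling of the same settings
CELERY_BROKER_URL = 'redis://localhost:1717/1'
CELERY_RESULT_BACKEND = 'redis://localhost:1717/0'
CELERY_TASK_IGNORE_RESULT = False
CELERY_IMPORTS = ('framedatabase.tasks',)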

... and my celery.py:

from __future__ import absolute_import 

import os 

from celery import Celery 

# set the default Django settings module for the 'celery' program. 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'framedatabase.settings') 

from django.conf import settings # noqa 

app = Celery('framedatabase', backend='redis://localhost:1717/1', broker="redis://localhost:1717/0", 
    include=['framedatabase.tasks']) 

# Using a string here means the worker will not have to 
# pickle the object when using Windows. 
app.config_from_object('django.conf:settings', namespace='CELERY') 
app.autodiscover_tasks() 


@app.task(bind=True) 
def debug_task(self): 
    print('Request: {0!r}'.format(self.request)) 

Answer


Found the problem. Apparently it was causing a deadlock; this is covered in the "Avoid launching synchronous subtasks" section of the Celery docs here: http://docs.celeryproject.org/en/latest/userguide/tasks.html#tips-and-best-practices
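In other words, the worker sits waiting for a result that it may itself be needed to produce. A stripped-down sketch of the pattern the docs warn about (the add/bad_parent tasks are hypothetical, just for illustration):

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def bad_parent(x, y):
    # Blocking on a subtask's result from inside another task can deadlock:
    # this worker waits in .get() while the pool may have no one free to run add().
    return add.delay(x, y).get()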

So I got rid of the line:

sprNodesResult.get() 

and changed the final result into a chain (the list returned by getSprNodes gets passed along as the first argument, nodes, of getSprites):

self.sprites = chain(getSprNodes.s(fileName, nodesStart, nodesEnd, self.header.numSprites), 
    getSprites.s(0,32,fileName,self.palettes,self.header.palBankOff,self.header.onDemandDataSizeTotal))().get() 

And it works! Now I just need to find a way to split it up the way I want to!
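One possible way to get the 32-node chunking back without reintroducing the blocking get might be Celery 4's Task.replace, swapping a fan-out task for a chord once the node count is known. Just a sketch, assuming it lives in the same tasks module as getSprites; fanOutSprites and collectSprites are hypothetical names:

from celery import chord, shared_task

@shared_task(bind=True)
def fanOutSprites(self, spriteNodes, fileName, palettes, palBankOff, onDemandDataSizeTotal):
    # Build one getSprites signature per chunk of 32 nodes, now that the node
    # count is known, and hand the work over as a chord so nothing blocks.
    header = [getSprites.s(spriteNodes, x, x + 32, fileName, palettes,
                           palBankOff, onDemandDataSizeTotal)
              for x in xrange(0, len(spriteNodes), 32)]
    raise self.replace(chord(header, collectSprites.s()))

@shared_task
def collectSprites(chunks):
    # Each element is the JSON string returned by one getSprites chunk
    return chunks

That fan-out task could then take the place of getSprites at the end of the chain, e.g. chain(getSprNodes.s(...), fanOutSprites.s(fileName, self.palettes, self.header.palBankOff, self.header.onDemandDataSizeTotal)).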