2014-01-08 71 views
9

我一直在關注裏克布蘭森的PyCon視頻:Messaging at Scale at Instagram。您可能需要觀看視頻才能回答此問題。裏克布蘭森使用芹菜,Redis和RabbitMQ。爲了讓您瞭解速度,每個用戶都有一個歸檔列表。每個列表都包含他們關注的人發佈的照片​​的媒體ID。例如,賈斯汀比伯擁有150萬追隨者。當他發佈照片時,該照片的ID需要插入到每個追隨者的每個個人重新列表列表中。這被稱爲扇出式寫入方法。但是,這種方法存在一些可靠性問題。它可以工作,但對於擁有數百萬追隨者的Justin Bieber或Lady Gaga之類的人來說,在網絡請求(您需要0-500ms才能完成請求)中執行此操作可能會出現問題。屆時,請求將超時。Django,Celery,Redis,RabbitMQ:編寫扇出的鏈式任務

所以Rick Branson決定使用Celery,一個基於分佈式消息傳遞的異步任務隊列/作業隊列。任何繁重的工作,例如將媒體ID插入追隨者列表,都可以在Web請求之外異步完成。該請求將完成,芹菜將繼續將ID插入到所有列表中。

這種方法可以創造奇蹟。但是,你不想把賈斯廷的所有追隨者都交給芹菜,因爲它會捆綁一名芹菜工人。爲什麼不讓多個工作人員同時工作,以便更快完成工作?卓見!你會想把這個塊分成更小的塊,並且每個批次都有不同的工作人員。裏克布蘭森做了一批一萬名追隨者,他使用一種稱爲光標的方法來爲所有賈斯汀比伯的追隨者插入媒體ID直到它完成。在視頻中,他在3:56中談到這個問題

我想知道是否有人可以更多地解釋這個問題並展示如何完成它的例子。我目前正在嘗試嘗試相同的設置。我使用Andy McCurdy的redis-py python客戶端庫與我的redis服務器進行通信。對於我服務中的每個用戶,我創建一個redis追隨者列表。

因此與343的ID,用戶必須在以下關鍵列表:

followers:343 

我還爲每個用戶創建一個homefeed列表。每個用戶都有自己的列表。 因此,與1990年的ID,用戶必須在以下關鍵列表:

homefeed:1990 

在「追隨者:343」的Redis列表,它包含了誰遵循用戶343.用戶343人的所有ID有20,007名關注者。下面,我檢索列表中從索引0開始到結束-1的所有ID,以向您展示它的外觀。

>>> r_server.lrange("followers:343", 0, -1) 
['8', '7', '5', '3', '65', '342', '42', etc...] ---> for the sake of example, assume this list has another 20,000 IDs. 

你看到的是誰遵循用戶343

這裏所有用戶的ID的列表是我凸出/ mydjangoapp/tasks.py其中包含我insert_into_homefeed功能:

from __future__ import absolute_import 
from celery import shared_task 
import redis 
pool = redis.ConnectionPool(host='XX.XXX.XXX.X', port=6379, db=0, password='XXXXX') 

@shared_task 
def insert_into_homefeed(photo_id, user_id): 
    # Grab the list of all follower IDs from Redis for user_id. 
    r_server = redis.Redis(connection_pool=pool) 

    followers_list = r_server.lrange("followers:%s" % (user_id), 0, -1) 

    # Now for each follower_id in followers_list, find their homefeed key 
    # in Redis and insert the photo_id into that homefeed list. 

    for follower_id in followers_list: 
     homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id) 
    return "Fan Out Completed for %s" % (user_id) 

在這個任務中,當從Django視圖調用時,它將抓取所有跟隨用戶343的人的ID,然後將照片ID插入到他們的所有家庭飼養列表中。

這是我的上傳視圖在我的proj/mydjangoapp/views.py。我基本上叫芹菜的延遲的方法,並在neccessary變量傳遞,這樣的要求迅速結束:

# Import the Celery Task Here 
from mydjangoapp.tasks import insert_into_homefeed 


@csrf_exempt 
def Upload(request): 
    if request.method == 'POST': 
     data = json.loads(request.body) 
     newPhoto = Photo.objects.create(user_id = data['user_id'], description= data['description'], photo_url = data['photo_url']) 
     newPhoto_ID = newPhoto.pk 
     insert_into_homefeed.delay(newPhoto_ID, data['user_id']) 
     return HttpResponse("Request Completed") 

我怎麼可以做這樣一種方式,它會10000進行批處理?

回答

8

視頻中描述的方法是任務「鏈接」。

要讓您的任務方法以鍊形式運行,您希望將表示索引的額外參數添加到追隨者列表中。該任務不是處理完整的追隨者列表,而是從固定的批處理大小開始,從索引參數開始。完成後,任務應創建一個新任務並傳遞新索引。

INSERT_INTO_HOMEFEED_BATCH = 10000 

@shared_task 
def insert_into_homefeed(photo_id, user_id, index=0): 
    # Grab the list of all follower IDs from Redis for user_id. 
    r_server = redis.Redis(connection_pool=pool) 

    range_limit = index + INSERT_INTO_HOMEFEED_BATCH - 1 # adjust for zero-index 

    followers_list_batch = r_server.lrange("followers:%s" % (user_id), index, range_limit) 

    if not followers_list_batch: 
     return # zero followers or no more batches 

    # Now for each follower_id in followers_list_batch, find their homefeed key 
    # in Redis and insert the photo_id into that homefeed list. 
    for follower_id in followers_list: 
     homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id) 

    insert_into_homefeed.delay(photo_id, user_id, range_limit + 1) 

這種運作良好,因爲Redis的lists are ordered和lrange命令doesn't return an error on out-of-range inputs

+0

感謝您的快速回復! :)好方法!但是這不是一個無限循環嗎?即使在完成整個列表後,這個任務會不會一遍又一遍地被調用? – noahandthewhale

+0

啊!我剛剛看到了如果followers_list_batch: – noahandthewhale

+0

你明白了。這可能是一個很好的跡象,我應該使用明確的返回語句。 –

相關問題