2011-05-19 39 views
9

我對GAE中mapreduce支持的當前狀態有些困惑。根據文檔http://code.google.com/p/appengine-mapreduce/還沒有支持reduce階段,但在I/O 2011(http://www.youtube.com/watch?v=EIxelKcyCC0)的會話描述中寫道:「現在可以在App Engine上運行完整的Map Reduce作業」。我不知道如果我能在這個任務中使用的MapReduce:在Google App Engine中使用mapreduce的簡單計數器示例

我想要做什麼:

我有外地的顏色模型車:

class Car(db.Model): 
    color = db.StringProperty() 

我想運行MapReduce的過程(從時間,cron定義),它可以計算每種顏色中有多少輛車,並將此結果存儲在數據存儲區中。看起來像一個非常適合mapreduce的工作(但如果我錯了我的話),階段「map」將爲每個Car實體生成對(,1),階段「reduce」應該通過color_name合併這個數據,給我預期的結果。最後的結果我想是與存儲在數據存儲計算數據,這樣的事情實體:

class CarsByColor(db.Model): 
    color_name = db.StringProperty() 
    cars_num = db.IntegerProperty() 

問題: 我不知道如何在AppEngine上實現這個...錄像顯示例子使用定義的映射和減少函數,但它們似乎是與數據存儲無關的非常普遍的示例。我發現的所有其他示例都使用一個函數處理來自DatastoreInputReader的數據,但它們似乎只是「映射」階段,沒有示例說明如何執行「reduce」(以及如何將減少的結果存儲在數據存儲)。

回答

6

我在這裏提供的解決方案我最終使用GAE中的mapreduce(沒有縮小階段)。如果我從頭開始,我可能會使用由Drew Sears提供的解決方案。

它工作在GAE的Python 1.5.0

應用。YAML我添加了MapReduce的處理程序:

- url: /mapreduce(/.*)? 
    script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py 

和我的mapreduce的代碼處理程序(我使用的URL/mapred_update收集由MapReduce的產生的結果):

- url: /mapred_.* 
    script: mapred.py 

創建mapreduce.yaml用於加工車實體:

mapreduce: 
- name: Color_Counter 
    params: 
    - name: done_callback 
    value: /mapred_update 
    mapper: 
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader 
    handler: mapred.process 
    params: 
    - name: entity_kind 
     default: models.Car 

說明:done_callback是mapreduce完成其操作後調用的url。 mapred.process是一個處理單個實體和更新計數器(它在mapred.py文件中定義)的函數。型號汽車在models.py

mapred.py定義:

from models import CarsByColor 
from google.appengine.ext import db 
from google.appengine.ext.mapreduce import operation as op 
from google.appengine.ext.mapreduce.model import MapreduceState 

from google.appengine.ext import webapp 
from google.appengine.ext.webapp.util import run_wsgi_app 

def process(entity): 
    """Process individual Car""" 
    color = entity.color 
    if color: 
     yield op.counters.Increment('car_color_%s' % color) 

class UpdateCounters(webapp.RequestHandler): 
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters""" 
    def post(self): 
     """Called after mapreduce operation are finished""" 
     # Finished mapreduce job id is passed in request headers 
     job_id = self.request.headers['Mapreduce-Id'] 
     state = MapreduceState.get_by_job_id(job_id) 
     to_put = [] 
     counters = state.counters_map.counters 
     # Remove counter not needed for stats 
     del counters['mapper_calls'] 
     for counter in counters.keys(): 
      stat = CarsByColor.get_by_key_name(counter) 
      if not stat: 
       stat = CarsByColor(key_name=counter, 
           name=counter) 
      stat.value = counters[counter] 
      to_put.append(stat) 
     db.put(to_put) 

     self.response.headers['Content-Type'] = 'text/plain' 
     self.response.out.write('Updated.') 


application = webapp.WSGIApplication(
            [('/mapred_update', UpdateCounters)], 
            debug=True) 
def main(): 
    run_wsgi_app(application) 

if __name__ == "__main__": 
    main()    

有CarsByColor模式的改變定義相比,略有問題的。

您可以從url:http://yourapp/mapreduce/手動啓動mapreduce作業,並希望從cron(我還沒有測試過cron)。

9

你並不需要減少階段。你可以用線性任務鏈做到這一點,或多或少如下:

def count_colors(limit=100, totals={}, cursor=None): 
    query = Car.all() 
    if cursor: 
    query.with_cursor(cursor) 
    cars = query.fetch(limit) 
    for car in cars: 
    try: 
     totals[car.color] += 1 
    except KeyError: 
     totals[car.color] = 1 
    if len(cars) == limit: 
    cursor = query.cursor() 
    return deferred.defer(count_colors, limit, totals, cursor) 
    entities = [] 
    for color in totals: 
    entity = CarsByColor(key_name=color) 
    entity.cars_num = totals[color] 
    entities.append(entity) 
    db.put(entities) 

deferred.defer(count_colors) 

這應該遍歷所有的汽車,通過一個查詢光標和流水賬了一系列即席任務,以及商店總數在最後。

如果您必須在單個模型中合併來自多個數據存儲區,多個模型或多個索引的數據,那麼縮小階段可能有意義。因爲我不認爲它會給你買東西。

另一種選擇:使用任務隊列維護每種顏色的實時計數器。當您創建一輛汽車時,啓動一項任務以增加該顏色的總量。當你更新一輛汽車時,啓動一個任務來減少舊的顏色,而另一個任務則遞增新的顏色。更新計數器以避免競爭條件。

相關問題