2012-02-12 50 views
1

我正在尋找一種在使用MapReduce的數據存儲區查詢中進行GROUP BY操作的方法。 AFAIK App Engine在GQL中不支持GROUP BY本身,而其他開發者建議的一種好方法是使用MapReduce在App Engine中使用MapReduce創建GROUP BY

我下載了source code,我正在研究demo code,我試着在我的情況下實施。但我沒有成功。這是我如何嘗試去做的。也許我所做的一切都是錯誤的。所以如果有人能幫我做到這一點,我會感謝。


我想要做的是:我有一大堆的數據存儲的聯繫人,每個聯繫人有個約會。在同一日期有一堆重複的聯繫人。我想要做的就是簡化羣組,收集同一日期的同一聯繫人。

E.g:

比方說,我有這個聯繫人:

  1. CONTACT_NAME:Foo1 |日期:01-10-2012
  2. CONTACT_NAME:Foo2 |日期:02-05-2012
  3. CONTACT_NAME:Foo1 |日期:2012年1月10日

所以MapReduce的操作後,這將是這樣的:

  1. CONTACT_NAME:Foo1 |日期:01-10-2012
  2. CONTACT_NAME:Foo2 |日期:2012年2月5日

對於GROUP BY功能,我認爲字數完成這項工作。


編輯

是在日誌中顯示的唯一的事情就是:

/MapReduce的/管道/運行200

運行GetContactData.WordCountPipeline((U '2012-02-02',), * {})#da26a9b555e311e19b1e6d324d450c1a

編輯完

如果我做錯了什麼,如果我使用了錯誤的方法做一組由MapReduce的,幫我如何做,與MapReduce的。


這裏是我的代碼:

from Contacts import Contacts 
from google.appengine.ext import webapp 
from google.appengine.ext.webapp import template 
from google.appengine.ext.webapp.util import run_wsgi_app 
from google.appengine.api import mail 
from google.appengine.ext.db import GqlQuery 
from google.appengine.ext import db 


from google.appengine.api import taskqueue 
from google.appengine.api import users 

from mapreduce.lib import files 
from mapreduce import base_handler 
from mapreduce import mapreduce_pipeline 
from mapreduce import operation as op 
from mapreduce import shuffler 

import simplejson, logging, re 


class GetContactData(webapp.RequestHandler): 

    # Get the calls based on the user id 
    def get(self): 
     contactId = self.request.get('contactId') 
     query_contacts = Contact.all() 
     query_contacts.filter('contact_id =', int(contactId)) 
     query_contacts.order('-timestamp_') 
     contact_data = [] 
     if query_contacts != None: 
      for contact in query_contacts: 
        pipeline = WordCountPipeline(contact.date) 
        pipeline.start() 
        record = { "contact_id":contact.contact_id, 
           "contact_name":contact.contact_name, 
           "contact_number":contact.contact_number, 
           "timestamp":contact.timestamp_, 
           "current_time":contact.current_time_, 
           "type":contact.type_, 
           "current_date":contact.date } 
        contact_data.append(record) 

     self.response.headers['Content-Type'] = 'application/json' 
     self.response.out.write(simplejson.dumps(contact_data)) 

class WordCountPipeline(base_handler.PipelineBase): 
    """A pipeline to run Word count demo. 

    Args: 
    blobkey: blobkey to process as string. Should be a zip archive with 
     text files inside. 
    """ 

    def run(self, date): 
    output = yield mapreduce_pipeline.MapreducePipeline(
     "word_count", 
     "main.word_count_map", 
     "main.word_count_reduce", 
     "mapreduce.input_readers.DatastoreInputReader", 
     "mapreduce.output_writers.BlobstoreOutputWriter", 
     mapper_params={ 
      "date": date, 
     }, 
     reducer_params={ 
      "mime_type": "text/plain", 
     }, 
     shards=16) 
    yield StoreOutput("WordCount", output) 

class StoreOutput(base_handler.PipelineBase): 
    """A pipeline to store the result of the MapReduce job in the database. 

    Args: 
    mr_type: the type of mapreduce job run (e.g., WordCount, Index) 
    encoded_key: the DB key corresponding to the metadata of this job 
    output: the blobstore location where the output of the job is stored 
    """ 

    def run(self, mr_type, output): 
     logging.info(output) # here I should append the grouped duration in JSON 
+0

如果你得到一個404,錯誤不是(還沒有?)在您的mapreduce代碼中,但在app.yaml或處理程序路由中。請先發布那些。 – mjibson 2012-02-13 01:14:53

+0

我修復了app.yaml。不是404錯誤,管道被執行。檢查編輯看看發生了什麼。我不認爲它是正確的。如果你能幫助我如何通過MapReduce創建GROUP。謝謝! – rogcg 2012-02-13 01:45:58

回答

0

我基於代碼@autumngard在這個question提供和修改,以適應我的目的和它的工作。