我正在尋找一種在使用MapReduce的數據存儲區查詢中進行GROUP BY操作的方法。 AFAIK App Engine在GQL中不支持GROUP BY本身,而其他開發者建議的一種好方法是使用MapReduce。在App Engine中使用MapReduce創建GROUP BY
我下載了source code,我正在研究demo code,我試着在我的情況下實施。但我沒有成功。這是我如何嘗試去做的。也許我所做的一切都是錯誤的。所以如果有人能幫我做到這一點,我會感謝。
我想要做的是:我有一大堆的數據存儲的聯繫人,每個聯繫人有個約會。在同一日期有一堆重複的聯繫人。我想要做的就是簡化羣組,收集同一日期的同一聯繫人。
E.g:
比方說,我有這個聯繫人:
- CONTACT_NAME:Foo1 |日期:01-10-2012
- CONTACT_NAME:Foo2 |日期:02-05-2012
- CONTACT_NAME:Foo1 |日期:2012年1月10日
所以MapReduce的操作後,這將是這樣的:
- CONTACT_NAME:Foo1 |日期:01-10-2012
- CONTACT_NAME:Foo2 |日期:2012年2月5日
對於GROUP BY功能,我認爲字數完成這項工作。
編輯
是在日誌中顯示的唯一的事情就是:
/MapReduce的/管道/運行200
運行GetContactData.WordCountPipeline((U '2012-02-02',), * {})#da26a9b555e311e19b1e6d324d450c1a
編輯完
如果我做錯了什麼,如果我使用了錯誤的方法做一組由MapReduce的,幫我如何做,與MapReduce的。
這裏是我的代碼:
from Contacts import Contacts
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.api import mail
from google.appengine.ext.db import GqlQuery
from google.appengine.ext import db
from google.appengine.api import taskqueue
from google.appengine.api import users
from mapreduce.lib import files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce import shuffler
import simplejson, logging, re
class GetContactData(webapp.RequestHandler):
# Get the calls based on the user id
def get(self):
contactId = self.request.get('contactId')
query_contacts = Contact.all()
query_contacts.filter('contact_id =', int(contactId))
query_contacts.order('-timestamp_')
contact_data = []
if query_contacts != None:
for contact in query_contacts:
pipeline = WordCountPipeline(contact.date)
pipeline.start()
record = { "contact_id":contact.contact_id,
"contact_name":contact.contact_name,
"contact_number":contact.contact_number,
"timestamp":contact.timestamp_,
"current_time":contact.current_time_,
"type":contact.type_,
"current_date":contact.date }
contact_data.append(record)
self.response.headers['Content-Type'] = 'application/json'
self.response.out.write(simplejson.dumps(contact_data))
class WordCountPipeline(base_handler.PipelineBase):
"""A pipeline to run Word count demo.
Args:
blobkey: blobkey to process as string. Should be a zip archive with
text files inside.
"""
def run(self, date):
output = yield mapreduce_pipeline.MapreducePipeline(
"word_count",
"main.word_count_map",
"main.word_count_reduce",
"mapreduce.input_readers.DatastoreInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"date": date,
},
reducer_params={
"mime_type": "text/plain",
},
shards=16)
yield StoreOutput("WordCount", output)
class StoreOutput(base_handler.PipelineBase):
"""A pipeline to store the result of the MapReduce job in the database.
Args:
mr_type: the type of mapreduce job run (e.g., WordCount, Index)
encoded_key: the DB key corresponding to the metadata of this job
output: the blobstore location where the output of the job is stored
"""
def run(self, mr_type, output):
logging.info(output) # here I should append the grouped duration in JSON
如果你得到一個404,錯誤不是(還沒有?)在您的mapreduce代碼中,但在app.yaml或處理程序路由中。請先發布那些。 – mjibson 2012-02-13 01:14:53
我修復了app.yaml。不是404錯誤,管道被執行。檢查編輯看看發生了什麼。我不認爲它是正確的。如果你能幫助我如何通過MapReduce創建GROUP。謝謝! – rogcg 2012-02-13 01:45:58