UnicodeEncodeError: Google App Engine Datastore to BigQuery process

I am trying to follow along with this Codelab, which shows how to get data out of your Google App Engine datastore and into BigQuery via Google Cloud Storage by setting up a MapReduce pipeline. I set up a Google App Engine datastore entity and have a process that collects tweets about certain stocks I want to gather data on. I believe I have followed everything laid out in the example, but the shards doing the work of breaking up the data and loading it into Cloud Storage keep raising UnicodeEncodeErrors. Below is the log from the app server of the dev app I was testing with:

INFO  2012-12-18 20:41:07,645 dev_appserver.py:3103] "POST /mapreduce/worker_callback HTTP/1.1" 500 - 
WARNING 2012-12-18 20:41:07,648 taskqueue_stub.py:1981] Task appengine-mrshard-1582400592541472B07B9-0-0 failed to execute. This task will retry in 0.100 seconds 
ERROR 2012-12-18 20:41:09,453 webapp2.py:1552] 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128) 
Traceback (most recent call last): 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1535, in __call__ 
rv = self.handle_exception(request, response, e) 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1529, in __call__ 
rv = self.router.dispatch(request, response) 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1278, in default_dispatcher 
return route.handler_adapter(request, response) 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1102, in __call__ 
return handler.dispatch() 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 572, in dispatch 
return self.handle_exception(e, self.app.debug) 
File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 570, in dispatch 
return method(*args, **kwargs) 
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\base_handler.py", line 65, in post 
self.handle() 
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 181, in handle 
entity, input_reader, ctx, tstate) 
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 298, in process_data 
output_writer.write(output, ctx) 
File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\output_writers.py", line 659, in write 
ctx.get_pool("file_pool").append(self._filename, str(data)) 
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128) 

Here is the code:

import json 
import webapp2 
import urllib2 
import time 
import calendar 
import datetime 
import httplib2 

from google.appengine.ext import db 
from google.appengine.api import taskqueue 
from google.appengine.ext import blobstore 
from google.appengine.ext.webapp.util import run_wsgi_app 
from google.appengine.ext.webapp import blobstore_handlers 
from google.appengine.ext.webapp import util 
from google.appengine.ext.webapp import template 
from google.appengine.api import urlfetch 

from mapreduce.lib import files 
from mapreduce import base_handler 
from mapreduce import mapreduce_pipeline 
from apiclient.discovery import build 
from oauth2client.appengine import AppAssertionCredentials 

SCOPE = 'https://www.googleapis.com/auth/bigquery' 
PROJECT_ID = 'project_id' # Your Project ID here 
BQ_DATASET_ID = 'datastore_data' 
GS_BUCKET = 'bucketname' 
ENTITY_KIND = 'main.streamdata' 

class streamdata(db.Model): 
    querydate = db.DateTimeProperty(auto_now_add = True) 
    ticker = db.StringProperty() 
    created_at = db.StringProperty() 
    tweet_id = db.StringProperty() 
    text = db.TextProperty() 
    source = db.StringProperty() 

class DatastoreMapperPipeline(base_handler.PipelineBase): 

    def run(self, entity_type): 

     output = yield mapreduce_pipeline.MapperPipeline(
      "Datastore Mapper %s" % entity_type, 
      "main.datastore_map", 
      "mapreduce.input_readers.DatastoreInputReader", 
      output_writer_spec="mapreduce.output_writers.FileOutputWriter", 
      params={ 
       "input_reader":{ 
        "entity_kind": entity_type, 
        }, 
       "output_writer":{ 
        "filesystem": "gs", 
        "gs_bucket_name": GS_BUCKET, 
        "output_sharding":"none", 
        } 
       }, 
       shards=10) 

     yield CloudStorageToBigQuery(output) 

class CloudStorageToBigQuery(base_handler.PipelineBase): 

    def run(self, csv_output): 

     credentials = AppAssertionCredentials(scope=SCOPE) 
     http = credentials.authorize(httplib2.Http()) 
     bigquery_service = build("bigquery", "v2", http=http) 

     jobs = bigquery_service.jobs() 
     table_name = 'datastore_data_%s' % datetime.datetime.utcnow().strftime(
      '%m%d%Y_%H%M%S') 
     files = [str(f.replace('/gs/', 'gs://')) for f in csv_output] 
     result = jobs.insert(projectId=PROJECT_ID, 
          body=build_job_data(table_name,files)) 

     result.execute() 

def build_job_data(table_name, files): 
    return {"projectId": PROJECT_ID, 
      "configuration":{ 
       "load": { 
        "sourceUris": files, 
        "schema":{ 
         "fields":[ 
          { 
           "name":"querydate", 
           "type":"INTEGER", 
          }, 
          { 
           "name":"ticker", 
           "type":"STRING", 
          }, 
          { 
           "name":"created_at", 
           "type":"STRING", 
          }, 
          { 
           "name":"tweet_id", 
           "type":"STRING", 
          }, 
          { "name":"text", 
           "type":"TEXT", 
          }, 
          {  
           "name":"source", 
           "type":"STRING", 
          } 
          ] 
         }, 
        "destinationTable":{ 
         "projectId": PROJECT_ID, 
         "datasetId": BQ_DATASET_ID, 
         "tableId": table_name, 
         }, 
        "maxBadRecords": 0, 
        } 
       } 
      } 

def datastore_map(entity_type): 
    data = db.to_dict(entity_type) 
    resultlist = [timestamp_to_posix(data.get('querydate')), 
        data.get('ticker'), 
        data.get('created_at'), 
        data.get('tweet_id'), 
        data.get('text'), 
        data.get('source')] 
    result = ','.join(['"%s"' % field for field in resultlist]) 
    yield("%s\n" % result) 

def timestamp_to_posix(timestamp): 
    return int(time.mktime(timestamp.timetuple())) 

class DatastoretoBigQueryStart(webapp2.RequestHandler): 
    def get(self): 
     pipeline = DatastoreMapperPipeline(ENTITY_KIND) 
     pipeline.start() 
     path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id 
     self.redirect(path) 

class StreamHandler(webapp2.RequestHandler): 

    def get(self): 

     tickers = ['AAPL','GOOG', 'IBM', 'BAC', 'INTC', 
        'DELL', 'C', 'JPM', 'WFM', 'WMT', 
        'AMZN', 'HOT', 'SPG', 'SWY', 'HTSI', 
        'DUK', 'CEG', 'XOM', 'F', 'WFC', 
        'CSCO', 'UAL', 'LUV', 'DAL', 'COST', 'YUM', 
        'TLT', 'HYG', 'JNK', 'LQD', 'MSFT', 
        'GE', 'LVS', 'MGM', 'TWX', 'DIS', 'CMCSA', 
        'TWC', 'ORCL', 'WPO', 'NYT', 'GM', 'JCP', 
        'LNKD', 'OPEN', 'NFLX', 'SBUX', 'GMCR', 
        'SPLS', 'BBY', 'BBBY', 'YHOO', 'MAR', 
        'L', 'LOW', 'HD', 'HOV', 'TOL', 'NVR', 'RYL', 
        'GIS', 'K', 'POST', 'KRFT', 'CHK', 'GGP', 
        'RSE', 'RWT', 'AIG', 'CB', 'BRK.A', 'CAT'] 

     for i in set(tickers): 

      url = 'http://search.twitter.com/search.json?q=' 
      resultcount = '&rpp=100' 
      language = '&lang=en' 
      encoding = '%40%24' 
      tickerstring = url + encoding + i + resultcount + language 
      tickurl = urllib2.Request(tickerstring) 
      tweets = urllib2.urlopen(tickurl) 
      code = tweets.getcode() 

      if code == 200: 
       results = json.load(tweets, 'utf-8') 
       if "results" in results: 
        entries = results["results"] 
        for entry in entries: 
         tweet = streamdata() 
         created = entry['created_at'] 
         tweetid = entry['id_str'] 
         tweettxt = entry['text'] 
         tweet.ticker = i 
         tweet.created_at = created 
         tweet.tweet_id = tweetid 
         tweet.text = tweettxt 
         tweet.source = "Twitter" 
         tweet.put() 

class MainHandler(webapp2.RequestHandler): 

    def get(self): 
     self.response.out.write('<a href="/start">Click here</a> to start the Datastore to BigQuery pipeline. ') 
     self.response.out.write('<a href="/add_data">Click here</a> to start adding data to the datastore. ') 


app = webapp2.WSGIApplication([ 
           ('/', MainHandler), 
           ('/start', DatastoretoBigQueryStart), 
           ('/add_data', StreamHandler)], 
           debug=True) 

Any insight anyone might have would be a big help.

Many thanks.

Answers


Here you are converting your Unicode data to a bytestring:

ctx.get_pool("file_pool").append(self._filename, str(data)) 

When you do that without specifying an encoding, Python falls back to the default, which is ASCII. You need to choose a different encoding instead, one that can handle all the Unicode code points your data contains.
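You can reproduce this in isolation; it is just Python 2's implicit ASCII encoding at work, nothing specific to the MapReduce library. The u'\u2019' in your traceback is a curly apostrophe, which is common in tweet text:

>>> data = u'It\u2019s'      # RIGHT SINGLE QUOTATION MARK, as in your traceback
>>> str(data)                # implicit encode with the default ASCII codec
Traceback (most recent call last):
  ...
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 2: ordinal not in range(128)
>>> data.encode('utf-8')     # explicit encoding succeeds
'It\xe2\x80\x99s'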

For most text, UTF-8 is a good choice; if you have a lot of non-Western text (Arabic, Asian scripts, and so on), UTF-16 may be more efficient. In either case, you have to encode explicitly:

ctx.get_pool("file_pool").append(self._filename, data.encode('utf8')) 

When reading the data back from the file, decode it back to Unicode with filedata.decode('utf8').
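Since the failing str(data) call lives inside the library's output_writers.py, the least invasive fix is to make sure your mapper only ever yields bytestrings: if datastore_map encodes each line to UTF-8 before yielding it, str() on the already-encoded bytes is a no-op and the ASCII codec never runs. A minimal sketch of that change, assuming the rest of your pipeline stays as posted:

def datastore_map(entity_type):
    data = db.to_dict(entity_type)
    resultlist = [timestamp_to_posix(data.get('querydate')),
                  data.get('ticker'),
                  data.get('created_at'),
                  data.get('tweet_id'),
                  data.get('text'),
                  data.get('source')]
    # Build the CSV line as unicode, then hand the writer UTF-8 bytes.
    result = u','.join([u'"%s"' % field for field in resultlist])
    yield (u"%s\n" % result).encode('utf-8')

BigQuery treats CSV load data as UTF-8 by default, so the encoded output remains loadable on the other end.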

For more information on Python and Unicode, see the Python Unicode HOWTO.

ctx.get_pool("file_pool").append(self._filename, str(data)) 

This will fail if the data contains Unicode characters. Try:

ctx.get_pool("file_pool").append(self._filename, unicode(data)) 