有一個GAE數據存儲種類,其中有幾個100'000個物體。想做幾個涉及的查詢(涉及計數查詢)。 Big Query似乎是適合這樣做的上帝。Google App Engine:在數據存儲上使用Big Query?
目前有一種簡單的方法可以使用Big Query查詢實時AppEngine Datastore嗎?
有一個GAE數據存儲種類,其中有幾個100'000個物體。想做幾個涉及的查詢(涉及計數查詢)。 Big Query似乎是適合這樣做的上帝。Google App Engine:在數據存儲上使用Big Query?
目前有一種簡單的方法可以使用Big Query查詢實時AppEngine Datastore嗎?
您無法直接在DataStore實體上運行BigQuery,但您可以編寫一個Mapper管道,從DataStore中讀取實體,將它們寫入CSV雲存儲中,然後將這些實例吸收到BigQuery中 - 甚至可以自動化該過程。下面是一個使用Mapper API類只是數據存儲到CSV步驟示例:
import re
import time
from datetime import datetime
import urllib
import httplib2
import pickle
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template
from mapreduce.lib import files
from google.appengine.api import taskqueue
from google.appengine.api import users
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from apiclient.discovery import build
from google.appengine.api import memcache
from oauth2client.appengine import AppAssertionCredentials
#Number of shards to use in the Mapper pipeline
SHARDS = 20
# Name of the project's Google Cloud Storage Bucket
GS_BUCKET = 'your bucket'
# DataStore Model
class YourEntity(db.Expando):
field1 = db.StringProperty() # etc, etc
ENTITY_KIND = 'main.YourEntity'
class MapReduceStart(webapp.RequestHandler):
"""Handler that provides link for user to start MapReduce pipeline.
"""
def get(self):
pipeline = IteratorPipeline(ENTITY_KIND)
pipeline.start()
path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
logging.info('Redirecting to: %s' % path)
self.redirect(path)
class IteratorPipeline(base_handler.PipelineBase):
""" A pipeline that iterates through datastore
"""
def run(self, entity_type):
output = yield mapreduce_pipeline.MapperPipeline(
"DataStore_to_Google_Storage_Pipeline",
"main.datastore_map",
"mapreduce.input_readers.DatastoreInputReader",
output_writer_spec="mapreduce.output_writers.FileOutputWriter",
params={
"input_reader":{
"entity_kind": entity_type,
},
"output_writer":{
"filesystem": "gs",
"gs_bucket_name": GS_BUCKET,
"output_sharding":"none",
}
},
shards=SHARDS)
def datastore_map(entity_type):
props = GetPropsFor(entity_type)
data = db.to_dict(entity_type)
result = ','.join(['"%s"' % str(data.get(k)) for k in props])
yield('%s\n' % result)
def GetPropsFor(entity_or_kind):
if (isinstance(entity_or_kind, basestring)):
kind = entity_or_kind
else:
kind = entity_or_kind.kind()
cls = globals().get(kind)
return cls.properties()
application = webapp.WSGIApplication(
[('/start', MapReduceStart)],
debug=True)
def main():
run_wsgi_app(application)
if __name__ == "__main__":
main()
如果追加此您IteratorPipeline類的末尾:yield CloudStorageToBigQuery(output)
,你可以管得到的CSV文件句柄到BigQuery擷取管道。 ..這樣的:
class CloudStorageToBigQuery(base_handler.PipelineBase):
"""A Pipeline that kicks off a BigQuery ingestion job.
"""
def run(self, output):
# BigQuery API Settings
SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'Some_ProjectXXXX'
DATASET_ID = 'Some_DATASET'
# Create a new API service for interacting with BigQuery
credentials = AppAssertionCredentials(scope=SCOPE)
http = credentials.authorize(httplib2.Http())
bigquery_service = build("bigquery", "v2", http=http)
jobs = bigquery_service.jobs()
table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
'%m%d%Y_%H%M%S')
files = [str(f.replace('/gs/', 'gs://')) for f in output]
result = jobs.insert(projectId=PROJECT_ID,
body=build_job_data(table_name,files)).execute()
logging.info(result)
def build_job_data(table_name, files):
return {"projectId": PROJECT_ID,
"configuration":{
"load": {
"sourceUris": files,
"schema":{
# put your schema here
"fields": fields
},
"destinationTable":{
"projectId": PROJECT_ID,
"datasetId": DATASET_ID,
"tableId": table_name,
},
}
}
}
不,BigQuery是需要將數據上傳到其中的不同產品。它無法在數據存儲上運行。您可以使用GQL查詢數據存儲。
對於BigQuery,您必須將這些類別導出爲CSV或分隔記錄結構,然後加載到BigQuery中,然後您可以進行查詢。我不知道哪個設施可以查詢實時GAE數據存儲。
Biquery是分析查詢引擎,意味着您無法更改記錄。不允許更新或刪除,只能追加。
我們正在做一個受信任的測試程序從數據存儲移動到BigQuery的兩個簡單的操作:
它會自動處理模式的爲您服務。
更多信息(適用):https://docs.google.com/a/google.com/spreadsheet/viewform?formkey=dHdpeXlmRlZCNWlYSE9BcE5jc2NYOUE6MQ
有了新的(從2013年9月)streaming inserts api你可以從你的應用程序到BigQuery的進口記錄。
數據在BigQuery中立即可用,所以這應該滿足您的實時需求。
雖然這個問題現在是有點老了,這可能是任何人碰到這個問題
絆腳石此刻雖然得到這個從工作的本地開發服務器不是特別好一個簡單的解決方案。
截至2016年,現在這是非常可能的!你必須做到以下幾點:
有關此工作流程的完整示例,請參閱this post!
那麼發生了什麼?任何關於TTP命運的更新? – gae123
是啊,已經有一段時間了 – ZiglioUK
也有興趣 – Omri