0
我正在嘗試將我的管道響應寫入Google存儲,但獲取已安裝在服務器上的模塊導入錯誤。Python Apache Beam Google存儲寫入錯誤
代碼:
from __future__ import print_function, absolute_import
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.transforms import PTransform, ParDo, DoFn, Create
from apache_beam.io import iobase, range_trackers
import logging
import re
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
def mongo_connection_string(url):
import logging
logger = logging.getLogger(__name__)
if 'gs://' in url:
from google.cloud import storage
logging.info('Fetching connection string from Cloud Storage {}'.format(url))
_, path = url.split('gs://')
path = path.split('/')
bucket = path[0]
path = '/'.join(path[1:])
client = storage.Client()
blob = client.get_bucket(bucket).get_blob(path).download_as_string()
connection_string = blob.splitlines()[0]
return connection_string
logger.info('Using connection string from CLI options')
return url
iso_match = re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}')
def clean_query(query):
new_query = {}
for key, val in query.iteritems():
if isinstance(val, basestring):
val = str(val)
if isinstance(val, basestring) and iso_match.match(val):
val = datetime.datetime.strptime(val[0:19], '%Y-%m-%dT%H:%M:%S')
elif isinstance(val, dict):
val = clean_query(val)
new_query[str(key)] = val
return new_query
class _MongoSource(iobase.BoundedSource):
import pymongo
def __init__(self, connection_string, db, collection, query=None, fields=None):
import logging
logger = logging.getLogger(__name__)
self._connection_string = connection_string
self._db = db
self._collection = collection
self._fields = fields
self._client = None
# Prepare query
self._query = query
if not self._query:
self._query = {}
logger.info('Raw query: {}'.format(query))
self._query = clean_query(self._query)
logger.info('Cleaned query: {}'.format(self._query))
@property
def client(self):
import logging
import pymongo
logger = logging.getLogger(__name__)
if self._client:
logger.info('Reusing existing PyMongo client')
return self._client
logger.info('Preparing new PyMongo client')
self._client = pymongo.MongoClient(self._connection_string)
return self._client
def estimate_size(self):
return self.client[self._db][self._collection].count(self._query)
def get_range_tracker(self, start_position, stop_position):
from apache_beam.io import iobase, range_trackers
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY
range_tracker = range_trackers.OffsetRangeTracker(start_position, stop_position)
range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker)
return range_tracker
def read(self, range_tracker):
coll = self.client[self._db][self._collection]
for doc in coll.find(self._query, projection=self._fields):
yield doc
def split(self, desired_bundle_size, start_position=None, stop_position=None):
from apache_beam.io import iobase, range_trackers
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY
yield iobase.SourceBundle(
weight=1,
source=self,
start_position=start_position,
stop_position=stop_position)
class ReadFromMongo(PTransform):
def __init__(self, connection_string, db, collection, query=None, fields=None):
super(ReadFromMongo, self).__init__()
self._connection_string = connection_string
self._db = db
self._collection = collection
self._query = query
self._fields = fields
self._source = _MongoSource(
self._connection_string,
self._db,
self._collection,
query=self._query,
fields=self._fields)
def expand(self, pcoll):
import logging
logger = logging.getLogger(__name__)
logger.info('Starting MongoDB read from {}.{} with query {}'
.format(self._db, self._collection, self._query))
return pcoll | iobase.Read(self._source)
def display_data(self):
return {'source_dd': self._source}
def transform_doc(document):
data={str(document['clause type']):int(document['count'])}
return data
def run():
import time
parser = argparse.ArgumentParser()
parser.add_argument('--output',
dest='output',
default='<output path>',
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args()
gcs_path = "<gcs URL>"
project_name = "<project name>"
pipeline_args.extend(['--runner=DataflowRunner',
"--project=civic-eye-181513",
"--staging_location=<stagging location>",
"--temp_location=<temp location>"
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as pipeline:
print ("starting pipleline")
connection_string = '<mongo URL>'
(pipeline
| "Load" >> ReadFromMongo(connection_string, 'new', 'Data', query={}, fields=['clause type','count'])
| "transform" >> beam.Map(transform_doc).with_output_types(str)
| "Save" >> WriteToText("{0}/output/wordcount{1}".format(gcs_path,int(time.time()))))
print ("done")
if __name__ == '__main__':
run()
錯誤:
Exception in worker loop: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 738, in run work,
execution_context, env=self.environment)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/workitem.py", line 130, in get_work_items
work_item_proto.sourceOperationTask.split)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/workercustomsources.py", line 142, in __init__
source_spec[names.SERIALIZED_SOURCE_KEY]['value'])
File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads return
dill.loads(s)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
return load(file)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1133, in load_reduce
value = func(*args)
File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 766, in _import_module
return __import__(import_name)
ImportError: No module named pymongo
注:Pymongo模塊已經與最新版本的安裝:
pip show pymongo
Name: pymongo
Version: 3.5.1
Summary: Python driver for MongoDB <http://www.mongodb.org>
Home-page: http://github.com/mongodb/mongo-python-driver
Author: Bernie Hackett
Author-email: [email protected]
License: Apache License, Version 2.0
Location: /usr/local/lib/python2.7/dist-packages
感謝
感謝Anuj瞭解
setuptools
:讓我在這方面的工作,並更新你 –現在收到此錯誤:數據流似乎被卡住了。請通過http://stackoverflow.com/questions/tagged/google-cloud-dataflow與Dataflow團隊聯繫。 –
您使用的Apache梁版本是什麼? – Anuj