2016-08-22 150 views
4

Is it currently possible to read gzip files in Python with Apache Beam? My pipeline pulls gzip files from GCS with this line of code:

beam.io.Read(beam.io.TextFileSource('gs://bucket/file.gz', compression_type='GZIP')) 

But I get this error:

UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte 

Looking at the Beam Python source, compressed files only appear to be handled when writing to a sink: https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445
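The failing byte is no accident: 0x8b is the second byte of the two-byte gzip magic number (0x1f 0x8b), so the UTF-8 coder is choking on the compressed header itself, before any decompression has happened. A minimal reproduction outside Beam:

```python
# The first two bytes of any gzip stream are the magic number 0x1f 0x8b.
# Decoding them as UTF-8 reproduces the error from the traceback below.
gzip_magic = b"\x1f\x8b"
try:
    gzip_magic.decode("utf-8")
except UnicodeDecodeError as exc:
    print(exc)  # ... can't decode byte 0x8b in position 1: invalid start byte
```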

Full traceback:

Traceback (most recent call last):
  File "beam-playground.py", line 11, in <module>
    p.run()
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 103, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 99, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 258, in run_Read
    read_values(reader)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 245, in read_values
    read_result = [GlobalWindows.windowed_value(e) for e in reader]
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/fileio.py", line 807, in __iter__
    yield self.source.coder.decode(line)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/coders.py", line 187, in decode
    return value.decode('utf-8')
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte

Answers

2

UPDATE: TextIO in the Python SDK now supports reading from compressed files.

(Original answer:) As of today, TextIO in the Python SDK does not actually support reading from compressed files.

+0

Thanks for the quick response! I'll look through that. I've added the full traceback to the question. – agsolid

+0

The data is UTF-8, gzip-compressed. I verified this by decompressing the raw bytes and decoding them to unicode, with no errors. Correct me if I'm wrong, but looking at the Beam source, it seems TextFileSource doesn't handle compressed files at all. – agsolid

+1

I just looked deeper: I was wrong about the extent of the support. The class accepts a compression_type argument in preparation for actual support. –

3

I ran into a similar problem. I had a custom binary source that I wanted to parse and grab data out of. The problem is that the file.io API is built around CSV and Avro: no matter what I tried, it would not give me the file contents without trying to split them on line breaks. As you can imagine, a binary file does not handle that well.

At first I tried a custom source; that ended up requiring three classes, most of which duplicated core Dataflow/Beam code. In the end I wrote this nifty little monkeypatch to get done what I needed (this is testing at a deep, source-code level).

import apache_beam as beam
from apache_beam.io.fileio import coders

def _TextFileReader__iter(self):
    # The full file is available here and can be read like a normal file.
    # You can even limit how many bytes you read (I read 9 to grab the
    # file-format header).
    data = self._file.read()
    # Now you can either yield the whole file as a single element and run a
    # ParDo to split it, or iterate in here and yield each row. I chose the
    # latter, but this example shows the former.
    yield data

# Apply the monkeypatch.
beam.io.fileio.TextFileReader.__iter__ = _TextFileReader__iter

To invoke this source, and to make sure it is read as BINARY, I did the following:

pipeline | 'start_3' >> beam.io.Read(
    beam.io.TextFileSource('gs://MY_BUCKET/sample.bin',
                           coder=coders.BytesCoder()))

Notice the coders.BytesCoder()? Without it, Beam tried to convert the bytes to a non-binary string, which did not play well with my parsing engine. ;)

It took me quite a while to figure this out, but once you use this method you can do almost anything with the file.io classes in Dataflow. ;)

1

I had the same issue. I was trying to read binary GZ files from GCS, decompress them, and then ship them elsewhere for processing. I solved it in two steps.

First, make sure you are using the right Python library version; mine was outdated (you want at least v0.4): pip install --upgrade google-cloud-dataflow

Second, I constructed my pipeline as follows:

import apache_beam as beam 
from apache_beam import (coders, io, transforms) 

raw_logs = (p 
      | io.Read("ReadLogsFromGCS", beam.io.TextFileSource(
         "gs://my-bucket/logs-*.gz", 
         coder=coders.BytesCoder())) 
      | transforms.Map(lambda x: x) 
      | io.Write("WriteToLocalhost", io.textio.WriteToText(
         "/tmp/flattened-logs", 
         file_name_suffix=".json"))) 
p.run() 

After the pipeline runs, you should have a file called /tmp/flattened-logs.json.
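The "ship them elsewhere for processing" step is not shown in the answer. As a sketch, the downstream consumer could decompress each raw gzip payload with nothing but the standard library (the function name `decompress_lines` is made up for illustration):

```python
import gzip
import io

def decompress_lines(raw_bytes):
    """Decompress one gzip payload in memory and split it into text lines."""
    with gzip.GzipFile(fileobj=io.BytesIO(raw_bytes)) as f:
        return f.read().decode("utf-8").splitlines()

# Round-trip demonstration with an in-memory gzip payload.
payload = gzip.compress(b"line one\nline two\n")
print(decompress_lines(payload))  # ['line one', 'line two']
```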