0

我正在使用Python Beam SDK 0.6.0。我想在Google雲端存儲中將我的輸出寫入JSON。做這個的最好方式是什麼?如何將結果寫入數據流中的gcs中的JSON文件/ Beam

我覺得我可以使用Text IO接收器中的WriteToText,但是我必須分別格式化每一行,對嗎?如何將結果保存到包含對象列表的有效JSON文件中?

回答

1

好的,作爲參考,我通過在beam SDK中使用WriteToText所使用的_TextSink編寫我自己的接收器來解決此問題。

不知道這是否是最好的方式來做到這一點,但迄今爲止效果很好。希望它可以幫助別人。

import os 
import json 

import apache_beam as beam 
from apache_beam import coders 
from apache_beam.io.iobase import Write 
from apache_beam.transforms import PTransform 

class _JsonSink(beam.io.FileSink): 
    """A Dataflow sink for writing JSON files.""" 

    def __init__(self, 
       file_path_prefix, 
       file_name_suffix='', 
       num_shards=0, 
       shard_name_template=None, 
       coder=coders.ToStringCoder(), 
       compression_type=beam.io.CompressionTypes.AUTO): 

     super(_JsonSink, self).__init__(
      file_path_prefix, 
      file_name_suffix=file_name_suffix, 
      num_shards=num_shards, 
      shard_name_template=shard_name_template, 
      coder=coder, 
      mime_type='text/plain', 
      compression_type=compression_type) 
     self.last_rows = dict() 

    def open(self, temp_path): 
     """ Open file and initialize it w opening a list.""" 
     file_handle = super(_JsonSink, self).open(temp_path) 
     file_handle.write('[\n') 
     return file_handle 

    def write_record(self, file_handle, value): 
     """Writes a single encoded record converted to JSON and terminates the 
     line w a comma.""" 
     if self.last_rows.get(file_handle, None) is not None: 
      file_handle.write(self.coder.encode(
       json.dumps(self.last_rows[file_handle]))) 
      file_handle.write(',\n') 

     self.last_rows[file_handle] = value 

    def close(self, file_handle): 
     """Finalize the JSON list and close the file handle returned from 
     ``open()``. Called after all records are written. 
     """ 
     if file_handle is not None: 
      # Write last row without a comma 
      file_handle.write(self.coder.encode(
       json.dumps(self.last_rows[file_handle]))) 

      # Close list and then the file 
      file_handle.write('\n]\n') 
      file_handle.close() 


class WriteToJson(PTransform): 
    """PTransform for writing to JSON files.""" 

    def __init__(self, 
       file_path_prefix, 
       file_name_suffix='', 
       num_shards=0, 
       shard_name_template=None, 
       coder=coders.ToStringCoder(), 
       compression_type=beam.io.CompressionTypes.AUTO): 

     self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards, 
           shard_name_template, coder, compression_type) 

    def expand(self, pcoll): 
     return pcoll | Write(self._sink) 

使用水槽類似於你如何使用文本水槽:

pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json') 
0

使每個文件包含了一堆元素的一個列表是困難的,因爲你需要組一堆元素,然後將它們一起寫入一個文件。讓我建議你使用不同的格式。

您可能會考慮JSON Lines格式,其中文件中的每一行表示一個JSON元素。

將數據轉換爲JSON行應該非常簡單。下面的變換應該做的伎倆:

class WriteToJsonLines(beam.PTransform): 
    def __init__(self, file_name): 
     self._file_name = file_name 

    def expand(self, pcoll): 
     return (pcoll 
       | 'format json' >> beam.Map(json.dumps) 
       | 'write to text' >> beam.WriteToText(self._file_name)) 

最後,如果你以後想讀你的JSON行的文件,你可以寫自己的JsonLinesSource或使用beam_utils之一。

+0

讓我知道你的數據是不是這樣組織的,我會試着找出適合你的東西。 – Pablo

+0

謝謝。我真的需要它是JSON。最後寫我自己的水槽,我認爲應該工作。 – while

+0

這很公平! ; ) – Pablo