2017-02-15 39 views
1

我想從Google雲端存儲(GCS)讀取約90個gzip JSON日誌文件,每個大約2GB大(10 GB未壓縮),解析並寫入通過Google Cloud Dataflow(GCDF)轉換爲BigQuery(BQ)日期分區表。從Google雲端存儲中通過Dataflow將大型gzip JSON文件讀取到BigQuery中

每個文件包含7天的數據,整個日期範圍約爲2年(730天和計數)。我現在的管道是這樣的:

p.apply("Read logfile", TextIO.Read.from(bucket)) 
.apply("Repartition", Repartition.of()) 
.apply("Parse JSON", ParDo.of(new JacksonDeserializer())) 
.apply("Extract and attach timestamp", ParDo.of(new ExtractTimestamps())) 
.apply("Format output to TableRow", ParDo.of(new TableRowConverter())) 
.apply("Window into partitions", Window.into(new TablePartWindowFun())) 
.apply("Write to BigQuery", BigQueryIO.Write 
     .to(new DayPartitionFunc("someproject:somedataset", tableName)) 
     .withSchema(TableRowConverter.getSchema()) 
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); 

重新分區是我已經建立的同時,努力使管道reshuffle after decompressing,我試圖與沒有它運行的管道。通過Jackon ObjectMapper和相應的類來解析JSON,建議使用here。 TablePartWindowFun取自here,它用於爲PCollection中的每個條目分配一個分區。

該管道適用於較小的文件,但不是太多,但是會打破我的真實數據集。我選擇了足夠大的機器類型,並嘗試設置最大數量的工人,以及使用自動調節多達100臺n1-highmem-16機器。我已經嘗試了流式和批處理模式,並且每個工作者從250到1200 GB的disSizeGb值。

可能的解決方案,我可以在那一刻想到的是:

  1. 解壓所有的GCS文件等,使工人的工作動態分裂,因爲這是不可能利用GCS的gzip transcoding
  2. 在一個循環中構建「許多」並行流水線,每條流水線只處理90個文件的一部分。

選項2在我看來像編程「圍繞」框架,有沒有另一種解決方案?

附錄:

隨着磁盤分割在批處理模式下與100名工人最多(類型爲N1-HIGHMEM-4),約一小時的管道運行與12名工人和閱讀 gzip的JSON文件後,完成閱讀以及再分配的第一階段。然後它擴展到100名工人並處理重新分區的PCollection。它完成後的圖如下所示:

Write to BQ Service Graph

有趣的是,達到這個階段的時候,第一次它的處理高達150萬件/ s,則進度下降到0的OutputCollection的大小圖片中的GroupByKey步驟首先上升,然後從大約3億下降到0(總共約有18億個元素)。就像它丟棄了一些東西。另外,最後的ExpandIterableParDo(Streaming Write)運行時間爲0.圖片在「向後」運行之前略微顯示。 在工作人員的日誌中,我看到一些來自com.google.api.client.http.HttpTransport記錄器的exception thrown while executing request消息,但我在Stackdriver中找不到更多信息。

沒有重新分區讀使用帶有內存不足的錯誤n1-highmem-2情況下,在完全相同的步驟管道失敗(GroupByKey後一切)之後 - 使用更大的實例類型導致的異常像

java.util.concurrent.ExecutionException: java.io.IOException: 
CANCELLED: Received RST_STREAM with error code 8 dataflow-...-harness-5l3s 
talking to frontendpipeline-..-harness-pc98:12346 
+0

_「但爲我的真實數據集打破」_ - 究竟發生了什麼?你得到什麼錯誤? –

+0

我已經添加了一個錯誤的例子。 – Tobi

+0

我已經添加了另一個沒有Repartition步驟的例子 – Tobi

回答

1

感謝Google Cloud Dataflow團隊的Dan以及他提供的示例here,我能夠解決此問題。唯一的變化我做:

  • 循環遍歷175 =(25周),大段大段的日子裏,運行一個接一個的管道,不會打擊系統。在循環中確保重新處理前一次迭代的最後一個文件,並以與底層數據(175天)相同的速度向前移動startDate。由於使用了WriteDisposition.WRITE_TRUNCATE,因此使用正確的完整數據覆蓋區塊末尾的不完整日期。

  • 使用磁盤分割/改組變換如上所述,讀取gzip壓縮文件後,以加快這一進程,並允許平滑的自動縮放

  • 使用,而不是即時類型的日期時間,因爲我的數據是不是在UTC

UPDATE(Apache的光束2.0):

與Apache梁2.0解決方案的發佈變得更加容易。 Sharding BigQuery輸出表現在支持out of the box

0

可能值得嘗試通過在運行管道時設置具有較高值的​​--numWorkers來爲您的管道分配更多資源。這是「在您的管道故障排除」在線document,在「常見錯誤和行動方案」子章節中討論的可能解決方案之一。

+0

我已經試過這個,即使使用最大的機器類型時也是如此。 – Tobi

相關問題