我想從Google雲端存儲(GCS)讀取約90個gzip JSON日誌文件,每個大約2GB大(10 GB未壓縮),解析並寫入通過Google Cloud Dataflow(GCDF)轉換爲BigQuery(BQ)日期分區表。從Google雲端存儲中通過Dataflow將大型gzip JSON文件讀取到BigQuery中
每個文件包含7天的數據,整個日期範圍約爲2年(730天和計數)。我現在的管道是這樣的:
p.apply("Read logfile", TextIO.Read.from(bucket))
.apply("Repartition", Repartition.of())
.apply("Parse JSON", ParDo.of(new JacksonDeserializer()))
.apply("Extract and attach timestamp", ParDo.of(new ExtractTimestamps()))
.apply("Format output to TableRow", ParDo.of(new TableRowConverter()))
.apply("Window into partitions", Window.into(new TablePartWindowFun()))
.apply("Write to BigQuery", BigQueryIO.Write
.to(new DayPartitionFunc("someproject:somedataset", tableName))
.withSchema(TableRowConverter.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
重新分區是我已經建立的同時,努力使管道reshuffle after decompressing,我試圖與沒有它運行的管道。通過Jackon ObjectMapper和相應的類來解析JSON,建議使用here。 TablePartWindowFun取自here,它用於爲PCollection中的每個條目分配一個分區。
該管道適用於較小的文件,但不是太多,但是會打破我的真實數據集。我選擇了足夠大的機器類型,並嘗試設置最大數量的工人,以及使用自動調節多達100臺n1-highmem-16機器。我已經嘗試了流式和批處理模式,並且每個工作者從250到1200 GB的disSizeGb值。
可能的解決方案,我可以在那一刻想到的是:
- 解壓所有的GCS文件等,使工人的工作動態分裂,因爲這是不可能利用GCS的gzip transcoding
- 在一個循環中構建「許多」並行流水線,每條流水線只處理90個文件的一部分。
選項2在我看來像編程「圍繞」框架,有沒有另一種解決方案?
附錄:
隨着磁盤分割在批處理模式下與100名工人最多(類型爲N1-HIGHMEM-4),約一小時的管道運行與12名工人和閱讀 gzip的JSON文件後,完成閱讀以及再分配的第一階段。然後它擴展到100名工人並處理重新分區的PCollection。它完成後的圖如下所示:
有趣的是,達到這個階段的時候,第一次它的處理高達150萬件/ s,則進度下降到0的OutputCollection的大小圖片中的GroupByKey步驟首先上升,然後從大約3億下降到0(總共約有18億個元素)。就像它丟棄了一些東西。另外,最後的ExpandIterable
和ParDo(Streaming Write)
運行時間爲0.圖片在「向後」運行之前略微顯示。 在工作人員的日誌中,我看到一些來自com.google.api.client.http.HttpTransport
記錄器的exception thrown while executing request
消息,但我在Stackdriver中找不到更多信息。
沒有重新分區讀使用帶有內存不足的錯誤n1-highmem-2
情況下,在完全相同的步驟管道失敗(GroupByKey
後一切)之後 - 使用更大的實例類型導致的異常像
java.util.concurrent.ExecutionException: java.io.IOException:
CANCELLED: Received RST_STREAM with error code 8 dataflow-...-harness-5l3s
talking to frontendpipeline-..-harness-pc98:12346
_「但爲我的真實數據集打破」_ - 究竟發生了什麼?你得到什麼錯誤? –
我已經添加了一個錯誤的例子。 – Tobi
我已經添加了另一個沒有Repartition步驟的例子 – Tobi