apache-beam

    0熱度

    1回答

    將固定數量的字符串(用於測試的800,000個1KB)放入PubSub主題並在以下版本中運行以下Apache Beam(2.1.0)作業數據流,正好一次保留語義,如預期。 import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.pub

    1熱度

    1回答

    我正在遷移將FileBasedSink從2.0.0版擴展到2.2.0的自定義接收器。類已發生變化,增加了兩個類型參數:UserT和DestinationT: @Experimental(value=FILESYSTEM) public abstract class FileBasedSink<UserT,DestinationT,OutputT> extends java.lang.Objec

    0熱度

    1回答

    我試圖構建一個ETL來加載一個Dimension表。我使用Python和DataFlow和BigQuery來分配Apache Bea。 我需要爲pcollection的每個元素分配一個序列號,以便將其加載到BigQuery中,但我找不到任何方法來執行此操作。 我想我需要DataFlow使先前的聚合和連接,以獲得我最後的pcollection添加序列號,但在這一刻我需要停止並行處理,並將我的pcol

    1熱度

    2回答

    我有一個值的字典,我想使用Python SDK將它作爲有效的.CSV文件寫入GCS。我可以將字典寫成換行分隔的文本文件,但我似乎無法找到將字典轉換爲有效的.CSV的示例。任何人都可以建議在數據流管道中生成csv的最佳方法嗎?這回答了這個question地址從CSV文件讀取,但並沒有真正解決寫入CSV文件。我認識到CSV文件只是包含規則的文本文件,但我仍然在努力將數據字典轉換爲可以使用WriteTo

    0熱度

    1回答

    我嘗試使用Apache束執行管線,但試圖把一些輸出標籤時,我得到一個錯誤: import com.google.cloud.Tuple; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import org.apache.beam.sdk.Pipeline; import org.apache.b

    0熱度

    1回答

    請從下面的數據流任務的詳細信息運行: 作業ID:2017-10-17_22_03_20-14123260585966292858 項目名稱:極限測試file12-1508302687176 批次:一批 開始時間:2017年10月18日10:33:21上午 狀態:正在取消... Region:us-central1 作業日誌詳細信息:2017-10-18(11:34:56)工作流失敗。原因:(b27

    0熱度

    1回答

    假設我通過SideOutputs創建了兩個輸出PCollections,並且取決於某些條件,我只想將其中一個寫入BigQuery。這個怎麼做? 基本上我的用例是我試圖使Write_Append和Write_Truncate動態。我從我在BigQuery中維護的配置表中獲取信息(append/truncate)。所以根據我在配置表中的內容,我必須應用截斷或追加。 因此,使用SideOutputs我可

    1熱度

    3回答

    我正在嘗試使用數據流來讀取pubsub消息並將其寫入大查詢。我獲得了Google團隊的Alpha訪問權限,並獲得了提供的示例,但現在我需要將其應用於我的場景。 PubSub的有效載荷: Message { data: {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1} att

    0熱度

    1回答

    我有一個將結果寫入BigQuery表的Apache Beam/Dataflow管道。然後,我想查詢這個表中管道的單獨部分。然而,我似乎無法弄清楚如何正確設置這個管道依賴。我編寫(然後想要查詢)的新表格與一個單獨的表格保持連接,用於某些過濾邏輯,這就是爲什麼我實際上需要編寫表格並運行查詢。邏輯將如下所示: with beam.Pipeline(options=pipeline_options) as

    -1熱度

    1回答

    我正在一個項目中接收大約10個文件,每個文件包含大小爲200GB的文件。我的項目要求是從每個文件中提取數據,並與其他文件進行連接並提取數據。 E.G像我有文件1.txt我有帳戶ID和我有文件2.txt,我有帳戶ID和帳戶名稱。根據第一個文件的帳戶ID我需要從第二個文件提取數據。 這樣我需要對10個文件中的每一個執行操作並創建最終的輸出文件。 我目前正在Java中這是真正花時間的過程。大約需要4到5