2017-06-19 21 views
0

我需要根據時間戳將SCollection元素保存到不同的每小時BigQuery表中。我嘗試以下方法 -如何將SCollection元素保存到不同的BigQuery表中?

  1. 通過(TableName, Iterable[TableRow])族元素,然後使用BigQueryClient實例保存每個Iterable[TableRow]到它們各自的表中。這不起作用,因爲BigQueryClient不可序列化。

  2. 創建一個SCollection[TableName, PCollection[TableRow]],然後使用BigQueryIO.Write將每個PCollection[TableRow]保存到其各自的BigQuery表。要創建PCollection[TableRow]對象,我使用.map(s => (s._1, sc.pipeline.apply(Create.of(s._2.toList.asJava)))),其中sc是ScioContext的一個實例。這不起作用,因爲ScioContext不可序列化。

有沒有辦法將插入元素流到不同的BigQuery表中?

回答

0

在Beam中,BigQuery IO轉換提供several methods用於根據當前窗口選擇表格。我相信Dataflow 1.9對窗口相關目標具有相似的方法。數據流2.0還包括DynamicDestinations。請參閱Javadoc以獲取基於每個元素內的用戶ID選擇表的示例。

我對Scio並不熟悉,但它看起來像從BigQuery中暴露底層方法,IO將是最簡單的方法來完成這一點。

1

要使用Scio執行此操作,您可以創建自定義輸出轉換,該轉換將寫入由DynamicDestinations對象(Apache Beam)指定的目標。該表由輸入元素的某些特徵動態確定,在此情況下爲元素的事件時間(小時)。

定製輸出變換的BigQuery:

import com.google.api.services.bigquery.model.TableSchema 
import com.spotify.scio.bigquery.BigQueryUtil 
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition 
import org.apache.beam.sdk.io.gcp.bigquery._ 
import org.apache.beam.sdk.transforms.PTransform 
import org.apache.beam.sdk.values.{PCollection, PDone, ValueInSingleWindow} 


def saveAsBigQuery(tblPrefix: String, 
        tblSchema: String, 
        writeDisposition: WriteDisposition): 
    PTransform[PCollection[TableRow], PDone] = { 

    BigQueryIO.writeTableRows() 
    .to(new DynamicDestinations[TableRow, String] { 

     override def getTable(tblSuffix: String): TableDestination = { 
     // TODO: construct table name 
     val tblName = "%s_%s".format(tblPrefix, tblSuffix) 
     new TableDestination(tblName, null) 
     } 

     override def getDestination(tblRow: ValueInSingleWindow[TableRow]): String = { 
     // TODO: determine hourly table suffix based on event time in tblRow object 
     } 

     override def getSchema(destination: String): TableSchema = { 
     BigQueryUtil.parseSchema(tblSchema) 
     } 
    }) 
    .withWriteDisposition(writeDisposition) 
    .asInstanceOf[PTransform[PCollection[TableRow], PDone]] 
} 

應用自定義輸出變換使用上面的函數:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write 


val tblPrefix = "table_prefix" 
val tblSchema = "table_schema" // TODO: this needs to be in valid BigQuery schema format 
val writeDisposition = Write.WriteDisposition.WRITE_APPEND 

val bqTransform = saveAsBigQuery(
    tblPrefix, 
    tblSchema, 
    writeDisposition) 

// assuming tblRows is an SCollection[TableRow] 
tblRows.saveAsCustomOutput("saveAsBigQuery", bqTransform) 
相關問題