假設我通過SideOutputs創建了兩個輸出PCollections,並且取決於某些條件,我只想將其中一個寫入BigQuery。這個怎麼做?將一個特定的PCollection寫入BigQuery
基本上我的用例是我試圖使Write_Append和Write_Truncate動態。我從我在BigQuery中維護的配置表中獲取信息(append/truncate)。所以根據我在配置表中的內容,我必須應用截斷或追加。
因此,使用SideOutputs我可以創建兩個PCollections(分別爲Append和Truncate),其中一個將爲空。具有所有行的那個必須寫入BigQuery。這種方法是否正確?
,我正在使用的代碼:
final TupleTag<TableRow> truncate =
new TupleTag<TableRow>(){};
// Output that contains word lengths.
final TupleTag<TableRow> append =
new TupleTag<TableRow>(){};
PCollectionTuple results = read.apply("convert to table row",ParDo.of(new DoFn<String,TableRow>(){
@ProcessElement
public void processElement(ProcessContext c)
{
String value = c.sideInput(configView).get(0).toString();
LOG.info("config: "+value);
if(value.equals("truncate")){
LOG.info("outputting to truncate");
c.output(new TableRow().set("color", c.element()));
}
else
{
LOG.info("outputting to append");
c.output(append,new TableRow().set("color", c.element()));
}
//c.output(new TableRow().set("color", c.element()));
}
}).withSideInputs(configView).withOutputTags(truncate,
TupleTagList.of(append)));
results.get(truncate).apply("truncate",BigQueryIO.writeTableRows()
.to("projectid:datasetid.tableid")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
results.get(append).apply("append",BigQueryIO.writeTableRows()
.to("projectid:datasetid.tableid")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
我需要執行一出兩個。如果我做這兩個表將會被截斷反正。
P.S.我使用的Java SDK(Apache的梁2.1)
這是一個普遍的問題,「這種方法是否正確?」或者你想要一些代碼解決方案? –
@Marcin Zablocki好吧,我也想要一些代碼解決方案 – rish0097
你說你有兩個PCollections,那麼問題是什麼呢?分裂和寫作的方法似乎沒問題。 –