我在Dataflow中有一項工作,通過使用內置的Dataflow API將數據從Bigtable導入Bigquery。我有兩個問題:Bigtable-BigQuery通過DataFlow導入:關於表分區和時間戳的2個問題
問題1:如果源數據是在一個大表Bigtable中,我怎麼能它分爲BigQuery中的一組子或小表動態基礎上,也就是說,在給定只有在運行時才知道Bigtable行鍵?
在數據流中的Java代碼看起來是這樣的:
p.apply(Read.from(CloudBigtableIO.read(config)))
.apply(ParDo.of(new SomeDoFNonBTSourceData()))
.apply(BigQueryIO.Write
.to(PROJ_ID + ":" + BQ_DataSet + "." + BQ_TableName)
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();
所以,既然BQ_TableName
有在代碼層面上提供的,我能提供給它編程的基礎上所看到的SomeDoFNonBTSourceData
裏面,像當前RowKey的值範圍?如果RowKey是「交流」,那麼TableA的,如果「東風」,那麼表B等
問題2:什麼是Bigtable的時間戳導出到BigQuery的正確的方式,從而在人類可讀的格式,最終重建它在BigQuery中?
的DOFN內的processElement功能如下:
public void processElement(ProcessContext c)
{
String valA = new String(c.element().getColumnLatestCell(COL_FAM, COL_NAME).getValueArray());
Long timeStamp = c.element().getColumnLatestCell(COL_FAM, COL_NAME).getTimestamp();
tr.put("ColA", valA);
tr.put("TimeStamp",timeStamp);
c.output(tr);
}
與管道建設過程中,BQ模式設置爲timestamp列如下:
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("ColA").setType("STRING"));
fields.add(new TableFieldSchema().setName("TimeStamp").setType("TIMESTAMP"));
schema = new TableSchema().setFields(fields);
因此Bigtable的時間戳似乎是Long
類型,並且我已嘗試使用"TIMESTAMP"
和"INTEGER"
類型作爲BQ中的目標TimeStamp列(好像BQ中沒有Long一樣)。最終,我需要使用BQ中的TimeStamp列來處理'order by'子句,並以人類可讀形式(日期和時間)顯示信息。 'order by'部分似乎工作正常,但我沒有設法將最終結果轉換爲任何有意義的結果 - 要麼發生轉換錯誤,要麼仍然不可讀。
對不起是什麼類型的日期是在那裏呢? java.util.Date似乎沒有'.toInstant()',而來自Google API的日期沒有其他一些方法? –
你可以嘗試改變你的語言水平爲「8 - lambda,類型註釋等」。在Intellij中,轉到項目結構,選擇您的模塊並更改語言級別。我不太瞭解如何在Eclipse中執行此操作。 – Ken
啊你是對的 - 我看到它在那裏的文檔。必須是我的Eclipse的東西。將解決它。如果我在問題1中找到了什麼,我會在這裏發帖...... –