我們正在嘗試運行日常Dataflow管道,該管道讀取Bigtable並將數據轉儲到GCS中(使用HBase的Scan和BaseResultCoder作爲編碼器),如下所示(只是爲了強調這個想法) :從Bigtable到GCS(反之亦然)通過Dataflow
Pipeline pipeline = Pipeline.create(options);
Scan scan = new Scan();
scan.setCacheBlocks(false).setMaxVersions(1);
scan.addFamily(Bytes.toBytes("f"));
CloudBigtableScanConfiguration btConfig = BCloudBigtableScanConfiguration.Builder().withProjectId("aaa").withInstanceId("bbb").withTableId("ccc").withScan(scan).build();
pipeline.apply(Read.from(CloudBigtableIO.read(btConfig))).apply(TextIO.Write.to("gs://bucket/dir/file").withCoder(HBaseResultCoder.getInstance()));
pipeline.run();
這似乎像預期的那樣完美運行。
現在,我們希望能夠在GCS中使用轉儲的文件進行恢復作業(如果需要)。也就是說,我們希望有一個數據流管道,它從GCS讀取轉儲的數據(即PCollection)並創建突變(基本上是「Put」對象)。出於某種原因,下面的代碼失敗了一堆NullPointerExceptions。我們不確定爲什麼會出現這種情況 - if語句會在下面檢查空字符串或長度爲0的字符串,以查看是否會產生影響,但事實並非如此。
// Part of DoFn<Result,Mutation>
@Override
public void processElement(ProcessContext c) {
Result result = c.element();
byte[] row = result.getRow();
if (row == null || row.length == 0) { // NullPointerException at this line
return;
}
Put mutation = new Put(result.getRow());
// go through the column/value entries of this row, and create a corresponding put mutation.
for (Entry<byte[], byte[]> entry : result.getFamilyMap(Bytes.toBytes(cf)).entrySet()) {
byte[] qualifier = entry.getKey();
if (qualifier == null || qualifier.length == 0) {
continue;
}
byte[] val = entry.getValue();
if (val == null || val.length == 0) {
continue;
}
mutation.addImmutable(cf_bytes, qualifier, entry.getValue());
}
c.output(mutation);
}
我們得到的錯誤是以下(線83上方標註):
(2a6ad6372944050d): java.lang.NullPointerException at some.package.RecoveryFromGcs$CreateMutationFromResult.processElement(RecoveryFromGcs.java:83)
我有兩個問題:1。 有人已經經歷了這樣的事情,當他們嘗試帕爾多在PCollection獲得要寫入bigtable的PCollection? 2.這是一個合理的方法?最終目標是能夠通過備份的方式定期爲我們的bigtable(特定專欄系列)留下每日快照,以防發生不良情況。我們希望能夠通過數據流讀取備份數據,並在需要時將其寫入bigtable。
任何建議和幫助將非常感激!
--------編輯
這裏是掃描和Bigtable的數據轉儲到GCS代碼:(如果他們是不相關的一些細節被隱藏)
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
final String cf = "f"; // some specific column family.
Scan scan = new Scan();
scan.setCacheBlocks(false).setMaxVersions(1); // Disable caching and read only the latest cell.
scan.addFamily(Bytes.toBytes(cf));
CloudBigtableScanConfiguration btConfig =
BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), "some-bigtable-name").withScan(scan).build();
PCollection<Result> result = pipeline.apply(Read.from(CloudBigtableIO.read(btConfig)));
PCollection<Mutation> mutation =
result.apply(ParDo.of(new CreateMutationFromResult(cf))).setCoder(new HBaseMutationCoder());
mutation.apply(TextIO.Write.to("gs://path-to-files").withCoder(new HBaseMutationCoder()));
pipeline.run();
}
}
讀取上述代碼的輸出處的作業具有以下代碼: (這是一個從GCS讀取時拋出異常)
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<Mutation> mutations = pipeline.apply(TextIO.Read
.from("gs://path-to-files").withCoder(new HBaseMutationCoder()));
CloudBigtableScanConfiguration config =
BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), btTarget).build();
if (config != null) {
CloudBigtableIO.initializeForWrite(pipeline);
mutations.apply(CloudBigtableIO.writeToTable(config));
}
pipeline.run();
}
}
我得到(https://jpst.it/Qr6M)錯誤是有點混亂的突變都將物體,但錯誤是關於「刪除」的對象。
這是因爲DOFN我以前完全一樣,不同的是我寫的首先將PCollection添加到GCS(通過「轉儲」作業),然後讀取它(通過「恢復」作業)並應用此DoFn創建突變。在寫/讀結果時,我使用'HBaseResultCoder.getInstance()'作爲編碼器。 –
現在,我通過從BigTable掃描後應用DoFn來切換代碼,從而獲得PCollection,並使用新的HBaseMutationCoder()將其寫入GCS。 再次,當它被另一個工作(寫入bigtable)讀取時,我收到錯誤(不同的錯誤,但它似乎與編碼器有關)。 –
以下是讀取數據的作業的作業ID:2016-12-15_18_18_02-8862892074153368958 這是我得到的錯誤: –