2016-12-09 65 views
0

我們正在嘗試運行日常Dataflow管道,該管道讀取Bigtable並將數據轉儲到GCS中(使用HBase的Scan和BaseResultCoder作爲編碼器),如下所示(只是爲了強調這個想法) :從Bigtable到GCS(反之亦然)通過Dataflow

Pipeline pipeline = Pipeline.create(options); 
    Scan scan = new Scan(); 
    scan.setCacheBlocks(false).setMaxVersions(1); 
    scan.addFamily(Bytes.toBytes("f")); 
    CloudBigtableScanConfiguration btConfig = BCloudBigtableScanConfiguration.Builder().withProjectId("aaa").withInstanceId("bbb").withTableId("ccc").withScan(scan).build(); 
    pipeline.apply(Read.from(CloudBigtableIO.read(btConfig))).apply(TextIO.Write.to("gs://bucket/dir/file").withCoder(HBaseResultCoder.getInstance())); 
    pipeline.run(); 

這似乎像預期的那樣完美運行。

現在,我們希望能夠在GCS中使用轉儲的文件進行恢復作業(如果需要)。也就是說,我們希望有一個數據流管道,它從GCS讀取轉儲的數據(即PCollection)並創建突變(基本上是「Put」對象)。出於某種原因,下面的代碼失敗了一堆NullPointerExceptions。我們不確定爲什麼會出現這種情況 - if語句會在下面檢查空字符串或長度爲0的字符串,以查看是否會產生影響,但事實並非如此。

// Part of DoFn<Result,Mutation> 
@Override 
public void processElement(ProcessContext c) { 
    Result result = c.element(); 
    byte[] row = result.getRow(); 
    if (row == null || row.length == 0) { // NullPointerException at this line 
    return; 
    } 
    Put mutation = new Put(result.getRow()); 
    // go through the column/value entries of this row, and create a corresponding put mutation. 
    for (Entry<byte[], byte[]> entry : result.getFamilyMap(Bytes.toBytes(cf)).entrySet()) { 
    byte[] qualifier = entry.getKey(); 
    if (qualifier == null || qualifier.length == 0) { 
     continue; 
    } 
    byte[] val = entry.getValue(); 
    if (val == null || val.length == 0) { 
     continue; 
    } 
    mutation.addImmutable(cf_bytes, qualifier, entry.getValue()); 
    } 
    c.output(mutation); 
} 

我們得到的錯誤是以下(線83上方標註):

(2a6ad6372944050d): java.lang.NullPointerException at some.package.RecoveryFromGcs$CreateMutationFromResult.processElement(RecoveryFromGcs.java:83) 

我有兩個問題:1。 有人已經經歷了這樣的事情,當他們嘗試帕爾多在PCollection獲得要寫入bigtable的PCollection? 2.這是一個合理的方法?最終目標是能夠通過備份的方式定期爲我們的bigtable(特定專欄系列)留下每日快照,以防發生不良情況。我們希望能夠通過數據流讀取備份數據,並在需要時將其寫入bigtable。

任何建議和幫助將非常感激!

--------編輯

這裏是掃描和Bigtable的數據轉儲到GCS代碼:(如果他們是不相關的一些細節被隱藏)

public static void execute(Options options) { 
    Pipeline pipeline = Pipeline.create(options); 
    final String cf = "f"; // some specific column family. 
    Scan scan = new Scan(); 
    scan.setCacheBlocks(false).setMaxVersions(1); // Disable caching and read only the latest cell. 
    scan.addFamily(Bytes.toBytes(cf)); 

    CloudBigtableScanConfiguration btConfig = 
     BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), "some-bigtable-name").withScan(scan).build(); 

    PCollection<Result> result = pipeline.apply(Read.from(CloudBigtableIO.read(btConfig))); 

    PCollection<Mutation> mutation = 
     result.apply(ParDo.of(new CreateMutationFromResult(cf))).setCoder(new HBaseMutationCoder()); 

    mutation.apply(TextIO.Write.to("gs://path-to-files").withCoder(new HBaseMutationCoder())); 

    pipeline.run(); 
} 

}

讀取上述代碼的輸出處的作業具有以下代碼: (這是一個從GCS讀取時拋出異常)

public static void execute(Options options) { 
    Pipeline pipeline = Pipeline.create(options); 
    PCollection<Mutation> mutations = pipeline.apply(TextIO.Read 
     .from("gs://path-to-files").withCoder(new HBaseMutationCoder())); 

    CloudBigtableScanConfiguration config = 
     BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), btTarget).build(); 
    if (config != null) { 
    CloudBigtableIO.initializeForWrite(pipeline); 
    mutations.apply(CloudBigtableIO.writeToTable(config)); 
    } 
    pipeline.run(); 
} 

}

我得到(https://jpst.it/Qr6M)錯誤是有點混亂的突變都將物體,但錯誤是關於「刪除」的對象。

回答

0

最好在cloud bigtable client github issues page上討論這個問題。我們目前正在研究像這樣的導入/導出功能,所以我們會很快做出迴應。即使您不添加github問題,我們也會自行研究這種方法。 github問題將使我們能夠更好地溝通。

FWIW,我不明白你怎麼能得到你突出顯示的線上的NPE。你確定你有正確的路線嗎?

EDIT(12/12):

以下processElement()方法應該工作的結果轉換爲認沽:

@Override 
public void processElement(DoFn<Result, Mutation>.ProcessContext c) throws Exception { 
    Result result = c.element(); 
    byte[] row = result.getRow(); 
    if (row != null && row.length > 0) { 
    Put put = new Put(row); 
    for (Cell cell : result.rawCells()) { 
     put.add(cell); 
    } 
    c.output(put); 
    } 
} 
+0

這是因爲DOFN我以前完全一樣,不同的是我寫的首先將PCollection 添加到GCS(通過「轉儲」作業),然後讀取它(通過「恢復」作業)並應用此DoFn創建突變。在寫/讀結果時,我使用'HBaseResultCoder.getInstance()'作爲編碼器。 –

+0

現在,我通過從BigTable掃描後應用DoFn來切換代碼,從而獲得PCollection ,並使用新的HBaseMutationCoder()將其寫入GCS。 再次,當它被另一個工作(寫入bigtable)讀取時,我收到錯誤(不同的錯誤,但它似乎與編碼器有關)。 –

+0

以下是讀取數據的作業的作業ID:2016-12-15_18_18_02-8862892074153368958 這是我得到的錯誤: –