2017-06-08 38 views
0

我正在尋找一種方式來掃描巨大的Google BigTable,並根據事件動態組成過濾器,並對大量行進行批量更新/刪除。基於動態過濾器的bigtable批量更新

目前,我正在嘗試將BigTable與基於Java的Dataflow(用於密集型無服務器計算能力)相結合。我達到了可以根據事件使用動態過濾器組合「掃描」對象的程度,但我仍然無法找到將CloudBigtableIO.read()的結果流式傳輸到後續數據流管道的方法。

欣賞任何建議。

回答

1

AbstractCloudBigtableTableDoFn擴展您的DoFn。這將使您可以訪問getConnection()方法。

try(Connection c = getConnection(); 
    Table t = c.getTable(YOUR_TABLE_NAME); 
    ResultScanner resultScanner = t.getScanner(YOUR_SCAN)) { 
    for(Result r : resultScanner) { 
    Mutation m = ... // construct a Put or Delete 
    context.output(m) 
    } 
} 

我假設你的管道始於CloudBigtableIO.read(),有AbstractCloudBigtableTableDoFn下,再有一個CloudBigtableIO.write():你會做這樣的事情。