2017-04-06

I am trying to apply mutations (increments) to Bigtable from a Dataflow job using cloud-bigtable-client (https://github.com/GoogleCloudPlatform/cloud-bigtable-client), but the elements cannot be encoded with HBaseMutationCoder.

Here is a high-level summary of what my job does:

PCollection<SomeData> somedata = ...;
somedata.apply(ParDo.of(new CreateMutations()))
    .setCoder(new HBaseMutationCoder())
    .apply(CloudBigtableIO.writeToTable(config));
// I don't think it is necessary to explicitly set the Coder here; I tried both ways.

CreateMutations is a DoFn that looks like this:

// c.element() is KV<String, Iterable<SomeData>>
public void processElement(ProcessContext c) {
    Increment mutation = new Increment(c.element().getKey().getBytes());
    for (SomeData data : c.element().getValue()) {
        // Obtain cf (String), qual (String), value (long) from data.
        // None of them is null.
        mutation.addColumn(cf.getBytes(), qual.getBytes(), value);
    }
    c.output(mutation);
}
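A side note on the `getBytes()` calls above, unrelated to the encoding failure itself: on Java versions before 18, `String.getBytes()` with no argument uses the platform default charset, so the row key bytes could in principle differ between a local machine and the workers. A minimal sketch of pinning the charset with the standard library follows; the class and method names are hypothetical and not part of the job.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class RowKeyBytes {
    // Convert a row key, family, or qualifier to bytes with an explicit charset,
    // so the result does not depend on the JVM's default encoding.
    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "some_string" is the placeholder row key shown in the error message.
        System.out.println(Arrays.toString(utf8("some_string")));
    }
}
```

For pure ASCII keys the result is the same under the common default charsets, so this is mainly a guard against surprises with non-ASCII keys.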

Surprisingly, the job fails while executing this DoFn because the elements cannot be encoded by HBaseMutationCoder. Here is a small part of the stack trace:

(e8a8d266ed05e19f): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:2:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=1, some_string/a:8:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=9620}), (family=m, columns={some_string/m:2:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=1, some_string/m:8:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=9620}}' with coder 'HBaseMutationCoder'. 
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288) 
    at ...... 

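Note that the error message clearly shows that the row, family, column qualifier, and values are properly populated. This particular error message shows that the mutation contains four cells to increment. I have had no problems using Delete and Put, but this is my first time using Increment. Is there anything that needs to be populated besides the row, family, qualifier, and value?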

Any help would be greatly appreciated.

I also tried using Put instead of Increment, and it works (the code is the same as above except for the two lines marked with (*)).

// c.element() is KV<String, Iterable<SomeData>>
public void processElement(ProcessContext c) {
    Put mutation = new Put(c.element().getKey().getBytes()); //(*)
    for (SomeData data : c.element().getValue()) {
        // Obtain cf (String), qual (String), value (long) from data.
        // None of them is null.
        mutation.addImmutable(cf.getBytes(), qual.getBytes(), Bytes.toBytes(value)); //(*)
    }
    c.output(mutation);
}
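The `vlen=8` in the error message matches the width of a Java `long`. As a rough illustration of the bytes the Put variant stores, the standard-library sketch below builds the 8-byte big-endian form of a long, which is the layout HBase's `Bytes.toBytes(long)` produces. The class and method names are hypothetical. Note that a Put writes this value as-is, whereas an Increment asks the server to add it to the existing cell, so the Put version is not a drop-in replacement.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class LongCellEncoding {
    // Encode a long as 8 big-endian bytes (ByteBuffer is big-endian by default).
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decode 8 big-endian bytes back into a long.
    static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        // 9620 is one of the increment amounts shown in the error message.
        byte[] encoded = encodeLong(9620L);
        System.out.println(encoded.length + " bytes: " + Arrays.toString(encoded));
        System.out.println("decoded: " + decodeLong(encoded));
    }
}
```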

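(I found a related question here: How to load data into Google Cloud Bigtable from Google BigQuery. However, the problem I am running into does not seem to be caused by null values; the row, column family, qualifier, and value are all properly populated.)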


Update: here is the full stack trace I get.

(875583981e325b46): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'. 
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:284) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:508) 
    at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:123) 
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139) 
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188) 
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:221) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182) 
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:284) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:220) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:170) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'. 
    at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(UserCodeException.java:35) 
    at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:40) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:369) 
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:51) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139) 
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188) 
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158) 
    ... 24 more 
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'. 
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:450) 
    at com.moloco.dataflow.bigtable.AptRecovery$UpdateCountPerCell.processElement(AptRecovery.java:78) 
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'. 
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:170) 
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:185) 
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:641) 
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:552) 
    at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:351) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46) 
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:450) 
    at com.moloco.dataflow.bigtable.AptRecovery$UpdateCountPerCell.processElement(AptRecovery.java:78) 
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139) 
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188) 
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:284) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:508) 
    at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:123) 
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) 
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139) 
    at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188) 
    at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55) 
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:221) 
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182) 
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69) 
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeW 

Do you see a "Caused by ..." in the stack trace? Could you share it? That might narrow down what is happening here.


Let me update the question with the full stack trace. I'm not sure.
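The trace in the question repeats the same "Unable to encode element" message at several nesting levels, which makes the innermost "Caused by" easy to miss. A minimal sketch for surfacing it, assuming the failure can be reproduced and caught locally (for example in a unit test), is to walk the `getCause()` chain. The class name and the exceptions built in `main` are hypothetical stand-ins that only mimic the shape of the real trace.

```java
public class RootCause {
    // Follow getCause() links until the innermost exception is reached.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical nesting; the innermost exception here is made up.
        Throwable inner = new UnsupportedOperationException("hypothetical innermost failure");
        Throwable middle = new IllegalArgumentException("Unable to encode element", inner);
        Throwable outer = new RuntimeException(middle);

        Throwable root = rootCause(outer);
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
    }
}
```

Logging the root cause's class and message, rather than only the outer wrapper, is usually enough to see which exception the coder itself raised.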

Answers