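Unable to encode element with HBaseMutationCoder
I am trying to apply mutations (Increments) to Bigtable from Dataflow using cloud-bigtable-client (https://github.com/GoogleCloudPlatform/cloud-bigtable-client).
Here is a high-level summary of what my job does: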
PCollection<SomeData> somedata = ...;
somedata.apply(ParDo.of(new CreateMutations()))
    .setCoder(new HBaseMutationCoder())
    .apply(CloudBigtableIO.writeToTable(config));
// I don't think it is necessary to explicitly set Coder here; I tried both ways.
CreateMutations is a DoFn that looks like this:
// c.element() is KV<String, Iterable<SomeData>>
public void processElement(ProcessContext c) {
    Increment mutation = new Increment(c.element().getKey().getBytes());
    for (SomeData data : c.element().getValue()) {
        // Obtain cf (String), qual (String), value (long) from data.
        // None of them is null.
        mutation.addColumn(cf.getBytes(), qual.getBytes(), value);
    }
    c.output(mutation);
}
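Surprisingly, the job fails when it runs this DoFn, because the elements it emits cannot be encoded by HBaseMutationCoder. Here is a small part of the stack trace: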
(e8a8d266ed05e19f): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:2:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=1, some_string/a:8:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=9620}), (family=m, columns={some_string/m:2:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=1, some_string/m:8:text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=9620}}' with coder 'HBaseMutationCoder'.
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
at ......
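Note that the error message clearly shows that the row, family, column qualifier, and values are properly populated. This particular error message shows an element containing four cells to be incremented. I have not had any problems using Delete and Put, but this is my first time using Increment. Is there anything that needs to be populated other than the row, family, qualifier, and value?
Any help would be greatly appreciated.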
I also tried using Put instead of Increment, and it works (same code as above, except for the two lines marked with (*)).
// c.element() is KV<String, Iterable<SomeData>>
public void processElement(ProcessContext c) {
    Put mutation = new Put(c.element().getKey().getBytes()); //(*)
    for (SomeData data : c.element().getValue()) {
        // Obtain cf (String), qual (String), value (long) from data.
        // None of them is null.
        mutation.addImmutable(cf.getBytes(), qual.getBytes(), Bytes.toBytes(value)); //(*)
    }
    c.output(mutation);
}
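For context on the vlen=8 that appears for every cell in the error message: a long value is stored as an 8-byte array, which is what Bytes.toBytes(value) produces in the Put version. The sketch below reproduces that encoding with only the JDK, assuming the big-endian layout that HBase's Bytes.toBytes(long) uses. The class name LongCellValue and its helper methods are hypothetical and are not part of HBase or cloud-bigtable-client.

```java
import java.nio.ByteBuffer;

public class LongCellValue {
    // Encode a long as 8 big-endian bytes.
    // Assumption: this matches the layout of HBase's Bytes.toBytes(long).
    public static byte[] toBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decode an 8-byte big-endian array back into a long.
    public static long toLong(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        // 9620 is one of the increment amounts shown in the error message.
        byte[] encoded = toBytes(9620L);
        System.out.println(encoded.length);   // prints 8, i.e. vlen=8
        System.out.println(toLong(encoded));  // prints 9620
    }
}
```

This only illustrates why each cell reports a value length of 8; it does not by itself explain why the coder rejects the Increment.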
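(I found a related question here: How to load data into Google Cloud Bigtable from Google BigQuery. However, the problem I am running into does not seem to be caused by null values; the row, column family, qualifier, and value are all properly populated.)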
Update: here is the full stack trace I got.
(875583981e325b46): java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:284)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:508)
at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:123)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:221)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:284)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:220)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:170)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:192)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:172)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:159)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
at com.google.cloud.dataflow.sdk.util.UserCodeException.wrap(UserCodeException.java:35)
at com.google.cloud.dataflow.sdk.util.UserCodeException.wrapIf(UserCodeException.java:40)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.wrapUserCodeException(DoFnRunnerBase.java:369)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:51)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158)
... 24 more
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:160)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:450)
at com.moloco.dataflow.bigtable.AptRecovery$UpdateCountPerCell.processElement(AptRecovery.java:78)
Caused by: java.lang.IllegalArgumentException: Unable to encode element 'row=some_string, families={(family=a, columns={some_string/a:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940, some_string/a:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2}), (family=m, columns={some_string/m:2:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=2, some_string/m:8:some_text/LATEST_TIMESTAMP/Put/vlen=8/seqid=0+=940}}' with coder 'HBaseMutationCoder'.
at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:170)
at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:185)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:641)
at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:552)
at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:351)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.output(DoFnRunnerBase.java:450)
at com.moloco.dataflow.bigtable.AptRecovery$UpdateCountPerCell.processElement(AptRecovery.java:78)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:158)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:288)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.outputWindowedValue(DoFnRunnerBase.java:284)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext$1.outputWindowedValue(DoFnRunnerBase.java:508)
at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn.processElement(GroupAlsoByWindowsViaIteratorsDoFn.java:123)
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:188)
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:55)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:221)
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:182)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeW
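Do you see a "Caused by ..." in the stack trace? Can you share it? That might narrow down what is happening here.
Let me update the question with the full stack trace. I'm not sure.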
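The "Caused by" chain asked about above can also be walked in code when the exception object is available, for example in a local test. This is a minimal sketch using only Throwable.getCause() from the JDK. The class name RootCause is hypothetical, and the exceptions built in main are stand-ins that mimic the wrapping seen in the trace, not the real Dataflow exceptions.

```java
public class RootCause {
    // Follow getCause() links until the innermost exception is reached.
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in chain: RuntimeException -> RuntimeException -> IllegalArgumentException.
        Throwable inner = new IllegalArgumentException("Unable to encode element");
        Throwable wrapped = new RuntimeException(new RuntimeException(inner));
        System.out.println(rootCause(wrapped).getClass().getName());
        // prints java.lang.IllegalArgumentException
    }
}
```

In the trace posted in the question, the last visible cause is the IllegalArgumentException raised from StandardCoder.getEncodedElementByteSize; the trace is cut off after that, so any deeper cause is not shown.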