
Exceptions in Google Cloud Dataflow pipeline to Cloud Bigtable

Running our Dataflow pipeline, we see these exceptions every once in a while. Is there anything we can do about them? We have a very simple flow that reads a file from GCS and creates one record per line of the input file - the input file has roughly a million lines.

Also, what happens to the data inside the pipeline? Does it get reprocessed, or is it lost on its way to Bigtable?

(609803d25ddab111): io.grpc.StatusRuntimeException: UNKNOWN 
at io.grpc.Status.asRuntimeException(Status.java:428) 
at io.grpc.stub.Calls$StreamObserverToCallListenerAdapter.onClose(Calls.java:284) 
at io.grpc.ClientInterceptors$CheckedForwardingCall.start(ClientInterceptors.java:202) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.retryCall(RetryingCall.java:123) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.runCall(RetryingCall.java:110) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.halfClose(RetryingCall.java:100) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:178) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:166) 
at io.grpc.stub.Calls.asyncUnaryCall(Calls.java:143) 
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.listenableAsyncCall(BigtableDataGrpcClient.java:244) 
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.mutateRowAsync(BigtableDataGrpcClient.java:256) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issuePutRequest(BatchExecutor.java:262) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issueRequest(BatchExecutor.java:300) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.issueRequest(BigtableBufferedMutator.java:365) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doMutation(BigtableBufferedMutator.java:360) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:335) 
at com.company.HBaseBigtableWriter.processElement(HBaseBigtableWriter.java:70) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193) 
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117) 
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketTimeoutException: connect timed out 
at java.net.PlainSocketImpl.socketConnect(Native Method) 
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345) 
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) 
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) 
at sun.net.www.http.HttpClient.New(HttpClient.java:308) 
at sun.net.www.http.HttpClient.New(HttpClient.java:326) 
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998) 
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932) 
at com.google.bigtable.repackaged.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93) 
at com.google.bigtable.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965) 
at com.google.auth.oauth2.ComputeEngineCredentials.refreshAccessToken(ComputeEngineCredentials.java:61) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.doRefresh(RefreshingOAuth2CredentialsInterceptor.java:232) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.syncRefresh(RefreshingOAuth2CredentialsInterceptor.java:166) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:302) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:299) ... 4 more 

Is there anything we can do to make our code more resilient to this?

The pipeline itself is quite simple:

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); 
options.setMaxNumWorkers(20); 

Pipeline p = Pipeline.create(options); 

CloudBigtableIO.initializeForWrite(p) 
      .apply(TextIO.Read.from(options.getInputFile())) 
      .apply(ParDo.of(new HBaseBigtableWriter(options))); 
p.run(); 

The ParDo looks like this:

public class HBaseBigtableWriter extends DoFn<String, Void> { 
private Connection conn; 
private BufferedMutator mutator; 
private final CloudBigtableTableConfiguration btConfig; 
// counts bundles whose buffered mutations exhausted retries (declaration assumed; referenced in finishBundle below)
private final Aggregator<Integer, Integer> retriesExceptionAggregator = 
    createAggregator("retriesExhausted", new Sum.SumIntegerFn()); 

public HBaseBigtableWriter(CloudBigtableOptions options) { 
    this.btConfig = CloudBigtableTableConfiguration.fromCBTOptions(options); 
} 

@Override 
public void startBundle(DoFn<String, Void>.Context c) throws Exception { 
    super.startBundle(c); 
    conn = new BigtableConnection(btConfig.toHBaseConfig()); 
    mutator = conn.getBufferedMutator(TableName.valueOf(btConfig.getTableId())); 
} 

@Override 
public void processElement(DoFn<String, Void>.ProcessContext c) { 
    Put put = new Put(...);   // row key built from the input line - no side inputs or anything
    put.addImmutable(...); 
    mutator.mutate(put);      // the line mentioned in the stack trace
} 

@Override 
public void finishBundle(DoFn<String, Void>.Context c) throws Exception { 
    try { 
     mutator.close(); 
    } catch (RetriesExhaustedWithDetailsException e) { 
     retriesExceptionAggregator.addValue(1); 
     List<Throwable> causes = e.getCauses(); 
     if (causes.size() == 1) { 
      throw (Exception) causes.get(0); 
     } else { 
      throw e; 

     } 
    } 
    finally { 
     conn.close(); 
     super.finishBundle(c); 
    } 
} 
} 

And this one keeps popping up intermittently as well, especially under load:

java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 155291] 
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) 
at io.grpc.SerializingExecutor.execute(SerializingExecutor.java:112) 
at io.grpc.ChannelImpl$CallImpl$ClientStreamListenerImpl.closed(ChannelImpl.java:398) 
at io.grpc.transport.AbstractClientStream.closeListener(AbstractClientStream.java:256) 
at io.grpc.transport.AbstractClientStream.transportReportStatus(AbstractClientStream.java:230) 
at io.grpc.transport.AbstractClientStream.remoteEndClosed(AbstractClientStream.java:180) 
at io.grpc.transport.AbstractStream$1.endOfStream(AbstractStream.java:121) 
at io.grpc.transport.MessageDeframer.deliver(MessageDeframer.java:253) 
at io.grpc.transport.MessageDeframer.deframe(MessageDeframer.java:168) 
at io.grpc.transport.AbstractStream.deframe(AbstractStream.java:285) 
at io.grpc.transport.AbstractClientStream.inboundTrailersReceived(AbstractClientStream.java:175) 
at io.grpc.transport.Http2ClientStream.transportTrailersReceived(Http2ClientStream.java:162) 
at io.grpc.transport.netty.NettyClientStream.transportHeadersReceived(NettyClientStream.java:110) 
at io.grpc.transport.netty.NettyClientHandler.onHeadersRead(NettyClientHandler.java:179) 
at io.grpc.transport.netty.NettyClientHandler.access$800(NettyClientHandler.java:69) 
at io.grpc.transport.netty.NettyClientHandler$LazyFrameListener.onHeadersRead(NettyClientHandler.java:424) 
at com.google.bigtable.repackaged.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:316) 
It looks like the same thing also happens inside the Google SDK classes,

i.e. in Dataflow job 2015-09-10_10_26_26-7782438171725519247:

(dedc6cc776609500): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 2 actions: StatusRuntimeException: 2 times, 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:408) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doFlush(BigtableBufferedMutator.java:285) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.close(BigtableBufferedMutator.java:258) 
at org.apache.hadoop.hbase.client.AbstractBigtableConnection$2.close(AbstractBigtableConnection.java:181) 
at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableWriteFn.finishBundle(CloudBigtableIO.java:613) 

Any advice on these exceptions? Thanks!


Ended up using 'CloudBigtableIO.writeToTable()' - and never saw this problem again.

Answer


Closing a connection and then performing a mutation could produce the stack trace you are seeing (my guess is that this happens when a worker is being stopped while buffered mutations are still in flight).

Could you please open a bug on our GitHub issue tracker? I think that would probably be the most effective way to diagnose this problem: https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues

If I'm reading the stack trace correctly, it looks like you are not taking advantage of the CloudBigtableIO.writeToTable() method and are instead writing the data with a custom ParDo. If so, the answer to your question really depends on what you do inside that custom ParDo and on the dynamics of "stopping a worker".
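
For reference, here is a minimal sketch of what a CloudBigtableIO.writeToTable()-based version of this pipeline could look like, assuming the same Options and CloudBigtableTableConfiguration setup shown in the question and the 2015-era cloud-bigtable-dataflow connector with Dataflow SDK 1.x; the row-key and column handling inside the DoFn are placeholders:

CloudBigtableTableConfiguration btConfig = CloudBigtableTableConfiguration.fromCBTOptions(options); 

Pipeline p = Pipeline.create(options); 
CloudBigtableIO.initializeForWrite(p); 

p.apply(TextIO.Read.from(options.getInputFile())) 
 .apply(ParDo.of(new DoFn<String, Mutation>() { 
     @Override 
     public void processElement(ProcessContext c) { 
         // turn one input line into a Put; key/column details are placeholders 
         Put put = new Put(Bytes.toBytes(c.element())); 
         // put.addImmutable(...) based on the parsed line 
         c.output(put); 
     } 
 })) 
 .apply(CloudBigtableIO.writeToTable(btConfig)); 

p.run(); 

With this shape the connector owns the BufferedMutator lifecycle (opening, flushing, retrying and closing it), so the DoFn only has to turn each line into a Mutation and no longer manages the connection in startBundle/finishBundle.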


Updated the question. Are we doing something conceptually wrong? In the meantime we have implemented a pipeline with 'CloudBigtableIO.writeToTable()' and are running some tests to see whether the exceptions disappear.


Had to remove the accepted-answer mark - we are seeing the error again, this time from the Google SDK classes as well. See the updated question.


Opened https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/478 as a follow-up.