2017-07-04 392 views
0

我正在嘗試使用GRPC在Scala中編寫流式服務。爲了實現這個我寫了這個原型文件Scala Grpc失敗,出現錯誤io.grpc.StatusRuntimeException:取消:無法讀取消息

syntax = "proto3"; 
package com.abhi.grpc; 

message TimeRequest{} 
message TimeResponse { 
    int64 currentTime = 1; 
} 

service Clock { 
    rpc StreamTime(TimeRequest) returns (stream TimeResponse); 
} 

這是我的服務器端代碼

import com.abhi.grpc.clock.{ClockGrpc, TimeRequest, TimeResponse} 
import io.grpc.stub.StreamObserver 
import monix.execution.Scheduler 
import monix.execution.Scheduler.{global => scheduler} 
import scala.concurrent.duration._ 

object ClockGrpcServer extends GrpcServer with App { 
    val ssd = ClockGrpc.bindService(new ClockGRPC(), Scheduler.global) 
    runServer(ssd, "Clock") 
} 

class ClockGRPC extends ClockGrpc.Clock { 
    override def streamTime(request: TimeRequest, responseObserver: StreamObserver[TimeResponse]): Unit = { 
     scheduler.scheduleWithFixedDelay(0.seconds, 3.seconds) { 
     responseObserver.onNext(TimeResponse(System.currentTimeMillis)) 
     } 
    } 
} 

,這是我的客戶

object ClockGrpcClient extends App { 
    val channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext(true).build() 
    val stub = ClockGrpc.stub(channel) 
    val observer = new StreamObserver[TimeResponse] { 
     override def onError(t: Throwable): Unit = println(s"failed with error ${t}") 
     override def onCompleted(): Unit = println("closing observer") 
     override def onNext(value: TimeResponse): Unit = println(s"received time ${new DateTime(value)}") 
    } 
    stub.streamTime(TimeRequest(), observer) 
    StdIn.readLine() 
} 

當我運行的服務器和客戶端。服務器只要它接收來自客戶端

io.grpc.StatusRuntimeException: CANCELLED 
     at io.grpc.Status.asRuntimeException(Status.java:534) 
     at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:279) 
     at com.abhi.ClockGRPC.$anonfun$streamTime$1(ClockGRPC.scala:22) 
     at monix.execution.internal.RunnableAction.run(RunnableAction.scala:25) 
     at monix.execution.schedulers.ReferenceScheduler$$anon$1.run(ReferenceScheduler.scala:45) 
     at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:140) 
     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 

我GOOGLE了一點,發現這篇文章

https://blog.codecentric.de/en/2017/01/hello-grpc-scalapb/

基於這一點,我改變了我的服務器使用的任何消息引發以下錯誤java.util scheduler

class ClockGRPC extends ClockGrpc.Clock { 
    val scheduler = Executors.newSingleThreadScheduledExecutor() 
    override def streamTime(request: TimeRequest, responseObserver: StreamObserver[TimeResponse]): Unit = { 
     val tick = new Runnable { 
     val counter = new AtomicInteger(10) 
     def run() = 
      if (counter.getAndDecrement() >= 0) { 
       val currentTime = System.currentTimeMillis() 
       responseObserver.onNext(TimeResponse(currentTime)) 
      } else { 
       scheduler.shutdown() 
       responseObserver.onCompleted() 
      } 
     } 
     scheduler.scheduleAtFixedRate(tick, 0l, 3000l, TimeUnit.SECONDS) 
    } 
} 

但我仍然得到CANCELED錯誤。所以我無法得到流式的例子。

+0

你在客戶端看到什麼? –

+0

'[info]運行com.abhi.ClockGrpcClient 失敗,出現錯誤io.grpc.StatusRuntimeException:取消:無法讀取消息.' –

+0

這裏是我的代碼https://github.com/abhsrivastava/GrpcTest –

回答

0

我幾乎已經放棄了這個問題。但今天回來並解決了它。

的問題是與線

override def onNext(value: TimeResponse): Unit = println(s"received time ${new DateTime(value)}") 

值不能被傳遞到new DateTime

爲了進一步使事情變得更糟。如果在回調方法中發生異常。 GRPC吞下它與一般錯誤消息

info] Running com.abhi.ClockGrpcClient failed with error io.grpc.StatusRuntimeException: CANCELLED: Failed to read message. 

我說,他的DateTime使用一個對象作爲參數,這樣編譯成功,運氣不好替換它,但呼叫在運行時失敗,異常是由GRPC吞噬。

我在這裏離開這裏,以便幫助別人。

[info] Running com.abhi.ClockGrpcClient failed with error io.grpc.StatusRuntimeException: CANCELLED: Failed to read message 

表示有人在回調函數中出錯。

+0

很高興你有這個解決。關於gRPC Java的StatusRuntimeException,簡單介紹一下:gRPC將把異常作爲StatusRuntimeException包裝並重新拋出,但是原始異常不會被吞下。它被保存爲錯誤原因(https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/ClientCallImpl.java#L487),並且可以通過狀態#getCause進行訪問。 –

相關問題