在使用Akka Streams時,似乎我永遠無法獲得錯誤處理權限。流程失敗,出現錯誤因違反反應流程規範而關閉
所以這是我的代碼
var db = Database.forConfig("oracle")
var mysqlDb = Database.forConfig("mysql_read")
var mysqlDbWrite = Database.forConfig("mysql_write")
implicit val actorSystem = ActorSystem()
val decider : Supervision.Decider = {
case _: Exception =>
println("got an exception restarting connections")
// let us restart our connections
db.close()
mysqlDb.close()
mysqlDbWrite.close()
db = Database.forConfig("oracle")
mysqlDb = Database.forConfig("mysql_read")
mysqlDbWrite = Database.forConfig("mysql_write")
Supervision.Restart
}
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
和我有這樣
val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(10){ foo =>
try {
val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long]
mysqlDbWrite.run(existsQuery).map(v => (foo, v))
} catch {
case e: Throwable =>
println(s"Lookup failed for ${foo}")
throw e // will restart the stream
}
}.collect {case (f, v) if v.isEmpty => f}
所以基本上如果FOO在MySQL中已經存在,則記錄不應該被任何進一步的處理的流流。
我的這個代碼的希望是,如果任何失敗的MySQL查找(MySQL機器是非常糟糕的,超時是常見的),記錄將被打印並丟棄,該流將繼續與剩餘的記錄禮貌監管。
當我運行此代碼。我看到這樣
[error] (mysql_write network timeout executor) java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state
java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state
at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5576)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Invalid socket timeout value or state
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:937)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:872)
at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4852)
at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Socket is closed
at java.net.Socket.setSoTimeout(Socket.java:1137)
at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850)
at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
和
[error] (mysql_write network timeout executor) java.lang.NullPointerException
java.lang.NullPointerException
at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850)
at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
其中哪些驚喜我這裏要說的是這些異常並非來自我的catch塊的事情錯誤。因爲我沒有看到我的catch塊的println語句。堆棧跟蹤不會顯示它來自哪裏......但由於它說的是mysql_write
我可以認爲它是上面的Flow,因爲只有這個Flow使用mysql_write
。
最後整個流崩潰,並顯示錯誤
[trace] Stack trace suppressed: run last compile:runMain for the full output.
flow has failed with error Shutting down because of violation of the Reactive Streams specification.
14:51:06,973 |-INFO in ch.qos.logback.classic.AsyncAppender[asyncKafkaAppender] - Worker thread will flush remaining events before exiting.
[success] Total time: 3480 s, completed Sep 26, 2017 2:51:07 PM
14:51:07,603 |-INFO in [email protected] - Sleeping for 1 seconds
我不知道我做了什麼違反反應流規範!
確定問題再次出現。我正在更新我的帖子。 –