2017-09-26 31 views
0

在使用Akka Streams時,似乎我永遠無法獲得錯誤處理權限。流程失敗,出現錯誤因違反反應流程規範而關閉

所以這是我的代碼

var db = Database.forConfig("oracle") 
var mysqlDb = Database.forConfig("mysql_read") 
var mysqlDbWrite = Database.forConfig("mysql_write") 

implicit val actorSystem = ActorSystem() 
val decider : Supervision.Decider = { 
    case _: Exception => 
     println("got an exception restarting connections") 
    // let us restart our connections 
    db.close() 
    mysqlDb.close() 
    mysqlDbWrite.close() 
    db = Database.forConfig("oracle") 
    mysqlDb = Database.forConfig("mysql_read") 
    mysqlDbWrite = Database.forConfig("mysql_write") 
    Supervision.Restart 
} 
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)) 

和我有這樣

val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(10){ foo => 
    try { 
    val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long] 
    mysqlDbWrite.run(existsQuery).map(v => (foo, v)) 
    } catch { 
    case e: Throwable => 
     println(s"Lookup failed for ${foo}") 
     throw e // will restart the stream 
    } 
}.collect {case (f, v) if v.isEmpty => f} 

所以基本上如果FOO在MySQL中已經存在,則記錄不應該被任何進一步的處理的流流。

我的這個代碼的希望是,如果任何失敗的MySQL查找(MySQL機器是非常糟糕的,超時是常見的),記錄將被打印並丟棄,該流將繼續與剩餘的記錄禮貌監管。

當我運行此代碼。我看到這樣

[error] (mysql_write network timeout executor) java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state 
java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5576) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.sql.SQLException: Invalid socket timeout value or state 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998) 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:937) 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926) 
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:872) 
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4852) 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketException: Socket is closed 
    at java.net.Socket.setSoTimeout(Socket.java:1137) 
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850) 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

[error] (mysql_write network timeout executor) java.lang.NullPointerException 
java.lang.NullPointerException 
    at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850) 
    at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

其中哪些驚喜我這裏要說的是這些異常並非來自我的catch塊的事情錯誤。因爲我沒有看到我的catch塊的println語句。堆棧跟蹤不會顯示它來自哪裏......但由於它說的是mysql_write我可以認爲它是上面的Flow,因爲只有這個Flow使用mysql_write

最後整個流崩潰,並顯示錯誤

[trace] Stack trace suppressed: run last compile:runMain for the full output. 
flow has failed with error Shutting down because of violation of the Reactive Streams specification. 
14:51:06,973 |-INFO in ch.qos.logback.classic.AsyncAppender[asyncKafkaAppender] - Worker thread will flush remaining events before exiting. 
[success] Total time: 3480 s, completed Sep 26, 2017 2:51:07 PM 
14:51:07,603 |-INFO in [email protected] - Sleeping for 1 seconds 

我不知道我做了什麼違反反應流規範!

回答

1

第一次嘗試獲得更可預測的解決方案是刪除阻止行爲(Await.result)並使用mapAsync。在alreadyExistsFilter流動的改寫可能是:

val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(3) { foo ⇒ 
    val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long] 
    foo → Await.result(mysqlDbWrite.run(existsQuery), Duration.Inf) 
    }.collect{ 
    case (foo, res) if res.isDefined ⇒ foo 
    } 

在阿卡阻擋更多信息可以在docs找到。

+0

確定問題再次出現。我正在更新我的帖子。 –

0

斯特凡諾給出的答案是正確的。由於流程中的代碼被阻塞,錯誤確實來了。

雖然,我的初始程序運行在scala 2.11上,甚至在切換到mapAsync之後,問題仍然存在。

由於這是一個命令行工具,我很容易切換到Scala 2.12並重試。

當我嘗試使用Scala 2.12時,它工作完美。

對我有很大幫助的一件事是在依賴關係中有"ch.qos.logback" % "logback-classic" % "1.2.3",。這將向您顯示正在執行的每條SQL語句,並輕鬆查看是否出現問題。