2016-08-11 53 views
0

使用Akka 2.4.7。我想記錄整個Http Response。使用類似於How does one log Akka HTTP client requests關注的代碼的實現是提取出來的數據的HttpEntity如果實體大小> 1K,則Akka-Stream記錄物化流失敗

def entityAsString(entity: HttpEntity) (implicit m: Materializer, ex: ExecutionContext): Future[String] = { 
    entity.dataBytes.map(_.decodeString("UTF-8")).runWith(Sink.head) 
} 

這種運作良好,如果POST請求有一個小的有效載荷之一。但是,從1K開始有一個例外:

java.lang.IllegalStateException: Substream Source cannot be materialized more than once 

問題:爲什麼是這樣的例外取決於POST有效載荷的大小。希望有任何可能的解決辦法?

完整的日誌信息:

2016年8月11日10:15:35100 ERROR aaActorSystemImpl [未定義]:的請求的HttpRequest(列舉HTTPMethod(POST)在加工過程中出現錯誤,http://localhost:3001/api/v2/exec,List(User-Agent:捲曲/ 7.30.0,主機:localhost:3001,Accept:/,Expect:100-continue,Timeout-Access:),HttpEntity.Default(multipart/form-data; boundary = --------------- ---------- acebdf13572468;字符集= UTF-8,5599,源(SourceShape(StreamUtils $$匿名$ 2.out),CompositeModule [2db5bfef]
名稱:無名
模塊:
(無名)CompositeModule [4aac8b90]
名稱:無名
模塊:
(副信號源%28EntitySource%29)GraphStage(EntitySource)[073d36ba]
(未命名)[155dd7c9] GraphStage(OneHundredContinueStage)的副本[40b6c892]
(未命名)[1b902132] GraphStage的副本(收集)[75f65c1c]
(limitable)[76375468]的CompositeModule [59626a09]
名稱副本:limitable
模塊:
(未命名)GraphStage(未知操作)[1bee846d]
下行流:
個 上行流:
MatValue:忽略
下行流:
SubSource.out - > GraphStage.in
GraphStage.out - > Collect.in
Collect.out - > unknown-operation.in
上行流:
GraphStage.in - > SubSource.out
Collect.in - > GraphStage.out
unknown-operation.in - > Collect.out
MatValue:原子(副信號源%28EntitySource%29 [073d36ba])
(未命名)[77d6c04c] GraphStage的副本([email protected])[7e073049]
下行流:
SubSource.out - > GraphStage.in
GraphStage.out - > Collect.in
Collect.out - > unknown-operation.in
未知operation.out - > StreamUtils $$不久$ 2.in
上行流:
GraphStage.in - > SubSource.out
收集.in - > GraphStage。出
unknown-operation.in - > Collect.out
StreamUtils $$不久$ 2.in - >未知operation.out
MatValue:原子(akka.stream.impl.StreamLayout $ CompositeModule [4aac8b90]))) ,HttpProtocol(HTTP/1.1))
java.lang.IllegalStateException:子流源不能在akka.stream.impl.fusing.SubSource $$匿名$ 4.setCB(StreamOfStreams.scala物化不止一次
更多:703)
在akka.stream.impl.fusing.SubSource $$不久$ 4.preStart(StreamOfStreams.scala:713)
在akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:475)
在akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:380)
在akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:538)
在akka.stream.impl。 fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:586)
在akka.actor.Actor $ class.aroundPreStart(Actor.scala:489)
在akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala: 529)
在akka.actor.ActorCell.create(ActorCell.scala:590)
在akka.actor.ActorCell.invokeAll $ 1(ActorCell.scala:461)
在akka.actor.ActorCell.syst emInvoke(ActorCell.scala:483)
在akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
在akka.dispatch.Mailbox.run(Mailbox.scala:223)
在akka.dispatch.Mailbox .exec(Mailbox.scala:234)
在scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
在scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask(ForkJoinPool.java:1339)
在scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
在scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

+0

你試過'entity.withoutSizeLimit.dataBytes'嗎?這是默認限制的。 – Mikesname

+0

@Mikesname謝謝,'entity.withoutSizeLimit.dataBytes'只適用於第一次POST。之後,行爲與之前完全一樣(只要POST消息大於1K,就是異常)。短信msg仍然成功。 – Polymerase

+0

你嘗試過'entity.toStrict'嗎? –

回答

0

我假設entity.dataBytes在調用這個entityAsString或者調用entityAsString兩次之前已經用於一些有用的目的。一般情況下,HttpEntity的內容不能重複使用。但是,HttpEntity.Strict的內容可以重複使用。