2016-12-14 87 views
1

我有以下流:阿卡流TCP +阿卡流卡夫卡生產者未停止不發佈消息,而不是錯誤-ING出

Source(IndexedSeq(ByteString.empty)) 
.via(
    Tcp().outgoingConnection(bsAddress, bsPort) 
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true)) 
    .map(_.utf8String) 
) 
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m)) 
.runWith(
    Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer) 
     .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}") 
) 
).onComplete { 
    case Success(Done) => printAndByeBye("Stream ends successfully") 
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString) 
    } 

它正常工作了一段時間,我可以消耗填充上的消息卡夫卡話題。但有時候,顯然是隨機的間隔,沒有更多的消息發佈,並且這段代碼沒有記錄任何錯誤(printAndByeBye將打印傳遞的消息並終止參與者系統。)重新啓動應用程序後,消息繼續流。

關於如何知道這裏發生了什麼的任何想法?

編輯:我把卡蒙它,我可以看到以下行爲:

Mailbox Size per Actor

Time in Mailbox per Actor

Processing Time per Actor

它看起來像是停不通知流應該停止,但我不知道如何明確並停止流。

回答

0

流沒有失敗,但TCP流已空閒,因爲設備發佈數據在一段時間後停止發送數據而沒有丟失連接。 而不是使用簡單的:

TCP().outgoingConnection(bsAddress, bsPort) 

我最終使用:

def outgoingConnection(
remoteAddress: InetSocketAddress, 
localAddress: Option[InetSocketAddress]   = None, 
options:  immutable.Traversable[SocketOption] = Nil, 
halfClose:  Boolean        = true, 
connectTimeout: Duration       = Duration.Inf, 
idleTimeout: Duration       = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = ??? 

所以

Tcp().outgoingConnection(bsAddress, bsPort) 

通過通知的idleTimeout成爲

val connectTimeout: Duration = 1 second 
val idleTimeout: Duration = 2 second 
Tcp().outgoingConnection(
    remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort), 
    connectTimeout = connectTimeout, 
    idleTimeout = idleTimeout 
) 

,在遵循啓動失敗一個d另一個流程可以重新啓動。

0

我倒是建議創建與監督的流動屬性,像你的TCP連接來處理可能出現的異常:

val flow = 
    Tcp().outgoingConnection("", 12) 
      .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true)) 
      .map(_.utf8String).withAttributes(ActorAttributes.supervisionStrategy { 
     case ex: Throwable => 
     println("Error ocurred: " + ex) 
     Supervision.Resume 
    } 

Source(IndexedSeq(ByteString.empty)) 
.via(flow) 
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m)) 
.runWith(
    Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer) 
     .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}") 
) 
).onComplete { 
    case Success(Done) => printAndByeBye("Stream ends successfully") 
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString) 
    } 

如果存在與流量流中的所有錯誤停止。有了這個配置,你會看到流是否引發了任何異常。

+0

流不失敗的。沒有錯誤記錄。 – Darien

0

如果一切都安靜下來可能被降到背壓應用的地方。 嘗試並選擇性地用非背壓感知階段替換您的背壓感知階段,並檢查問題是否仍然存在。 你的情況有背壓的2個可能的來源:

1)的TCP連接

你可以試試,並附ByteString無限源卡夫卡,線沿線的做事:

Source.cycle(() => List(???).iterator) 
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m)) 
.runWith(
    Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer) 
     .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}") 
) 
).onComplete { 
    case Success(Done) => printAndByeBye("Stream ends successfully") 
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString) 
    } 

2)卡夫卡下沉

一些測井

Source(IndexedSeq(ByteString.empty)) 
.via(
    Tcp().outgoingConnection(bsAddress, bsPort) 
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true)) 
    .map(_.utf8String) 
) 
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m)) 
.runForeach(println) 
.onComplete { 
    case Success(Done) => printAndByeBye("Stream ends successfully") 
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString) 
    } 
取代它

您是否只能在兩種情況中的一種情況下看到問題?同時?沒有?

+0

有沒有辦法監控這種情況?或者當背壓停止流時超時? – Darien

+0

有捕獲階段背壓超時http://doc.akka.io/docs/akka/2.4.14/scala/stream/stages-overview.html#backpressureTimeout –

+0

它發生在使用TCP流中獲取數據。 – Darien