您可以隨時與演員系統定義一個全局懶VAL:
object Execution {
implicit lazy val actorSystem: ActorSystem = ActorSystem()
implicit lazy val materializer: Materializer = ActorMaterializer()
}
然後你只需要導入它在任何類的,你想用阿卡流:
import Execution._
val stream: DStream[...] = ...
stream.foreachRDD { rdd =>
...
rdd.foreachPartition { records =>
val (queue, done) = Source.queue(...)
.via(Producer.flow(...))
.toMat(Sink.ignore)(Keep.both)
.run() // implicitly pulls `Execution.materializer` from scope,
// which in turn will initialize `Execution.actorSystem`
... // push records to the queue
// wait until the stream is completed
Await.result(done, 10.minutes)
}
}
的上面是一種僞代碼,但我認爲它應該表達一般想法。
這樣,當需要時,系統將在每個執行器JVM上初始化一次。此外,您可以讓演員系統「邪」,以便它能夠自動關機,當JVM完成:
object Execution {
private lazy val config = ConfigFactory.parseString("akka.daemonic = on")
.withFallback(ConfigFactory.load())
implicit lazy val actorSystem: ActorSystem = ActorSystem("system", config)
implicit lazy val materializer: Materializer = ActorMaterializer()
}
我們在我們的工作星火這樣做的,它完美的作品。
此工作方式沒有任何類型的廣播變量,自然可以用於各種Spark作業,流式處理或其他。因爲系統是在單例對象中定義的,所以它保證每個JVM實例只能初始化一次(模仿各種類加載器模式,但在Spark的上下文中並不重要),因此即使某些分區被放置到同一個JVM上(可能在不同的線程中),它只會初始化一次actor系統。 lazy val
確保初始化的線程安全性,並且ActorSystem
是線程安全的,所以這也不會在這方面引起問題。
看到使用香草卡夫卡生產者和使用阿卡流之間的性能比較會很有趣。我看不出一個強大的原因,爲什麼一個人會比另一個人快得多。 – maasg
我的假設是,您從反應流功能中受益,反應流功能本質上被認爲更具動態性,因此速度更快。減少阻塞。我不知道如何執行背壓和全部與原生產者。它只是更多的工作 – MaatDeamon
卡夫卡生產者內部實現了一個緩衝機制來優化吞吐量。在寫作方面,我沒有看到任何好處:不需要背壓:Kafka通常足夠快,可以接受任何負載,並給出合適的尺寸。阻止:緩衝不是一個真正的問題,我想你還是要等到寫入完成才能繼續並完成已完成工作的補償。從遠處看,它看起來像一個無雙贏的優化。但我想這些數字會贏得意見,所以看比較會很有趣。 – maasg