2017-08-29 61 views
1

願意將數據寫回kafka的效率最高,我有興趣使用Akka Stream將我的RDD分區寫回到Kafka。Akka Stream從Spark工作內部寫入kafka

問題是,我需要一種方法來創建每個執行器的actor系統,而不是每個分區,這將是荒謬的。一個JVM上的一個節點上最多可能有8個actorSystems。然而,每個分區有一個Stream是很好的。

有沒有人已經這樣做?

我的理解是,一個演員系統不能被序列化,因此不能是 發送了具有每個執行者的廣播變量。

如果有人在圍繞這個方法尋找解決方案方面有經驗,那麼請您分享一下嗎?

否則我總是可以回落到https://index.scala-lang.org/benfradet/spark-kafka-writer/spark-kafka-0-10-writer/0.3.0?target=_2.11但我不確定這是最有效的方式。

+0

看到使用香草卡夫卡生產者和使用阿卡流之間的性能比較會很有趣。我看不出一個強大的原因,爲什麼一個人會比另一個人快得多。 – maasg

+0

我的假設是,您從反應流功能中受益,反應流功能本質上被認爲更具動態性,因此速度更快。減少阻塞。我不知道如何執行背壓和全部與原生產者。它只是更多的工作 – MaatDeamon

+0

卡夫卡生產者內部實現了一個緩衝機制來優化吞吐量。在寫作方面,我沒有看到任何好處:不需要背壓:Kafka通常足夠快,可以接受任何負載,並給出合適的尺寸。阻止:緩衝不是一個真正的問題,我想你還是要等到寫入完成才能繼續並完成已完成工作的補償。從遠處看,它看起來像一個無雙贏的優化。但我想這些數字會贏得意見,所以看比較會很有趣。 – maasg

回答

1

您可以隨時與演員系統定義一個全局懶VAL:

object Execution { 
    implicit lazy val actorSystem: ActorSystem = ActorSystem() 
    implicit lazy val materializer: Materializer = ActorMaterializer() 
} 

然後你只需要導入它在任何類的,你想用阿卡流:

import Execution._ 

val stream: DStream[...] = ... 

stream.foreachRDD { rdd => 
    ... 
    rdd.foreachPartition { records => 
    val (queue, done) = Source.queue(...) 
     .via(Producer.flow(...)) 
     .toMat(Sink.ignore)(Keep.both) 
     .run() // implicitly pulls `Execution.materializer` from scope, 
       // which in turn will initialize `Execution.actorSystem` 

    ... // push records to the queue 

    // wait until the stream is completed 
    Await.result(done, 10.minutes) 
    } 
} 

的上面是一種僞代碼,但我認爲它應該表達一般想法。

這樣,當需要時,系統將在每個執行器JVM上初始化一次。此外,您可以讓演員系統「邪」,以便它能夠自動關機,當JVM完成:

object Execution { 
    private lazy val config = ConfigFactory.parseString("akka.daemonic = on") 
    .withFallback(ConfigFactory.load()) 
    implicit lazy val actorSystem: ActorSystem = ActorSystem("system", config) 
    implicit lazy val materializer: Materializer = ActorMaterializer() 
} 

我們在我們的工作星火這樣做的,它完美的作品。

此工作方式沒有任何類型的廣播變量,自然可以用於各種Spark作業,流式處理或其他。因爲系統是在單例對象中定義的,所以它保證每個JVM實例只能初始化一次(模仿各種類加載器模式,但在Spark的上下文中並不重要),因此即使某些分區被放置到同一個JVM上(可能在不同的線程中),它只會初始化一次actor系統。 lazy val確保初始化的線程安全性,並且ActorSystem是線程安全的,所以這也不會在這方面引起問題。

+0

謝謝。讓我更好地理解它。所以你根本不使用廣播變量?你能給出更多的上下文代碼嗎,你如何在你的spark操作中使用Execution對象。我想在火花流媒體中使用它,我不確定是否每個RDD都有一個新的演員系統,或者當你把它放在JVM的時候。你能幫助澄清一點嗎?什麼是你的背景,火花流媒體工作或正常的火花工作。 – MaatDeamon

+0

我已經擴展了我的答案,希望現在可以回答你的問題。 –

+0

你如何停止流,你使用與記錄的大小? – MaatDeamon