2017-02-21 86 views
9

我想讓消費者演員訂閱Kafka主題並流數據,以便在消費者之外使用Spark Streaming進行進一步處理。爲什麼是演員?因爲我讀到它的主管策略是處理卡夫卡故障的好方法(例如,在發生故障時重新啓動)。來自演員的Spark-Streaming

我發現了兩個選項:

  • Java的KafkaConsumer類:其poll()方法返回一個Map[String, Object]。我想要返回一個DStream就像KafkaUtils.createDirectStream會,並且我不知道如何從演員外部獲取流。
  • 擴展ActorHelper特徵並使用actorStream(),如example所示。後一個選項不顯示到主題的連接,而是顯示到套接字的連接。

任何人都可以指向正確的方向嗎?

回答

2

如需辦理卡夫卡的失敗,我使用了Apache館長框架,並採取以下解決辦法:

val client: CuratorFramework = ... // see docs 
val zk: CuratorZookeeperClient = client.getZookeeperClient 

/** 
    * This method returns false if kafka or zookeeper is down. 
    */ 
def isKafkaAvailable:Boolean = 
    Try { 
     if (zk.isConnected) { 
     val xs = client.getChildren.forPath("/brokers/ids") 
     xs.size() > 0 
     } 
     else false 
    }.getOrElse(false) 

對於消費卡夫卡的話題,我用了com.softwaremill.reactivekafka庫。例如:

class KafkaConsumerActor extends Actor { 
    val kafka = new ReactiveKafka() 
    val config: ConsumerProperties[Array[Byte], Any] = ... // see docs 

    override def preStart(): Unit = { 
     super.preStart() 

     val publisher = kafka.consume(config) 
     Source.fromPublisher(publisher) 
      .map(handleKafkaRecord) 
      .to(Sink.ignore).run() 
    } 

    /** 
    * This method will be invoked when any kafka records will happen. 
    */ 
    def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = { 
     // handle record 
    } 
}