不幸的是,這並不簡單,因爲Spark的源代碼中的KafkaReceiver和ReliableKafkaReceiver只存儲MessageAndMetadata.key和消息。
有兩個Spark中的JIRA與這個問題有關開放門票:已經打開了,而
。
髒的複製/粘貼/修改的斯巴克的源代碼,以解決您的問題:
package org.apache.spark.streaming.kafka
import java.lang.{Integer => JInt}
import java.util.{Map => JMap, Properties}
import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector}
import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.util.ThreadUtils
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.reflect._
object MoreKafkaUtils {
def createStream(
jssc: JavaStreamingContext,
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): JavaReceiverInputDStream[(String, String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
val walEnabled = WriteAheadLogUtils.enableReceiverLog(jssc.ssc.conf)
new KafkaInputDStreamWithTopic[String, String, StringDecoder, StringDecoder](jssc.ssc, kafkaParams, topics.asScala.mapValues(_.intValue()), walEnabled, storageLevel)
}
}
private[streaming]
class KafkaInputDStreamWithTopic[
K: ClassTag,
V: ClassTag,
U <: Decoder[_] : ClassTag,
T <: Decoder[_] : ClassTag](
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
useReliableReceiver: Boolean,
storageLevel: StorageLevel
) extends ReceiverInputDStream[(K, V, String)](ssc_) with Logging {
def getReceiver(): Receiver[(K, V, String)] = {
if (!useReliableReceiver) {
new KafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel)
} else {
new ReliableKafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel)
}
}
}
private[streaming]
class KafkaReceiverWithTopic[
K: ClassTag,
V: ClassTag,
U <: Decoder[_] : ClassTag,
T <: Decoder[_] : ClassTag](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
) extends Receiver[(K, V, String)](storageLevel) with Logging {
// Connection to Kafka
var consumerConnector: ConsumerConnector = null
def onStop() {
if (consumerConnector != null) {
consumerConnector.shutdown()
consumerConnector = null
}
}
def onStart() {
logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
// Kafka connection properties
val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))
val zkConnect = kafkaParams("zookeeper.connect")
// Create the connection to the cluster
logInfo("Connecting to Zookeeper: " + zkConnect)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + zkConnect)
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]
// Create threads for each topic/message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)
val executorPool =
ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
try {
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
} finally {
executorPool.shutdown() // Just causes threads to terminate after work is done
}
}
// Handles Kafka messages
private class MessageHandler(stream: KafkaStream[K, V])
extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
try {
val streamIterator = stream.iterator()
while (streamIterator.hasNext()) {
val msgAndMetadata = streamIterator.next()
store((msgAndMetadata.key, msgAndMetadata.message, msgAndMetadata.topic))
}
} catch {
case e: Throwable => reportError("Error handling message; exiting", e)
}
}
}
}
我對這個答案很感興趣。你找到方法了嗎? –
@阿倫:你找到解決方案嗎?如果是這樣,你能分享它嗎?謝謝! – jithinpt