我是spark和kafka的新手,我與kafka具有略微不同的spark spark使用模式。 我使用訂閱相同kafka主題的Spark流應用程序
spark-core_2.10 - 2.1.1
spark-streaming_2.10 - 2.1.1
spark-streaming-kafka-0-10_2.10 - 2.0.0
kafka_2.10 - 0.10.1.1
連續的事件數據被傳輸到卡夫卡的話題,我需要從多個火花流應用程序處理。但是當我運行火花流應用程序時,只有其中一個接收數據。
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("group.id", "test-consumer-group");
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.commit.interval.ms", "1000");
kafkaParams.put("session.timeout.ms", "30000");
Collection<String> topics = Arrays.asList("4908100105999_000005");;
JavaInputDStream<ConsumerRecord<String, String>> stream = org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
... //spark processing
我有兩個火花流應用程序,通常我提交的第一個應用程序使用kafka消息。第二個應用程序只是等待消息,永遠不會進行。 當我閱讀時,卡夫卡主題可以從多個消費者訂閱,對於火花流媒體來說是不是真的?或者有什麼我缺少kafka主題及其配置?
在此先感謝。
我在兩個消費者中都使用了相同的組ID,因此只有一個消費者正在接收消息。具有不同group.id的消費者訂閱同一主題,並行/分別接收消息。 – Gurubg
是的,如果您使用相同的組ID,那麼只有一個會收到該消息。 –