2017-08-29 41 views
2

我是spark和kafka的新手,我與kafka具有略微不同的spark spark使用模式。 我使用訂閱相同kafka主題的Spark流應用程序

spark-core_2.10 - 2.1.1 
spark-streaming_2.10 - 2.1.1 
spark-streaming-kafka-0-10_2.10 - 2.0.0 
kafka_2.10 - 0.10.1.1 

連續的事件數據被傳輸到卡夫卡的話題,我需要從多個火花流應用程序處理。但是當我運行火花流應用程序時,只有其中一個接收數據。

 Map<String, Object> kafkaParams = new HashMap<String, Object>(); 

    kafkaParams.put("bootstrap.servers", "localhost:9092"); 
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    kafkaParams.put("auto.offset.reset", "latest"); 
    kafkaParams.put("group.id", "test-consumer-group"); 
    kafkaParams.put("enable.auto.commit", "true"); 
    kafkaParams.put("auto.commit.interval.ms", "1000"); 
    kafkaParams.put("session.timeout.ms", "30000"); 

    Collection<String> topics = Arrays.asList("4908100105999_000005");; 
    JavaInputDStream<ConsumerRecord<String, String>> stream = org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
        ssc, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams)); 

     ... //spark processing 

我有兩個火花流應用程序,通常我提交的第一個應用程序使用kafka消息。第二個應用程序只是等待消息,永遠不會進行。 當我閱讀時,卡夫卡主題可以從多個消費者訂閱,對於火花流媒體來說是不是真的?或者有什麼我缺少kafka主題及其配置?

在此先感謝。

回答

0

您可以使用相同的groupid創建不同的流。下面是從0.8集成的在線文檔的詳細信息,有兩種做法:

方法1:基於接收器的方法

多卡夫卡輸入DStreams可以用不同的組創建和 議題使用多個接收器並行接收數據。

方法二:直接法(沒有接收者)

無需創建多個輸入流卡夫卡和工會它們。使用 directStream,Spark Streaming將創建與 一樣多的RDD分區,並使用卡夫卡分區,這將全部從 卡夫卡中並行讀取數據。因此,卡夫卡和RDD分區之間存在一對一的映射關係,這更易於理解和調整。

就像你正在使用0.10你可以閱讀更多的Spark Streaming + Kafka Integration Guide 0.8

從您的代碼如下,請參考Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0

甚至認爲它是使用火花流API,一切都是卡夫卡性能控制,取決於您在屬性文件中指定的組ID,您可以啓動具有不同組ID的多個流。

乾杯!

+1

我在兩個消費者中都使用了相同的組ID,因此只有一個消費者正在接收消息。具有不同group.id的消費者訂閱同一主題,並行/分別接收消息。 – Gurubg

+0

是的,如果您使用相同的組ID,那麼只有一個會收到該消息。 –

1

消費者數量[在消費羣體下],不能超過主題中的分區數量。如果您想要並行使用這些消息,那麼您需要引入適當數量的分區並創建接收器來處理每個分區。

+0

讓兩個消費羣體在同一消費羣體下擁有兩個分區有什麼區別? – Gurubg

+0

我的意思是卡夫卡分區。如果您的Kafka主題中有兩個分區,並且想要並行處理這些消息,則可以引入一組消費者[該消費者組中的消費者數量不應超過正在使用的主題中的分區數。]消費者組由消費者組ID標識。如果兩個消費者羣體具有相同的羣組ID,那麼Kafka會假定這兩個消費羣體都是一個羣體。如果您的應用程序使用相同的代碼,則嘗試更改第二個應用程序的kafkaParams.put(「group.id」,「test-consumer-group1」)。 –