4

Kafka topic partitions to Spark executors mapping

I am using Spark Streaming with a Kafka topic. The topic is created with 5 partitions. All my messages are published to the Kafka topic using the table name as the key. Given this, I assume that all messages for a given table should go to the same partition. But I notice in the Spark logs that messages for the same table sometimes go to the executor on node-1 and sometimes to the executor on node-2.
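For reference, the publishing side is just a keyed send. The sketch below is illustrative only (broker address, topic name, and payload are placeholders, not the actual producer code); with the default partitioner, every record keyed with the same table name hashes to the same partition:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092"); // placeholder broker list
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Key = table name; the default partitioner hashes the key, so every
// record for "my_table" lands on the same partition of the 5-partition topic.
producer.send(new ProducerRecord<>("my-topic", "my_table", "{\"col\":\"value\"}"));
producer.close();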

I am running the code in yarn-cluster mode with the following command:

spark-submit --name DataProcessor --master yarn-cluster --files /opt/ETL_JAR/executor-log4j-spark.xml,/opt/ETL_JAR/driver-log4j-spark.xml,/opt/ETL_JAR/application.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=driver-log4j-spark.xml" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=executor-log4j-spark.xml" --class com.test.DataProcessor /opt/ETL_JAR/etl-all-1.0.jar 

This submission creates 1 driver, say on node-1, and 2 executors, on node-1 and node-2.

I do not want the executors on node-1 and node-2 to read the same partition, but that is what is happening.

I also tried the following configuration to specify the consumer group, but it made no difference.

kafkaParams.put("group.id", "app1"); 

This is how we create the stream using the createDirectStream method, *without* going through Zookeeper.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("group.id", "app1");

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topicsSet
);

Full code:

import java.io.Serializable; 
import java.util.Arrays; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Iterator; 

import org.apache.commons.lang3.StringUtils; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.VoidFunction; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import kafka.serializer.StringDecoder; 
import scala.Tuple2; 

public class DataProcessor2 implements Serializable { 
    private static final long serialVersionUID = 3071125481526170241L; 

    private static Logger log = LoggerFactory.getLogger("DataProcessor"); 

    public static void main(String[] args) { 
     final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR); 
     DataProcessorContextFactory3 factory = new DataProcessorContextFactory3(); 
     JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(sparkCheckPointDir, factory); 

     // Start the process 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 

} 

class DataProcessorContextFactory3 implements JavaStreamingContextFactory, Serializable { 
    private static final long serialVersionUID = 6070911284191531450L; 

    private static Logger logger = LoggerFactory.getLogger(DataProcessorContextFactory3.class); 

    DataProcessorContextFactory3() { 
    } 

    @Override 
    public JavaStreamingContext create() { 
     logger.debug("creating new context..!"); 

     final String brokers = ApplicationProperties.getProperty(Consts.KAFKA_BROKERS_NAME); 
     final String topic = ApplicationProperties.getProperty(Consts.KAFKA_TOPIC_NAME); 
     final String app = "app1"; 
     final String offset = ApplicationProperties.getProperty(Consts.KAFKA_CONSUMER_OFFSET, "largest"); 

     logger.debug("Data processing configuration. brokers={}, topic={}, app={}, offset={}", brokers, topic, app, 
       offset); 
     if (StringUtils.isBlank(brokers) || StringUtils.isBlank(topic) || StringUtils.isBlank(app)) { 
      System.err.println("Usage: DataProcessor <brokers> <topic>\n" + Consts.KAFKA_BROKERS_NAME 
        + " is a list of one or more Kafka brokers separated by comma\n" + Consts.KAFKA_TOPIC_NAME 
        + " is a kafka topic to consume from \n\n\n"); 
      System.exit(1); 
     } 
     final String majorVersion = "1.0"; 
     final String minorVersion = "3"; 
     final String version = majorVersion + "." + minorVersion; 
     final String applicationName = "DataProcessor-" + topic + "-" + version; 
     // for dev environment 
     SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(applicationName); 
     // for cluster environment 
     //SparkConf sparkConf = new SparkConf().setAppName(applicationName); 
     final long sparkBatchDuration = Long 
       .valueOf(ApplicationProperties.getProperty(Consts.SPARK_BATCH_DURATION, "10")); 

     final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR); 

     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchDuration)); 
     logger.debug("setting checkpoint directory={}", sparkCheckPointDir); 
     jssc.checkpoint(sparkCheckPointDir); 

     HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(","))); 

     HashMap<String, String> kafkaParams = new HashMap<String, String>(); 
     kafkaParams.put("metadata.broker.list", brokers); 
     kafkaParams.put("auto.offset.reset", offset); 
     kafkaParams.put("group.id", "app1"); 

//   @formatter:off 
      JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, 
        String.class, 
        String.class, 
        StringDecoder.class, 
        StringDecoder.class, 
        kafkaParams, 
        topicsSet 
      ); 
//   @formatter:on 
     processRDD(messages, app); 
     return jssc; 
    } 

    private void processRDD(JavaPairInputDStream<String, String> messages, final String app) { 
     JavaDStream<MsgStruct> rdd = messages.map(new MessageProcessFunction()); 

     rdd.foreachRDD(new Function<JavaRDD<MsgStruct>, Void>() { 

      private static final long serialVersionUID = 250647626267731218L; 

      @Override 
      public Void call(JavaRDD<MsgStruct> currentRdd) throws Exception { 
       if (!currentRdd.isEmpty()) { 
        logger.debug("Receive RDD. Create JobDispatcherFunction at HOST={}", FunctionUtil.getHostName()); 
        currentRdd.foreachPartition(new VoidFunction<Iterator<MsgStruct>>() { 

         @Override 
         public void call(Iterator<MsgStruct> arg0) throws Exception { 
          while(arg0.hasNext()){ 
           System.out.println(arg0.next().toString()); 
          } 
         } 
        }); 
       } else { 
        logger.debug("Current RDD is empty."); 
       } 
       return null; 
      } 
     }); 
    } 
    public static class MessageProcessFunction implements Function<Tuple2<String, String>, MsgStruct> { 
     @Override 
     public MsgStruct call(Tuple2<String, String> data) throws Exception { 
      String message = data._2(); 
      System.out.println("message:"+message); 
      return MsgStruct.parse(message); 
     } 

    } 
    public static class MsgStruct implements Serializable{ 
     private String message; 
     public static MsgStruct parse(String msg){ 
      MsgStruct m = new MsgStruct(); 
      m.message = msg; 
      return m; 
     } 
     public String toString(){ 
      return "content inside="+message; 
     } 
    } 

} 

Answers

3

With the DirectStream approach, it is a correct assumption that messages sent to a Kafka partition will land in the same Spark partition.

What we cannot assume is that each Spark partition will be processed by the same Spark worker every time. On each batch interval, a Spark task is created for the OffsetRange of each partition and sent to the cluster for processing, landing on whichever worker is available.
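To see that mapping for yourself, each RDD produced by the direct stream carries its OffsetRanges, one per Kafka partition and in the same order as the RDD partitions. A minimal sketch against the messages stream from the question, using the 0.8 integration classes (driver-side logging only; the cast must be done on the stream's own RDD, before any map/transform):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws Exception {
        // KafkaRDDs from the direct stream implement HasOffsetRanges;
        // the i-th OffsetRange corresponds to the i-th RDD partition.
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange r : ranges) {
            System.out.println(r.topic() + " partition " + r.partition()
                + " offsets [" + r.fromOffset() + ", " + r.untilOffset() + ")");
        }
        return null;
    }
});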

What you are looking for is partition locality. The only partition locality that the direct Kafka consumer supports is the Kafka host containing the offset range being processed, in the case where your Spark and Kafka deployments are co-located; but that is a deployment topology I do not see very often.

Should your requirements dictate the need for host locality, you should look into Apache Samza or Kafka Streams.
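For completeness: in the newer spark-streaming-kafka-0-10 integration, that co-located case is expressed explicitly with LocationStrategies.PreferBrokers(). A minimal sketch, assuming an existing JavaStreamingContext jssc; the topic name and broker list are placeholders:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker1:9092");
kafkaParams.put("group.id", "app1");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// PreferBrokers schedules each partition's tasks on the broker leading that
// partition -- only meaningful when executors run on the Kafka broker hosts.
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferBrokers(),
    ConsumerStrategies.<String, String>Subscribe(Arrays.asList("my-topic"), kafkaParams));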

3

According to the Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher), you can specify an explicit mapping of partitions to hosts.

Assume you have two hosts (h1 and h2), and the Kafka topic topic-name has three partitions. The following key code shows how to map specified partitions to hosts in Java.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

Map<TopicPartition, String> partitionMapToHost = new HashMap<>();
// partition 0 -> h1, partitions 1 and 2 -> h2
partitionMapToHost.put(new TopicPartition("topic-name", 0), "h1");
partitionMapToHost.put(new TopicPartition("topic-name", 1), "h2");
partitionMapToHost.put(new TopicPartition("topic-name", 2), "h2");
List<String> topicCollection = Arrays.asList("topic-name");
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.0.0.2:9092,10.0.0.3:9092");
kafkaParams.put("group.id", "group-id-name");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
JavaInputDStream<ConsumerRecord<String, String>> records = KafkaUtils.createDirectStream(jssc,
    LocationStrategies.PreferFixed(partitionMapToHost), // PreferFixed is the key
    ConsumerStrategies.Subscribe(topicCollection, kafkaParams));

You can also use LocationStrategies.PreferConsistent(), which distributes the partitions evenly across all available executors and ensures that a given partition is consumed only by its assigned executor.
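A one-line variant of the same call showing PreferConsistent, reusing jssc, topicCollection, and kafkaParams from the snippet above:

// PreferConsistent spreads the topic's partitions evenly across the available
// executors; each partition is still consumed by exactly one executor per batch.
JavaInputDStream<ConsumerRecord<String, String>> records2 = KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.Subscribe(topicCollection, kafkaParams));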