2017-08-04 37 views
0

我想檢索我的RDD的每個記錄上的每個卡夫卡偏移量,在foreachRDD方法中。我在我的主題中有一個分區,所以我的RDD也得到了一個分區。我basicaly嘗試somethind這樣的:附加kafka偏移量foreachRDD中的每個記錄

dStream.foreachRDD { rdd => 
    if (!rdd.isEmpty) { 
    //get offset first value of the offset 
    val firstOffset = rdd.asInstanceOf[HasOffsetRanges].offsetRanges(0).fromOffset 
    val rddWithOffset = rdd.map(_.value) 
     .zipWithIndex() 
     .map{ case (v,i) => (v,i + firstOffset)} 
    } 
} 

在我的製片人爲例我送使用一個循環的郵件,我將在一個名爲位置這樣的列的索引:

+------+-----+--------+ 
| name| age|position| 
+------+-----+--------+ 
|johnny| 26|  1| 
| chloe| 42|  2| 
| brian| 19|  3| 
| eliot| 35|  4| 
+------+-----+--------+ 

不幸的是我注意到,當我在我的消費者中添加膠印列時,訂單不會保持不變:

+------+-----+--------+------+ 
| name| age|position|offset| 
+------+-----+--------+------+ 
|johnny| 26|  1|  1| 
| chloe| 42|  2|  3| 
| brian| 19|  3|  4| 
| eliot| 35|  4|  2| 
+------+-----+--------+------+ 

看起來我似乎鬆散了此流程的訂單。 你有什麼想法嗎?由於

順便說一句,我的Java製片人是這樣的:

KafkaRestProducer<String, Object> producer = new KafkaRestProducer<>(props); 

ArrayList<String> names = new ArrayList<String>() 
names.add("johnny") 
names.add("chloe") 
names.add("brian") 
names.add("eliot") 

ArrayList<Integer> ages = ArrayList<Integer>() 
names.add(26) 
names.add(42) 
names.add(19) 
names.add(35) 

for (int i = 0; i < 3; ++i) { 

    String name = names(i) 
    Int age = ages(i)  
    Person person = Person 
     .newBuilder() 
     .setName(name) 
     .setAge(age) 
     .setPosition(i) 
     .build(); 

    ProducerRecord<String, Object> record = new ProducerRecord<>("/apps/PERSON/streams:myTopic", name, person); 

    producer.send(record, null); 
    System.out.println(i); 
} 
+0

你是什麼意思與 「棄秩序」?你觀察到什麼,它與你所期望的有什麼不同? – maasg

+0

感謝您的評論,我編輯我的問題添加一個例子來說明我是如何鬆散的順序。你有什麼主意嗎? –

+0

你對卡夫卡主題有多少個分區? – maasg

回答

0

我的英語很差。我用這個代碼:

val Array(brokers, topic, groupId) = args 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> groupId) 
    val topicPartition = Map[TopicAndPartition, Long](TopicAndPartition(topic, 0) -> 1.toLong) 
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message) 
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (Long, String)](
     ssc, kafkaParams, topicPartition, messageHandler) 

    kafkaStream.foreachRDD(rdd => rdd.foreach(println)) 

輸出: (偏移,lineOfMessage) ...

+0

嗨,謝謝你的回答,但我沒有找到這個參數的構造函數createDiirectStream。你是什​​麼版本的卡夫卡? –

+0

我正在使用。 Spark 1.5.2,Kafka 0.8.2 –

相關問題