我想檢索我的RDD的每個記錄上的每個卡夫卡偏移量,在foreachRDD方法中。我在我的主題中有一個分區,所以我的RDD也得到了一個分區。我basicaly嘗試somethind這樣的:附加kafka偏移量foreachRDD中的每個記錄
dStream.foreachRDD { rdd =>
if (!rdd.isEmpty) {
//get offset first value of the offset
val firstOffset = rdd.asInstanceOf[HasOffsetRanges].offsetRanges(0).fromOffset
val rddWithOffset = rdd.map(_.value)
.zipWithIndex()
.map{ case (v,i) => (v,i + firstOffset)}
}
}
在我的製片人爲例我送使用一個循環的郵件,我將在一個名爲位置這樣的列的索引:
+------+-----+--------+
| name| age|position|
+------+-----+--------+
|johnny| 26| 1|
| chloe| 42| 2|
| brian| 19| 3|
| eliot| 35| 4|
+------+-----+--------+
不幸的是我注意到,當我在我的消費者中添加膠印列時,訂單不會保持不變:
+------+-----+--------+------+
| name| age|position|offset|
+------+-----+--------+------+
|johnny| 26| 1| 1|
| chloe| 42| 2| 3|
| brian| 19| 3| 4|
| eliot| 35| 4| 2|
+------+-----+--------+------+
看起來我似乎鬆散了此流程的訂單。 你有什麼想法嗎?由於
順便說一句,我的Java製片人是這樣的:
KafkaRestProducer<String, Object> producer = new KafkaRestProducer<>(props);
ArrayList<String> names = new ArrayList<String>()
names.add("johnny")
names.add("chloe")
names.add("brian")
names.add("eliot")
ArrayList<Integer> ages = ArrayList<Integer>()
names.add(26)
names.add(42)
names.add(19)
names.add(35)
for (int i = 0; i < 3; ++i) {
String name = names(i)
Int age = ages(i)
Person person = Person
.newBuilder()
.setName(name)
.setAge(age)
.setPosition(i)
.build();
ProducerRecord<String, Object> record = new ProducerRecord<>("/apps/PERSON/streams:myTopic", name, person);
producer.send(record, null);
System.out.println(i);
}
你是什麼意思與 「棄秩序」?你觀察到什麼,它與你所期望的有什麼不同? – maasg
感謝您的評論,我編輯我的問題添加一個例子來說明我是如何鬆散的順序。你有什麼主意嗎? –
你對卡夫卡主題有多少個分區? – maasg