我已經建立了一個包含3個經紀人和3個分區的主題的實驗Kafka環境。我有一個生產者和一個消費者。我想修改特定用戶的分區偏移量。我在kafka文檔中讀到kafka中的消費者提交/獲取API可以提交特定的偏移量或獲取消費者讀取的最新偏移量。這裏是鏈接的API:kafka消費者提取API不返回正確的偏移值
我用的代碼從下面的頁面寫我的代碼,以便從一個特定的消費取偏移。但是,獲取API會爲請求的偏移量返回「-1」的值。這裏是示例代碼:
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
我也讀出第一鏈接「如果沒有偏移與消費羣下一個主題分區的經紀人不設置錯誤代碼(相關,因爲它是不是真的錯誤),但返回空元數據並將偏移字段設置爲-1。「
但是,我已經產生了一些消息,我的消費者已經消費了消息並輸出了每個讀取消息的偏移量。
如果有人能幫到這個,我會非常樂意。我想知道我的代碼的哪個部分是錯誤的。或者也許這個API有問題。請不要猶豫,提出任何有用的意見。我的代碼與我提供的鏈接中的代碼完全相同。但是,如果你需要看我的代碼,請告訴我把它放在這裏。
卡夫卡版本是0.10.2.0
我卡夫卡的配置是:
經紀人1:端口9093
經紀人2:端口9094
經紀人3:端口9095
主題:「testpic3」
.......... ............
消費者配置:
props.put("group.id", "test");
props.put("client.id", "MyConsumer");
................
這裏是我的代碼:
public class KafkaOffsetManage {
public static void main(String[] args) {
BlockingChannel channel = new BlockingChannel("localhost", 9095,
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
final String MY_GROUP = "test";
final String MY_CLIENTID = "MyConsumer";
int correlationId = 0;
final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3",0);
final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3",1);
final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3",2);
channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
System.out.println("+++++++++++++++++++++++++++");
System.out.println(metadataResponse.errorCode());
if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
Broker offsetManager = metadataResponse.coordinator();
// if the coordinator is different, from the above channel's host then reconnect
channel.disconnect();
channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /* read timeout in millis */);
channel.connect();
System.out.println("Connected to Offset Manager");
System.out.println(offsetManager.host() + ", Port:"+ offsetManager.port());
} else {
// retry (after backoff)
}
// How to fetch offsets
List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
partitions.add(testPartition0);
//partitions.add(testPartition1);
OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
MY_GROUP,
partitions,
(short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
correlationId,
MY_CLIENTID);
try {
channel.send(fetchRequest.underlying());
OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer());
OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0);
short offsetFetchErrorCode = result.error();
if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
channel.disconnect();
// Go to step 1 and retry the offset fetch
} else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
// retry the offset fetch (after backoff)
} else {
long retrievedOffset = result.offset();
String retrievedMetadata = result.metadata();
System.out.println("The retrieved offset is:"+ Long.toString(retrievedOffset));
System.out.println(retrievedMetadata);
System.out.println(result.toString());
}
}
catch (Exception e) {
channel.disconnect();
// Go to step 1 and then retry offset fetch after backoff
}
}
}
代碼的輸出是在這裏:
+++++++++++++++++++++++++++
0
Connected to Offset Manager
user-virtual-machine, Port:9093
------------------------
The retrieved offset is:-1
OffsetMetadataAndError[-1,,3]
Process finished with exit code 0
一個奇怪的事情我有關卡夫卡的依賴關係。當我加入這個依賴,我的代碼不會在節目中認識了一些類:
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
班「ConsumerMetadataRequest」和「ConsumerMetadataResponse」無法識別。
所以我添加這種依賴性,而不是:
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
謝謝
請確保提供代碼的相關部分。您所包含的鏈接可能會在某些時候停止工作,使您的問題變得毫無用處。 –
非常感謝Patrick。這裏是我的代碼 – Farhad9660