我仍然懷疑kafka ZOOKEPER_AUTO_RESET.I在這方面見過很多問題。請原諒,如果這是一個重複的查詢。瞭解kafka zookeper自動重置
我有一個高層次的Java消費者不斷消費。 我確實有多個主題,所有主題都有一個分區。
我的關注點在下面。
我開始使用消費者組名稱爲「ncdev1」和ZOOKEPER_AUTO_RESET = smallest
的consumerkafka.jar。可以觀察到,init偏移量被設置爲-1。然後,我在某個時間後停止/啓動罐子。此時,它會選擇分配給用戶組(ncdev1)的最新偏移量,即36。在某段時間後,我再次重新啓動,然後將initoffset設置爲39.哪一個是最新值。然後我將組名改爲ZOOKEPER_GROUP_ID = ncdev2
。然後重新啓動jar文件,這次又將offset設置爲-1。在進一步的重新啓動,它躍升至最新值即39
然後我設置了
ZOOKEPER_AUTO_RESET=largest
和ZOOKEPER_GROUP_ID = ncdev3
然後試圖重新啓動與組名ncdev3 jar文件。它在重新啓動時選擇偏移的方式沒有區別。這是它重新啓動時選擇39,這與以前的配置相同。
做偏移形成beginning.Any其他配置上爲什麼不選擇任何想法,使其從一開始讀?(從What determines Kafka consumer offset?最大和最小的理解)
由於提前
已添加代碼
public class ConsumerForKafka {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
ServerSocket soketToWrite;
Socket s_Accept ;
OutputStream s1out ;
DataOutputStream dos;
static boolean logEnabled ;
static File fileName;
private static final Logger logger = Logger.getLogger(ConsumerForKafka.class);
public ConsumerForKafka(String a_zookeeper, String a_groupId, String a_topic,String session_timeout,String auto_reset,String a_commitEnable) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId,session_timeout,auto_reset,a_commitEnable));
this.topic =a_topic;
}
public void run(int a_numThreads,String a_zookeeper, String a_topic) throws InterruptedException, IOException {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
String socketURL = PropertyUtils.getProperty("SOCKET_CONNECT_HOST");
int socketPort = Integer.parseInt(PropertyUtils.getProperty("SOCKET_CONNECT_PORT"));
Socket socks = new Socket(socketURL,socketPort);
//****
String keeper = a_zookeeper;
String topic = a_topic;
long millis = new java.util.Date().getTime();
//****
PrintWriter outWriter = new PrintWriter(socks.getOutputStream(), true);
List<KafkaStream<byte[], byte[]>> streams = null;
// now create an object to consume the messages
//
int threadNumber = 0;
// System.out.println("going to forTopic value is "+topic);
boolean keepRunningThread =false;
boolean chcek = false;
logger.info("logged");
BufferedWriter bw = null;
FileWriter fw = null;
if(logEnabled){
fw = new FileWriter(fileName, true);
bw = new BufferedWriter(fw);
}
for (;;) {
streams = consumerMap.get(topic);
keepRunningThread =true;
for (final KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(keepRunningThread)
{
try{
if (it.hasNext()){
if(logEnabled){
String data = new String(it.next().message())+""+"\n";
bw.write(data);
bw.flush();
outWriter.print(data);
outWriter.flush();
consumer.commitOffsets();
logger.info("Explicit commit ......");
}else{
outWriter.print(new String(it.next().message())+""+"\n");
outWriter.flush();
}
}
// logger.info("running");
} catch(ConsumerTimeoutException ex) {
keepRunningThread =false;
break;
}catch(NullPointerException npe){
keepRunningThread =true;
npe.printStackTrace();
}catch(IllegalStateException ile){
keepRunningThread =true;
ile.printStackTrace();
}
}
}
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId,String session_timeout,String auto_reset,String commitEnable) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", session_timeout);
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.offset.reset", auto_reset);
props.put("auto.commit.interval.ms", "60000");
props.put("consumer.timeout.ms", "30");
props.put("auto.commit.enable",commitEnable);
//props.put("rebalance.max.retries", "4");
return new ConsumerConfig(props);
}
public static void main(String[] args) throws InterruptedException {
String zooKeeper = PropertyUtils.getProperty("ZOOKEEPER_URL_PORT");
String groupId = PropertyUtils.getProperty("ZOOKEPER_GROUP_ID");
String session_timeout = PropertyUtils.getProperty("ZOOKEPER_SESSION_TIMOUT_MS"); //6400
String auto_reset = PropertyUtils.getProperty("ZOOKEPER_AUTO_RESET"); //smallest
String enableLogging = PropertyUtils.getProperty("ENABLE_LOG");
String directoryPath = PropertyUtils.getProperty("LOG_DIRECTORY");
String log4jpath = PropertyUtils.getProperty("LOG_DIR");
String commitEnable = PropertyUtils.getProperty("ZOOKEPER_COMMIT"); //false
PropertyConfigurator.configure(log4jpath);
String socketURL = PropertyUtils.getProperty("SOCKET_CONNECT_HOST");
int socketPort = Integer.parseInt(PropertyUtils.getProperty("SOCKET_CONNECT_PORT"));
try {
Socket socks = new Socket(socketURL,socketPort);
boolean connected = socks.isConnected() && !socks.isClosed();
if(connected){
//System.out.println("Able to connect ");
}else{
logger.info("Not able to conenct to socket ..Exiting...");
System.exit(0);
}
} catch (UnknownHostException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch(java.net.ConnectException cne){
logger.info("Not able to conenct to socket ..Exitring...");
System.exit(0);
}
catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
// String zooKeeper = args[0];
// String groupId = args[1];
String topic = args[0];
int threads = 1;
logEnabled = Boolean.parseBoolean(enableLogging);
if(logEnabled)
createDirectory(topic,directoryPath);
ConsumerForKafka example = new ConsumerForKafka(zooKeeper, groupId, topic, session_timeout,auto_reset,commitEnable);
try {
example.run(threads,zooKeeper,topic);
} catch(java.net.ConnectException cne){
cne.printStackTrace();
System.exit(0);
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private static void createDirectory(String topic,String d_Path) {
try{
File file = new File(d_Path);
if (!file.exists()) {
if (file.mkdir()) {
logger.info("Directory Created" +file.getPath());
} else {
logger.info("Directory Creation failed");
}
}
fileName = new File(d_Path + topic + ".log");
if (!fileName.exists()) {
fileName.createNewFile();
}
}catch(IOException IOE){
//logger.info("IOException occured during Directory or During File creation ");
}
}
}
沒有名爲'ZOOKEPER_AUTO_RESET'的配置。你的意思是'auto.offset.reset'? – amethystic
是的,它是'auto.offset.reset'。用於分享錯誤的配置名稱.Apologies。 –
如果您將'auto.offset.reset'設置爲最早但仍然發現消費者總是從最新的偏移量中讀取數據,請粘貼代碼。 – amethystic