2017-04-25 32 views
0

I am still doubtful about kafka ZOOKEPER_AUTO_RESET. I have seen many questions on this topic; please excuse me if this is a duplicate query. Understanding kafka zookeper auto reset

I have a high-level Java consumer that consumes continuously. I have multiple topics, each with a single partition.

My concern is as follows.

I started consumerkafka.jar with consumer group name "ncdev1" and ZOOKEPER_AUTO_RESET = smallest. I observed that the init offset was set to -1. Then I stopped/started the jar after some time. At that point it picked up the latest offset assigned to the consumer group (ncdev1), i.e. 36. After some more time I restarted again, and the init offset was set to 39, which is the latest value. Then I changed the group name to ZOOKEPER_GROUP_ID = ncdev2. On restarting the jar, the offset was again set to -1. On a further restart, it jumped to the latest value, i.e. 39.

Then I set

ZOOKEPER_AUTO_RESET = largest
ZOOKEPER_GROUP_ID = ncdev3

Then I tried restarting the jar with group name ncdev3. There was no difference in the way it picked the offset on restart: it picked 39, the same as with the previous configuration.

Any ideas on why it does not pick the offset from the beginning? Is there any other configuration that would make it read from the beginning? (My understanding of largest and smallest comes from What determines Kafka consumer offset?)

Thanks in advance

Code added:

import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerForKafka { 
    private final ConsumerConnector consumer; 
    private final String topic; 
    private ExecutorService executor; 
    ServerSocket soketToWrite; 
    Socket s_Accept ; 
    OutputStream s1out ; 
    DataOutputStream dos; 
    static boolean logEnabled ; 
    static File fileName; 


    private static final Logger logger = Logger.getLogger(ConsumerForKafka.class); 


    public ConsumerForKafka(String a_zookeeper, String a_groupId, String a_topic,String session_timeout,String auto_reset,String a_commitEnable) { 
     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
       createConsumerConfig(a_zookeeper, a_groupId,session_timeout,auto_reset,a_commitEnable)); 
     this.topic =a_topic; 
    } 


    public void run(int a_numThreads,String a_zookeeper, String a_topic) throws InterruptedException, IOException { 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, new Integer(a_numThreads)); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     String socketURL = PropertyUtils.getProperty("SOCKET_CONNECT_HOST"); 
     int socketPort = Integer.parseInt(PropertyUtils.getProperty("SOCKET_CONNECT_PORT")); 
     Socket socks = new Socket(socketURL,socketPort);   

     //**** 
     String keeper = a_zookeeper; 
     String topic = a_topic; 

     long millis = new java.util.Date().getTime(); 

     //**** 

     PrintWriter outWriter = new PrintWriter(socks.getOutputStream(), true); 

     List<KafkaStream<byte[], byte[]>> streams = null; 
     // now create an object to consume the messages 
     // 
     int threadNumber = 0; 
     // System.out.println("going to forTopic value is "+topic); 
     boolean keepRunningThread =false; 
     boolean check = false; 
     logger.info("logged"); 
     BufferedWriter bw = null; 
     FileWriter fw = null; 
     if(logEnabled){ 
      fw = new FileWriter(fileName, true); 
      bw = new BufferedWriter(fw); 
     } 

     for (;;) { 


      streams = consumerMap.get(topic); 
      keepRunningThread =true; 

      for (final KafkaStream stream : streams) { 

       ConsumerIterator<byte[], byte[]> it = stream.iterator(); 

       while(keepRunningThread) 
       { 

       try{ 


        if (it.hasNext()){ 

         if(logEnabled){ 
          String data = new String(it.next().message()) + "\n"; 
          bw.write(data); 
          bw.flush(); 
          outWriter.print(data); 
          outWriter.flush(); 
          consumer.commitOffsets(); 
          logger.info("Explicit commit ......"); 
         }else{ 

          outWriter.print(new String(it.next().message()) + "\n"); 
          outWriter.flush(); 
         } 

        } 
        // logger.info("running"); 


       } catch(ConsumerTimeoutException ex) { 

        keepRunningThread =false; 
        break; 

        }catch(NullPointerException npe){ 

         keepRunningThread =true; 
         npe.printStackTrace(); 
        }catch(IllegalStateException ile){ 
         keepRunningThread =true; 
         ile.printStackTrace(); 
        } 

       } 

      } 

     } 
    } 

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId,String session_timeout,String auto_reset,String commitEnable) { 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", a_zookeeper); 
     props.put("group.id", a_groupId); 
     props.put("zookeeper.session.timeout.ms", session_timeout); 
     props.put("zookeeper.sync.time.ms", "2000"); 
     props.put("auto.offset.reset", auto_reset); 
     props.put("auto.commit.interval.ms", "60000"); 
     props.put("consumer.timeout.ms", "30"); 
     props.put("auto.commit.enable",commitEnable); 
     //props.put("rebalance.max.retries", "4"); 


     return new ConsumerConfig(props); 
    } 

    public static void main(String[] args) throws InterruptedException { 

     String zooKeeper = PropertyUtils.getProperty("ZOOKEEPER_URL_PORT"); 
     String groupId = PropertyUtils.getProperty("ZOOKEPER_GROUP_ID"); 
     String session_timeout = PropertyUtils.getProperty("ZOOKEPER_SESSION_TIMOUT_MS"); //6400 
     String auto_reset = PropertyUtils.getProperty("ZOOKEPER_AUTO_RESET"); //smallest 
     String enableLogging = PropertyUtils.getProperty("ENABLE_LOG"); 
     String directoryPath = PropertyUtils.getProperty("LOG_DIRECTORY"); 
     String log4jpath = PropertyUtils.getProperty("LOG_DIR"); 
     String commitEnable = PropertyUtils.getProperty("ZOOKEPER_COMMIT"); //false 
     PropertyConfigurator.configure(log4jpath); 

     String socketURL = PropertyUtils.getProperty("SOCKET_CONNECT_HOST"); 
     int socketPort = Integer.parseInt(PropertyUtils.getProperty("SOCKET_CONNECT_PORT")); 
     try { 
      Socket socks = new Socket(socketURL,socketPort); 
      boolean connected = socks.isConnected() && !socks.isClosed(); 
      if(connected){ 
       //System.out.println("Able to connect "); 
      }else{ 
       logger.info("Not able to connect to socket. Exiting..."); 
       System.exit(0); 
      } 
     } catch (UnknownHostException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } catch(java.net.ConnectException cne){ 
      logger.info("Not able to connect to socket. Exiting..."); 
      System.exit(0); 
     } 
     catch (IOException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } 

     // String zooKeeper = args[0]; 
     // String groupId = args[1]; 
     String topic = args[0]; 
     int threads = 1; 

     logEnabled = Boolean.parseBoolean(enableLogging); 
     if(logEnabled) 
      createDirectory(topic,directoryPath); 

     ConsumerForKafka example = new ConsumerForKafka(zooKeeper, groupId, topic, session_timeout,auto_reset,commitEnable); 
     try { 
      example.run(threads,zooKeeper,topic); 
     } catch(java.net.ConnectException cne){ 
      cne.printStackTrace(); 
      System.exit(0); 
     } 
     catch (IOException e) { 
      // TODO Auto-generated catch block 

      e.printStackTrace(); 


     } 


    } 

    private static void createDirectory(String topic,String d_Path) { 

     try{ 
     File file = new File(d_Path); 
     if (!file.exists()) { 
      if (file.mkdir()) { 
       logger.info("Directory Created" +file.getPath()); 
      } else { 

       logger.info("Directory Creation failed"); 
      } 
     } 

     fileName = new File(d_Path + topic + ".log"); 
     if (!fileName.exists()) { 
      fileName.createNewFile(); 
     } 



     }catch(IOException IOE){ 
      //logger.info("IOException occured during Directory or During File creation "); 
     } 


    } 
} 
+0

There is no config named 'ZOOKEPER_AUTO_RESET'. Did you mean 'auto.offset.reset'? – amethystic

+0

Yes, it is 'auto.offset.reset'. Apologies for sharing the wrong config name. –

+0

If you set 'auto.offset.reset' to smallest but still find the consumer always reading from the latest offset, please paste your code. – amethystic

Answer

0

After re-reading your post carefully, I think what you experienced is as expected.

I started consumerkafka.jar with consumer group name "ncdev1" and ZOOKEPER_AUTO_RESET = smallest. I observed that the init offset was set to -1. Then I stopped/started the jar after some time. At that point it picked up the latest offset assigned to the consumer group (ncdev1), i.e. 36.

auto.offset.reset only applies when there is no initial offset for the group, or when the stored offset is out of range. Since there are only 36 messages in the log, the consumer group is able to read all of those records very quickly; that is why you see the consumer group picking up the latest offset every time it restarts.
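To make that concrete, here is a minimal sketch (not the poster's code; the group name `ncdev4` is hypothetical) of the two settings that interact. `auto.offset.reset` is consulted only when the group has no committed offset, which is exactly the init-offset -1 case described in the question:

```java
import java.util.Properties;

public class ResetConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Consulted ONLY when the group has no committed offset (init offset -1)
        // or when the stored offset is out of range. Once a valid offset has
        // been committed, this setting is ignored and consumption resumes there.
        props.put("auto.offset.reset", "smallest");
        // A brand-new group id has no committed offset, so the setting above
        // decides where its very first read starts.
        props.put("group.id", "ncdev4"); // hypothetical, never-used group name
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

So to force a re-read from the beginning, use a group id that has never committed an offset (or remove the group's stored offsets) together with auto.offset.reset=smallest.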

+0

..Thanks. Can you tell me what commitOffsets does? The docs say it explicitly commits the consumed offsets if the config 'auto.commit.enable' is set to false. But I still have some doubts about exactly how it works. For example, say my consumer has 36 messages to consume. While iterating over them one by one, my tcp connection gets closed, or my application is killed, while trying to write the 21st message. In that case will my offset value be 20, and if it is 20, on restart will it start reading from the 21st? –

+0

If 'auto.commit.enable' is set to false, the user is responsible for offset committing. You can store offsets anywhere, e.g. a database or a NoSQL store. But if you still want to store offsets in ZooKeeper or Kafka (setting 'offsets.storage' = kafka), then manually invoking 'commitOffsets' helps. – amethystic
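As a sketch, the manual-commit setup described above would look like this in the consumer properties (values here are illustrative, not taken from the poster's config):

```properties
# Disable periodic auto-commit; the application now owns offset management.
auto.commit.enable=false
# Keep offsets in Kafka so that commitOffsets() has somewhere to write.
offsets.storage=kafka
# Offsets are then persisted only when the code calls consumer.commitOffsets().
```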

+0

That is why committing offsets is required; Kafka promises "at least once" semantics in case of consumer failures. As for the committed offset, it does indeed mean "the next message to be read". – amethystic
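To illustrate the "next message to be read" point against the crash scenario in the earlier comment, here is a toy model in plain Java (not the Kafka API) of the committed-offset semantics:

```java
public class OffsetSemanticsSketch {
    // After successfully processing and committing messages at offsets 0..(n-1),
    // the committed offset is n: the offset of the NEXT message to read,
    // not the offset of the last message processed.
    static long committedAfterProcessing(long messagesProcessed) {
        return messagesProcessed;
    }

    public static void main(String[] args) {
        // 20 messages (offsets 0..19) processed and committed, then the
        // application dies while handling the 21st message (offset 20).
        long committed = committedAfterProcessing(20);
        System.out.println("committed offset = " + committed);
        // On restart the consumer resumes at the committed offset, so the
        // 21st message (offset 20) is read again: "at least once" delivery.
        System.out.println("restart reads offset " + committed + " first");
    }
}
```

Note the re-delivery: the message that was in flight when the crash happened is consumed again after restart, which is exactly the "at least once" guarantee mentioned above.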