2017-03-08 56 views
0

我是卡夫卡新手。目前我正嘗試使用現有的程序(卡夫卡消費者)從卡夫卡提供者獲取數據。我能夠在一次獲取中獲取數據。卡夫卡消費者沒有獲取生產者共享的最新數據

但我擔心的是:一旦我的消費者完成一次獲取,生產者可能會再次發送新的一批數據。我如何確保在先前的獲取完成之後,消費者仍能持續收到生產者隨後發送的這些新數據?

請找到下面的代碼。

 import kafka.consumer.ConsumerConfig; 
     import kafka.consumer.KafkaStream; 
     import kafka.javaapi.consumer.ConsumerConnector; 

     import java.util.HashMap; 
     import java.util.List; 
     import java.util.Map; 
     import java.util.Properties; 
     import java.util.concurrent.ExecutorService; 
     import java.util.concurrent.Executors; 
     import java.util.concurrent.TimeUnit; 

     public class ConsumerGroupExample { 
      private final ConsumerConnector consumer; 
      private final String topic; 
      private ExecutorService executor; 

      public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { 
       consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
         createConsumerConfig(a_zookeeper, a_groupId)); 
       this.topic = a_topic; 
      } 

      public void shutdown() { 
       if (consumer != null) consumer.shutdown(); 
       if (executor != null) executor.shutdown(); 
       try { 
        if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { 
         System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
        } 
       } catch (InterruptedException e) { 
        System.out.println("Interrupted during shutdown, exiting uncleanly"); 
       } 
      } 

      public void run(int a_numThreads) throws InterruptedException { 
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
       topicCountMap.put(topic, new Integer(a_numThreads)); 
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 

       //List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

       // System.out.println(streams.size()); 
       // now launch all the threads 
       // 
       executor = Executors.newFixedThreadPool(a_numThreads); 
       List<KafkaStream<byte[], byte[]>> streams = null; 
       // now create an object to consume the messages 
       // 
       int threadNumber = 0; 
    boolean keepRunningThread = false; 
    for (;;) { 

        streams = consumerMap.get(topic); 
        for (final KafkaStream stream : streams) { 
         keepRunningThread =true; 
         executor.submit(new ConsumerTest(stream, threadNumber,keepRunningThread)); 
         //threadNumber++; 

        } 
        //TimeUnit.MILLISECONDS.sleep(100); 
        //System.out.println("Going to sleep "); 
       } 

      private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { 
       Properties props = new Properties(); 
       props.put("zookeeper.connect", a_zookeeper); 
       props.put("group.id", a_groupId); 
       props.put("zookeeper.session.timeout.ms", "1600"); 
       props.put("zookeeper.sync.time.ms", "200"); 
       props.put("consumer.timeout.ms","10"); 
       props.put("auto.offset.reset", "smallest"); 
       props.put("auto.commit.interval.ms", "1000"); 
       //props.put("key.deserializer", 
       //  "org.apache.kafka.common.serialization.StringDeserializer"); 
       //  props.put("value.deserializer", 
       //  "org.apache.kafka.common.serialization.StringDeserializer"); 

       return new ConsumerConfig(props); 
      } 

      public static void main(String[] args) throws InterruptedException { 
       String zooKeeper = args[0]; 
       String groupId = args[1]; 
       String topic = args[2]; 
       int threads = Integer.parseInt(args[3]); 

       ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); 
       example.run(threads); 

       try { 
        Thread.sleep(10000); 
       } catch (InterruptedException ie) { 

       } 
       example.shutdown(); 
      } 
     } 






     import kafka.consumer.ConsumerIterator; 
     import kafka.consumer.KafkaStream; 

     public class ConsumerTest implements Runnable { 
      private KafkaStream m_stream; 
      private int m_threadNumber; 
     private boolean keepRunningThread 

      public ConsumerTest(KafkaStream a_stream, int a_threadNumber,boolean keepRunningThread) { 
       m_threadNumber = a_threadNumber; 
       m_stream = a_stream; 
keepRunningThread = keepRunningThread; 
      } 

      public void run() { 
       ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
       while(keepRunningThread) 
{ 
    try 
    { 
    if(it.hasNext()) 
    { 
     System.out.println(new String(it.next().message())); 
    } 
    } 
    catch(ConsumerTimeoutException ex) 
    { 
    // Nothing serious timeout exception waiting for kafka message 
    } 
}‍‍‍‍‍‍‍‍‍‍ 

       // System.out.println("Shutting down Thread: " + m_threadNumber); 
      } 
     } 

回答

0

更改您的代碼以保持讀取並在沒有消息時處理超時。下面的代碼將繼續閱讀消息而不會阻止您的消費者您當前的代碼也將繼續閱讀,因爲(it.hasNext())但是它會阻止消費者。

從 main() 中刪除以下代碼,因爲它會在 10 秒後關閉消費者

  // try 
      // { 
      //  Thread.sleep(10000); 
      // } catch (InterruptedException ie) { 

      // } 
      // example.shutdown(); 

而且要在消費者配置中添加 consumer.timeout.ms,否則代碼會在 it.hasNext() 這一行阻塞您的消費者。使用 keepRunningThread 標誌來控制何時退出消費者循環。

while(keepRunningThread) 
{ 
    try 
    { 
    if(it.hasNext()) 
    { 
     System.out.println(new String(it.next().message())); 
    } 
    } 
    catch(ConsumerTimeoutException ex) 
    { 
    // Nothing serious timeout exception waiting for kafka message 
    // Wait for 5 seconds 
    Thread.sleep(5000); 
    } 
}

欲瞭解更多詳情,請參考 https://kafka.apache.org/07/documentation.html (以下說明複製自卡夫卡的官方文檔)

consumer.timeout.ms:默認情況下,這個值是 - 1和消費者無限期阻止,如果沒有新消息可用於消費。通過將該值設置爲正整數,如果在指定的超時值之後沒有可用於消費的消息,則會向使用者拋出超時異常。

+0

如果該值設置爲-1,則消費者將被無限期阻止。這是否意味着,如果生產者發出消費者不能接收的新消息。 –

+0

此外,我還得到了代碼snnipet中提到的keepRunningThread的使用。共享 –

+0

consumer.timeout.ms不會停止使用消息,它只會指定在拋出ConsumerTimeoutException之前等待it.hasNext()的時間。這將防止掛起的消費者(如果消費者在主線上)。在你的情況下,它不是強制性的,但是有好處。 keepRunningThread是控制何時退出消費者循環的標誌。您可以使用它來控制何時停止使用(而不是在10秒後關閉消費者,您現在正在執行此操作)。 這些都是改進,而不是主要問題。主要問題是您在10秒後稱爲關機 – Kaushal