2017-03-02

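I have the following application (I am new to this framework). I expect the cache size to grow as it reads messages from the queue, but it stays at 0 the whole time. How do I read from an Ignite cache correctly?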

    KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

    Ignition.setClientMode(true);

    Ignite ignite = Ignition.start();

    Properties settings = new Properties();
    // Set a few key parameters
    settings.put("bootstrap.servers", "localhost:9092");
    settings.put("group.id", "test");
    settings.put("zookeeper.connect", "localhost:2181");
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Create the Kafka ConsumerConfig from the Properties instance
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");

    IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");
    // allow overwriting cache data
    stmr.allowOverwrite(true);

    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);

    // set the topic
    kafkaStreamer.setTopic("test");

    // set the number of threads to process Kafka streams
    kafkaStreamer.setThreads(1);

    // set Kafka consumer configurations
    kafkaStreamer.setConsumerConfig(config);

    // set decoders
    StringDecoder keyDecoder = new StringDecoder(null);
    StringDecoder valueDecoder = new StringDecoder(null);

    kafkaStreamer.setKeyDecoder(keyDecoder);
    kafkaStreamer.setValueDecoder(valueDecoder);

    kafkaStreamer.start();

    // poll the cache size reported by the metrics every 200 ms
    while (true) {
        System.out.println(cache.metrics().getSize());
        Thread.sleep(200);
    }

Can anyone tell me what is missing or wrong?

Thanks!

Answers


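You have probably not consumed enough entries to fill the IgniteDataStreamer buffer, so nothing has been sent to the cache yet. Try setting an automatic flush frequency (in milliseconds):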

stmr.autoFlushFrequency(1000); 
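
To illustrate why the size can stay at 0, here is a minimal toy model of a buffering streamer. It is not Ignite's implementation: the class `BufferedStreamerSketch`, its methods, and the buffer size of 512 are all hypothetical, chosen only to show that buffered entries stay invisible to the cache until the buffer fills or a flush happens. A periodic flush, as requested through `autoFlushFrequency`, would correspond to calling `flush()` on a timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a buffering streamer (hypothetical, not Ignite's implementation). */
public class BufferedStreamerSketch {
    // Stands in for the cache that readers query.
    private final Map<String, String> cache = new HashMap<>();
    // Entries that were added but are not yet visible in the cache.
    private final List<String[]> buffer = new ArrayList<>();
    // Hypothetical threshold at which the buffer is flushed automatically.
    private final int bufferSize;

    public BufferedStreamerSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Buffers one entry and flushes only when the buffer is full. */
    public void addData(String key, String value) {
        buffer.add(new String[] {key, value});
        if (buffer.size() >= bufferSize) {
            flush();
        }
    }

    /** Moves all buffered entries into the cache. */
    public void flush() {
        for (String[] entry : buffer) {
            cache.put(entry[0], entry[1]);
        }
        buffer.clear();
    }

    public int cacheSize() {
        return cache.size();
    }

    public static void main(String[] args) {
        BufferedStreamerSketch streamer = new BufferedStreamerSketch(512);
        for (int i = 0; i < 10; i++) {
            streamer.addData("k" + i, "v" + i);
        }
        System.out.println(streamer.cacheSize()); // prints 0: entries are still buffered
        streamer.flush();
        System.out.println(streamer.cacheSize()); // prints 10: entries are now visible
    }
}
```

With a low-volume topic like the one in the question, the buffer may never fill on its own, which is why a time-based flush helps.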

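Metrics are disabled by default for performance reasons. You can enable them with CacheConfiguration.setStatisticsEnabled(true), or by setting the statisticsEnabled property in the configuration file.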
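
As a sketch of the programmatic route, the snippet below enables statistics on the cache configuration before the cache is created. It assumes the Ignite core library is on the classpath and that "myCache" does not already exist in the cluster; if it does, the existing cache's configuration is likely what applies. The class name `CacheStatisticsSketch` and the sample key and value are placeholders. It also prints `cache.size(...)`, which reads the entry count directly rather than through the metrics.

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.CacheConfiguration;

public class CacheStatisticsSketch {
    public static void main(String[] args) {
        // Starts a node with the default configuration (assumption: no custom config file).
        try (Ignite ignite = Ignition.start()) {
            CacheConfiguration<String, String> cacheCfg = new CacheConfiguration<>("myCache");
            // Metrics are off by default; turn them on for this cache.
            cacheCfg.setStatisticsEnabled(true);

            IgniteCache<String, String> cache = ignite.getOrCreateCache(cacheCfg);
            cache.put("k", "v"); // placeholder entry

            // Entry count read directly from the cache, independent of metrics.
            System.out.println(cache.size(CachePeekMode.PRIMARY));
            // A metrics value that is only collected when statistics are enabled.
            System.out.println(cache.metrics().getCachePuts());
        }
    }
}
```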