2016-11-02 130 views
1

我正在使用Apache Kafka。我創建了一個戰爭文件,其中生產者用Java編碼,用戶用Scala編碼。生產者從HTML頁面獲取數據。我可以看到生產者發佈的大部分數據都在消費者身上找到,但有些數據丟失了。Apache Kafka Java生產者Scala消費者缺少流

這裏是我的製片代碼

文件1

package com.cts.rest; 

import java.util.Properties; 

import kafka.producer.ProducerConfig; 

public class Configuration { 

static ProducerConfig setKafkaProducerParameter() { 
    Properties properties = new Properties(); 
    properties.put("zk.connect", "localhost:2181"); 
    properties.put("metadata.broker.list", "localhost:9092"); 
    properties.put("serializer.class", "kafka.serializer.StringEncoder"); 
    properties.put("acks", 0); 
    ProducerConfig producerConfig = new ProducerConfig(properties); 
    return producerConfig; 
    } 

}

文件2

package com.cts.rest; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 


public class RTTSKProducer { 

static void sendDataToProducer(String line){ 

    ProducerConfig producerConfig = configuration.setKafkaProducerParameter(); 
    Producer<String, String> producer = new Producer<String, String>(producerConfig);  

    String topic = "jsondata";  
    KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, line); 
    System.out.print(msg); 
    producer.send(msg); 
    producer.close(); 
      } 
    } 

現在我檢查消費者的全光照的消息g以下命令。

bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic jsondata --from-beginning 

我是否缺少任何生產者配置?

回答

1

您可以嘗試增加'acks'配置以確保更耐用。最重要的是,您應該使用回調函數調用「send」方法來處理那些未成功發佈到Kafka的消息,如下所示:

producer.send(myRecord, 
      new Callback() { 
       public void onCompletion(RecordMetadata metadata, Exception e) { 
        if(e != null) 
         e.printStackTrace(); 
        System.out.println("The offset of the record we just sent is: " + metadata.offset()); 
       } 
      });