我有,隨着每一次 初始化連接產生數據的一個卡夫卡生產類,這是耗時的過程, 因此使其更加快我想實施卡夫卡連接 池。我搜索了很多解決方案,但沒有找到合適的 one.Please重定向我正確solution.Thanks。我的卡夫卡生產者 類是:如何實現卡夫卡連接池一樣的JDBC連接池
import java.util.Properties;
import org.apache.log4j.Logger;
import com.bisil.report.queue.QueueDBFeederService;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
@SuppressWarnings("deprecation")
public class KafkaProducer1 implements ProducerService {
private static Producer<Integer, String> producer;
private static final String topic= "mytopic1";
private Logger logger = Logger.getLogger(KafkaProducer1.class);
@Override
public void initialize() {
try {
Properties producerProps = new Properties();
producerProps.put("metadata.broker.list", "192.168.21.182:9092");
producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
producerProps.put("request.required.acks", "1");
ProducerConfig producerConfig = new ProducerConfig(producerProps);
producer = new Producer<Integer, String>(producerConfig);
} catch (Exception e) {
logger.error("Exception while sending data to server "+e,e);
}
logger.info("Test Message");
}
@Override
public void publishMessage(String jsonPacket) {
KeyedMessage<Integer, String> keyedMsg = new KeyedMessage<Integer, String >(topic, jsonPacket);
producer.send(keyedMsg);
// This publishes message on given topic
}
@Override
public void callMessage(String jsonPacket){
initialize();
// Publish message
publishMessage(jsonPacket);
//Close the producer
producer.close();
}
}
在initialize方法內部,您可以檢查生產者是否已經存在return如果已經初始化。完成所有處理/應用程序關閉後關閉生產者。我建議最好的方法是,使用依賴容器和注入獨立生產者使用依賴注入。 – Kaushal