2017-03-03 58 views
0

我有,隨着每一次 初始化連接產生數據的一個卡夫卡生產類,這是耗時的過程, 因此使其更加快我想實施卡夫卡連接 池。我搜索了很多解決方案,但沒有找到合適的 one.Please重定向我正確solution.Thanks。我的卡夫卡生產者 類是:如何實現卡夫卡連接池一樣的JDBC連接池

import java.util.Properties; 
import org.apache.log4j.Logger; 
import com.bisil.report.queue.QueueDBFeederService; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

@SuppressWarnings("deprecation") 
public class KafkaProducer1 implements ProducerService { 
      private static Producer<Integer, String> producer; 
      private static final String topic= "mytopic1"; 
      private Logger logger = Logger.getLogger(KafkaProducer1.class); 

@Override 
public void initialize() { 
     try { 
      Properties producerProps = new Properties(); 
      producerProps.put("metadata.broker.list", "192.168.21.182:9092"); 
      producerProps.put("serializer.class", "kafka.serializer.StringEncoder"); 
      producerProps.put("request.required.acks", "1"); 
      ProducerConfig producerConfig = new ProducerConfig(producerProps); 
      producer = new Producer<Integer, String>(producerConfig); 
     } catch (Exception e) { 
      logger.error("Exception while sending data to server "+e,e); 

     } 
     logger.info("Test Message"); 
    } 

    @Override 
public void publishMessage(String jsonPacket) { 
      KeyedMessage<Integer, String> keyedMsg = new KeyedMessage<Integer, String >(topic, jsonPacket); 
      producer.send(keyedMsg); 
     // This publishes message on given topic 
    } 

    @Override 
public void callMessage(String jsonPacket){ 
      initialize(); 
      // Publish message 
      publishMessage(jsonPacket); 
      //Close the producer 
      producer.close(); 

    } 

} 
+0

在initialize方法內部,您可以檢查生產者是否已經存在return如果已經初始化。完成所有處理/應用程序關閉後關閉生產者。我建議最好的方法是,使用依賴容器和注入獨立生產者使用依賴注入。 – Kaushal

回答

0

你可以把所有的消息在陣列中,反覆將其發佈到主題,然後關閉生產時done.This方式只有一次初始化和一次接近或破壞被稱爲。你可以做這樣的事情

String[] jsonPacket/// your message array 
for (long i = 0; i < jsonPacket.length; i++) { 
      producer.send(new KeyedMessage<Integer, String>(topic, jsonPacket[i])); 
     } 
     producer.close(); 
+0

其實情況是我有一個表單,當我創建新的或更新現有的時間跨度而不是連續的,數據存儲在jsonPacket(單個變量)中時,那麼您的建議對我的方案有效嗎? –

+0

Ahh ..在這種情況下,你可以檢查kaushal答案。你也可以把每幾秒鐘的應用程序更新,並寫入kafka話題。您需要根據您的應用程序設計進行實驗 –

0

如果我的理解是正確的,你需要製作的對象池可以是始終可用,當一個新的發佈請求時,等待其他請求,當任務完成後,你的要求可能匹配'對象池'(一個對象工廠與在Java中執行器框架工作(池)),這是由Apache公共實現的,因爲您需要從池中獲取KafkaProducer對象。在apache commons jar中實現並可用的對象池概念。 https://dzone.com/articles/creating-object-pool-java