我正在使用Kafka 0.8.0並嘗試實現下面提到的場景。消費者在Apache Kafka中使用消息的延遲
JCA API(用作生產商和將數據發送到)----->消費------> HBase的
我儘快發送的每個消息,以消費者爲我取使用JCA客戶端的數據。例如,只要生產者發送消息1,我想從消費者那裏獲取相同的信息,並將其放入HBase中。但是我的消費者在一些隨機的n消息之後開始提取消息。我想讓生產者和消費者保持同步,以便他們兩人開始一起工作。
我用:
1經紀人
1單一主題
1單曲製作和高層次的消費
任何人都可以建議我需要什麼做到這一點嗎?
編輯:
添加一些相關的代碼片段。
Consumer.java
public class Consumer extends Thread {
private final ConsumerConnector consumer;
private final String topic;
PrintWriter pw = null;
int t = 0;
StringDecoder kd = new StringDecoder(null);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
Map<String, List<KafkaStream<String, Signal>>> consumerMap;
KafkaStream<String, Signal> stream;
ConsumerIterator<String, Signal> it;
public Consumer(String topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
topicCountMap.put(topic, new Integer(1));
consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
new VerifiableProperties()));
stream = consumerMap.get(topic).get(0);
it = stream.iterator();
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("fetch.size", "1024");
return new ConsumerConfig(props);
}
synchronized public void run() {
while (it.hasNext()) {
t = (it.next().message()).getChannelid();
System.out.println("In Consumer received msg" + t);
}
}
}
producer.java
public class Producer {
public final kafka.javaapi.producer.Producer<String, Signal> producer;
private final String topic;
private final Properties props = new Properties();
public Producer(String topic)
{
props.put("serializer.class", "org.bigdata.kafka.Serializer");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "localhost:9092");
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type userdefined Object .
producer = new kafka.javaapi.producer.Producer<String,Signal(newProducerConfig(props));
this.topic = topic;
}
}
KafkaProperties.java
public interface KafkaProperties {
final static String zkConnect = "127.0.0.1:2181";
final static String groupId = "group1";
final static String topic = "test00";
final static String kafkaServerURL = "localhost";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 100000;
final static int reconnectInterval = 10000;
final static String clientId = "SimpleConsumerDemoClient";
}
這是消費者如何處理前10條消息,而不是消費者收到消息的系統消息,但從第11條消息開始,它開始正常工作。
producer sending msg1
producer sending msg2
producer sending msg3
producer sending msg4
producer sending msg5
producer sending msg6
producer sending msg7
producer sending msg8
producer sending msg9
producer sending msg10
producer sending msg11
producer sending msg12
In Consumer received msg12
producer sending msg13
In Consumer received msg13
producer sending msg14
In Consumer received msg14
producer sending msg15
In Consumer received msg15
producer sending msg16
In Consumer received msg16
producer sending msg17
In Consumer received msg17
producer sending msg18
In Consumer received msg18
producer sending msg19
In Consumer received msg19
producer sending msg20
In Consumer received msg20
producer sending msg21
In Consumer received msg21
EDITED:將其中生產者將消息發送到消費者的收聽者的功能。而我使用的是默認的配置生產者沒有覆蓋它
public synchronized void onValueChanged(final MonitorEvent event_) {
// Get the value from the DBR
try {
final DBR dbr = event_.getDBR();
final String[] val = (String[]) dbr.getValue();
producer1.producer.send(new KeyedMessage<String, Signal>
(KafkaProperties.topic,new Signal(messageNo)));
System.out.println("producer sending msg"+messageNo);
messageNo++;
} catch (Exception ex) {
ex.printStackTrace();
}
}
你能顯示你的生產者和消費者代碼/配置嗎?看起來他們中的一些人使用批量操作(事實上這是好事)。 – Dmitry
@Dmitry添加了代碼片段。 – Ankita
消費者似乎沒問題(財產fetch.size = 1K除外 - 這意味着消費者無法收到更大的消息,但可能不是我們正在尋找的問題)。你可以分享生產者的newProducerConfig()和run()方法的代碼嗎? – Dmitry