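Apache Kafka: Producer not producing all data

I am new to Kafka. My requirement is that I have two database tables, a source and a destination. I want to fetch data from the source table and store it in the destination table, with Kafka in between acting as producer and consumer. I have written the code, but the problem is that when the producer produces data, some of the records are missed. For example, if I have 100 records in the source table, it does not produce all 100 records. I am using Kafka 0.10.

My producer config: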
bootstrap.servers=192.168.1.XXX:9092,192.168.1.XXX:9093,192.168.1.XXX:9094
acks=all
retries=2
batch.size=16384
linger.ms=2
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
My producer code:
public void run() {
    SourceDAO sourceDAO = new SourceDAO();
    Source source;
    int id;
    try {
        logger.debug("INSIDE RUN");
        List<Source> listOfEmployee = sourceDAO.getAllSource();
        Iterator<Source> sourceIterator = listOfEmployee.iterator();
        String sourceJson;
        Gson gson = new Gson();
        while(sourceIterator.hasNext()) {
            source = sourceIterator.next();
            sourceJson = gson.toJson(source);
            id = source.getId();
            producerRecord = new ProducerRecord<Integer, String>(TOPIC, id, sourceJson);
            producerRecords.add(producerRecord);
        }
        for(ProducerRecord<Integer, String> record : producerRecords) {
            logger.debug("Producer Record: " + record.value());
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    logger.debug("Exception: " + exception);
                    if (exception != null)
                        throw new RuntimeException(exception.getMessage());
                    logger.info("The offset of the record we just sent is: " + metadata.offset()
                            + " In Partition : " + metadata.partition());
                }
            });
        }
        producer.close();
        producer.flush();
        logger.info("Size of Record: " + producerRecords.size());
    } catch (SourceServiceException e) {
        logger.error("Unable to Produce data...", e);
        throw new RuntimeException("Unable to Produce data...", e);
    }
}
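
A minimal sketch of one way to find out which records fail, assuming the failures surface in the send callback. In the producer code above the callback runs on the producer's background I/O thread, so the RuntimeException thrown there does not reach run(). Recording failures and counting acknowledgements makes any missing records visible. SendAudit and AsyncSender are hypothetical names; AsyncSender stands in for producer.send(record, callback) so that the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

class SendAudit {
    /** Hypothetical stand-in for KafkaProducer.send(record, callback). */
    interface AsyncSender<R> {
        void send(R record, BiConsumer<R, Exception> onCompletion);
    }

    // Thread-safe, because real callbacks run on the producer's I/O thread.
    private final AtomicInteger acked = new AtomicInteger();
    private final ConcurrentLinkedQueue<String> failures = new ConcurrentLinkedQueue<>();

    /** Sends every record and records the outcome instead of throwing. */
    <R> void sendAll(List<R> records, AsyncSender<R> sender) {
        for (R record : records) {
            sender.send(record, (rec, exception) -> {
                if (exception != null) {
                    failures.add(rec + ": " + exception.getMessage());
                } else {
                    acked.incrementAndGet();
                }
            });
        }
    }

    int ackedCount() {
        return acked.get();
    }

    List<String> failures() {
        return new ArrayList<>(failures);
    }
}
```

With a real producer, the comparison of ackedCount() against producerRecords.size() would go after producer.flush(), with producer.close() called last. close() already waits for outstanding sends, so a flush() placed after it is at best redundant.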
My consumer config:
bootstrap.servers=192.168.1.XXX:9092,192.168.1.231:XXX,192.168.1.232:XXX
group.id=consume
client.id=C1
enable.auto.commit=true
auto.commit.interval.ms=1000
max.partition.fetch.bytes=10485760
session.timeout.ms=35000
consumer.timeout.ms=35000
auto.offset.reset=earliest
message.max.bytes=10000000
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
My consumer code:
public void doWork() {
    logger.debug("Inside doWork of DestinationConsumer");
    DestinationDAO destinationDAO = new DestinationDAO();
    consumer.subscribe(Collections.singletonList(this.TOPIC));
    while(true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
        int minBatchSize = 1;
        for(ConsumerRecord<String, String> rec : consumerRecords) {
            logger.debug("Consumer Recieved Record: " + rec);
            consumerRecordsList.add(rec);
        }
        logger.debug("Record Size: " + consumerRecordsList.size());
        if(consumerRecordsList.size() >= minBatchSize) {
            try {
                destinationDAO.insertSourceDataIntoDestination(consumerRecordsList);
            } catch (DestinationServiceException e) {
                logger.error("Unable to update destination table");
            }
        }
    }
}
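
A possible sketch of the batching step on the consumer side, assuming each record should be written to the destination exactly once per successful insert. In the consumer code above consumerRecordsList is never cleared, so every loop iteration passes all earlier records to the DAO again. BatchBuffer is a hypothetical helper, and its sink parameter stands in for destinationDAO.insertSourceDataIntoDestination so that the sketch runs without Kafka or a database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BatchBuffer<T> {
    private final int minBatchSize;
    // Hypothetical stand-in for the DAO insert call.
    private final Consumer<List<T>> sink;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int minBatchSize, Consumer<List<T>> sink) {
        this.minBatchSize = minBatchSize;
        this.sink = sink;
    }

    /**
     * Adds the records from one poll and writes the batch once it is
     * large enough. Returns the number of records written (0 if none).
     */
    int addAndMaybeFlush(List<T> polled) {
        pending.addAll(polled);
        if (pending.size() < minBatchSize) {
            return 0;
        }
        List<T> batch = new ArrayList<>(pending);
        sink.accept(batch); // if this throws, pending is kept for a retry
        pending.clear();    // cleared only after a successful write
        return batch.size();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Separately, the key deserializer is configured as IntegerDeserializer while the loop declares ConsumerRecord<String, String>, so the key type parameter would probably need to be Integer to match.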
Can you say which messages get lost? The first ones? The last ones? Random ones? – TobiSH