2016-02-29 78 views
5

我目前使用Kafka 0.9.0.1。根據我發現的一些消息來源,設置消息大小的方法是修改server.properties中的以下關鍵值。如何在卡夫卡設置郵件的大小?

  • message.max.bytes
  • replica.fetch.max.bytes
  • fetch.message.max.bytes

server.properties文件實際上有這些設置。

message.max.bytes=10485760 
replica.fetch.max.bytes=20971520 
fetch.message.max.bytes=10485760 

其他可能相關的設置如下。

socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 

然而,當我嘗試使用的尺寸爲4〜6 MB的有效載荷發送消息,消費者從來沒有得到任何消息。生產者似乎發送消息時沒有拋出任何異常。如果我確實發送更小的有效載荷(如< 1 MB),那麼消費者確實會收到消息。

有關我在配置設置方面做錯了的任何想法?

以下是發送消息的示例代碼。

Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps()); 
File dir = new File("/path/to/dir"); 
for(String s : dir.list()) { 
    File f = new File(dir, s); 
    byte[] data = Files.readAllBytes(f.toPath()); 
    Payload payload = new Payload(data); //a simple pojo to store payload 
    String key = String.valueOf(System.currentTimeMillis()); 
    byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[] 
    producer.send(new ProducerRecord<>("test", key, val)); 
} 
producer.close(); 

以下是接收消息的示例代碼。

KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps()); 
consumer.subscribe(Arrays.asList("test")); 
while(true) { 
    ConsumerRecord<String, byte[]> records = consumer.poll(100); 
    for(ConsumerRecord<String, byte[]> record : records) { 
    long offset = record.offset(); 
    String key = record.key(); 
    byte[] val = record.value(); 
    Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object 
    System.out.println(
     System.format("offset=%d, key=%s", offset, key)); 
    } 
} 

以下是爲生產者和消費者填充屬性文件的方法。

public static Properties getProducerProps() { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("compression.type", "snappy"); 
    props.put("max.request.size", 10485760); //need this 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    return props; 
} 

public static Properties getConsumerProps() { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("max.partition.fetch.bytes", 10485760); //need this too 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 
    return props; 
} 

回答

7

簡, 不要使用fetch.message.max.bytes首先是因爲這是從消費者和server.properties文件中並沒有去第二次,因爲是老版本的用戶的屬性,而不是 使用max.partition.fetch.bytes當您創建消費者作爲您用於實例化它的屬性的一部分。

+0

我只是試過,但我得到了同樣的效果。沒有收到「大」文件。我想知道他們是否被髮送,因爲當消費者開始閱讀主題時,偏移量是連續的(例如1,2,3等)。對我而言,似乎製片人甚至可能不會發送大文件? –

+0

事實證明,我需要爲生產者設置'max.request.size',爲消費者設置'max.partition.fetch.bytes'。我會用代碼修改一下,看看是否真的需要'max.partition.fetch.bytes'。 –

+0

是的,事實證明我確實需要這兩個設置。如果我沒有設置'max.partition.fetch.bytes',那麼我得到一個'RecordTooLargeException'。 –