2016-06-14 39 views
0

我對Kafka非常陌生,今天我嘗試創建Java Producer,用於在不同分區上的Kafka主題上生成消息。Kafka Producer Java API不會將消息分發到所有主題分區

首先,我創建了一個包raggieKafka,其中我創建了2個類:TestProducerSimplePartitioner

TestProducer類有下面的代碼:

package raggieKafka; 

import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.*; 
import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

public class TestProducer{ 

    public static void main(String args[]) throws Exception 
    { 
     long events = 0; 

     BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); 
     events = Integer.parseInt(reader.readLine()); 
     Random rnd = new Random(); 

     Properties props = new Properties(); 
     props.put("metadata.broker.list", "localhost:9092"); 
     props.put("topic.metadata.refresh.interval.ms", "1"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("partitioner.class", "raggieKafka.SimplePartitioner"); 
     props.put("request.required.acks", "1"); 

     ProducerConfig config = new ProducerConfig(props); 
     Producer<String, String> prod = new Producer<String, String>(config); 

     for(long i = 0; i < events; i++) 
     { 
      long runtime = new Date().getTime(); 
      String ip = "192.168.2." + rnd.nextInt(255); 
      String msg = runtime + ",www.example.com, " + ip; 
      KeyedMessage<String,String> data = new KeyedMessage<String, String>("page_visits", ip, msg); 
      prod.send(data); 
     } 
     prod.close(); 
    } 
} 

SimplePartitioner類有下面的代碼:

package raggieKafka; 

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner{ 

    public SimplePartitioner(VerifiableProperties props) 
    { 

    } 

    public int partition(Object Key, int a_numPartitions) 
    { 
     int partition = 0; 
     String stringKey = (String) Key; 
     int offset = stringKey.indexOf(stringKey); 

     if(offset > 0) 
     { 
      partition = Integer.parseInt(stringKey.substring(offset+1)) % a_numPartitions; 
     } 
     return partition; 
    } 
} 

編譯我對卡夫卡經紀人創建話題,這些Java程序之前:

C:\kafka_2.11-0.9.0.1>.\bin\windows\kafka-topics.bat --create --topic page_visit 
s --zookeeper localhost:2181 --partitions 5 --replication-factor 1 
WARNING: Due to limitations in metric names, topics with a period ('.') or under 
score ('_') could collide. To avoid issues it is best to use either, but not bot 
h. 
Created topic "page_visits". 

現在當我c在java程序中,它將所有消息放入只有1個分區,即page_visits-0,在該分區下發布所有消息,但所有其他分區保持空白。

有人能告訴我爲什麼我的Java製作者不會將我所有的消息分發給其他分區嗎?

逸岸,我看着就谷歌,然後添加一個更多的財產:

props.put("topic.metadata.refresh.interval.ms", "1"); 

但仍生產者沒有生產消息的所有主題。

請幫助。

回答

2

你SimplePartitioner代碼有錯誤的下面一行

int offset = stringKey.indexOf(stringKey); 

它總是返回0所以你總是偏移等於0和它永遠不會大於0您如果塊不再執行。最後它總是返回你的分區0

解決方案:由於您的密鑰是IP地址,因此以下更改可按預期工作。

int offset = stringKey.lastIndexOf('.'); 

希望這有助於!

+0

非常感謝你AVR。我做了一個多麼愚蠢的錯誤,這使我幾乎瘋了。再次感謝修正。乾杯。 –

相關問題