我正在使用Kafka自定義分區程序類。這裏我試圖將數據推送到單獨的分區。 我卡夫卡製作類:我的Kafka自定義分區程序類錯誤
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaCustomPartitioner {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
int blocks = Integer.parseInt(args[1]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class","kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class","com.kafka.partdecider.CustomPartitioner");
props.put("producer.type", "sync");
props.put("request.required.acks","1");
ProducerConfig config = new ProducerConfig(props);
Producer producer = new Producer(config);
for(int nBlocks=0; nBlocks<blocks; nBlocks++) {
for(long nEvents=0; nEvents<events; nEvents++) {
long runTime = new Date().getTime();
String msg = runTime + ": " + (50+nBlocks) + ": " + nEvents + ": " + rnd;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("CustPartTopic",String.valueOf(nBlocks),msg);
producer.send(data);
}
}
producer.close();
}
}
客戶分區程序類:
import kafka.producer.Partitioner;
public class CustomPartitioner implements Partitioner {
public int partition(Object key, int arg1) {
String receivingkey = (String) key;
long id = Long.parseLong(receivingkey);
return (int) (id%arg1);
}
}
該項目的論證部分的值有:3 2 我得到「ArrayOutOfBoundsException」在這一行,如果我運行的類:
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
at com.kafka.custompartitioner.KafkaCustomPartitioner.main(KafkaCustomPartitioner.java:13)
顯示在該行的錯誤:long events = Long.parseLong(args[0]);
但我不明白爲什麼該行給出錯誤。 任何人都可以讓我知道我該如何解決這個問題?
就問,你是卡夫卡0.8.2之前版本卡夫卡的工作?您正在使用非常舊的API。 – ppatierno
我正在使用版本0.10(kafka_2.11-0.10.2.0)。 你能告訴我什麼是我需要爲最新版本所做的更正?這是我的pom.xml \t \t org.apache.kafka \t kafka_2.11 \t 0.9.0.0 \t –
Sidhartha