1
我有一個卡夫卡消費者,目前與配置反序列化的Java對象
public class KryoPOJODeserializer<T> implements Deserializer<T> {
private Kryo kryo = new Kryo();
@Override
public void configure(Map props, boolean isKey) {
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
}
@Override
public T deserialize(String topic, byte[] data) {
// Deserialize the serialized object.
return kryo.readObject(new Input(data), T.class);
}
@Override
public void close() {
}
}
什麼我無法弄清楚,是否有可能爲不同的主題重複使用相同的消費者(每個主題都有不同類型的POJO)?如果我的消費者配置是:
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoPOJODeserializer.class.getName());
或者,我必須爲每個主題都有一個單獨的消費者嗎?
另外,我是否必須刪除我的反序列化器的泛型部分,總是返回一個對象,並將該對象轉換爲客戶端代碼中的相關POJO?例如:
public class KryoPOJODeserializer implements Deserializer {
private Kryo kryo = new Kryo();
@Override
public void configure(Map props, boolean isKey) {
kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
}
@Override
public Object deserialize(String topic, byte[] data) {
// Deserialize the serialized object.
return kryo.readClassAndObject(new Input(new ByteArrayInputStream(data)));
}
@Override
public void close() {
}
}
後者會工作,但它感覺有點髒。
任何建議表示讚賞!
因此,我猜想有(最壞的情況下)像我擁有卡夫卡話題一樣多的'KafkaConsumer'消費者是可以接受的?我有點擔心Kafka消費者相當重量級,類似於Hibernate SessionFactory。如果那不會讓生活變得更容易! – Matt
是的,這是可以接受的 - 消費者(不管消息傳遞技術)通常被編寫來處理一種特定的有效載荷類型,但總是有例外。至於卡夫卡消費者是否「沉重」,取決於你的意思。消費者將維護與正在消費消息的所有經紀人的TCP連接。除此之外,沒有線程池在後臺執行任務。 –