2016-08-15 116 views
1

我有一個卡夫卡消費者,目前與配置反序列化的Java對象

public class KryoPOJODeserializer<T> implements Deserializer<T> { 

    private Kryo kryo = new Kryo(); 

    @Override 
    public void configure(Map props, boolean isKey) { 
     kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); 
     kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); 
    } 

    @Override 
    public T deserialize(String topic, byte[] data) { 
     // Deserialize the serialized object. 
     return kryo.readObject(new Input(data), T.class); 
    } 

    @Override 
    public void close() { 

    } 

} 

什麼我無法弄清楚,是否有可能爲不同的主題重複使用相同的消費者(每個主題都有不同類型的POJO)?如果我的消費者配置是:

kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KryoPOJODeserializer.class.getName()); 

或者,我必須爲每個主題都有一個單獨的消費者嗎?

另外,我是否必須刪除我的反序列化器的泛型部分,總是返回一個對象,並將該對象轉換爲客戶端代碼中的相關POJO?例如:

public class KryoPOJODeserializer implements Deserializer { 

    private Kryo kryo = new Kryo(); 

    @Override 
    public void configure(Map props, boolean isKey) { 
     kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); 
     kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); 
    } 

    @Override 
    public Object deserialize(String topic, byte[] data) { 
     // Deserialize the serialized object. 
     return kryo.readClassAndObject(new Input(new ByteArrayInputStream(data))); 
    } 

    @Override 
    public void close() { 

    } 

} 

後者會工作,但它感覺有點髒。

任何建議表示讚賞!

回答

3

您可以直接傳遞Deserializer情況下進入消費者使用你原來的做法:

KafkaConsumer<String, Foo> consumer = new KafkaConsumer<>(properties, 
    new StringDeserializer(), new KryoPOJODeserializer(Foo.class)); 

如果要重用一些話題同傳入的數據類型,那麼你可以建立一個訂閱者使用單個消費者的主題。如果您需要不同的對象類型的值,那麼您將需要使用多個使用者。

否則你的第二種方法也是有效的。

+0

因此,我猜想有(最壞的情況下)像我擁有卡夫卡話題一樣多的'KafkaConsumer'消費者是可以接受的?我有點擔心Kafka消費者相當重量級,類似於Hibernate SessionFactory。如果那不會讓生活變得更容易! – Matt

+1

是的,這是可以接受的 - 消費者(不管消息傳遞技術)通常被編寫來處理一種特定的有效載荷類型,但總是有例外。至於卡夫卡消費者是否「沉重」,取決於你的意思。消費者將維護與正在消費消息的所有經紀人的TCP連接。除此之外,沒有線程池在後臺執行任務。 –