2016-12-14 78 views
1

我正在嘗試Ignite和Kafka集成,將kafka消息帶入Ignite高速緩存。Ignite和Kafka集成

我的消息密鑰是一個隨機字符串(要使用Ignite工作,卡夫卡消息密鑰不能爲空),並且該值是用於人(java類)

當點燃接收JSON字符串表示這樣的消息,看起來Ignite將使用消息的密鑰(在我的情況下是隨機字符串)作爲緩存密鑰。

是否可以將消息密鑰更改爲該人員的ID,以便我可以將其放入緩存中。

看起來是streamer.receiver(new StreamReceiver)是可行的

 streamer.receiver(new StreamReceiver<String, String>() { 
      public void receive(IgniteCache<String, String> cache, Collection<Map.Entry<String, String>> entries) throws IgniteException { 
       for (Map.Entry<String, String> entry : entries) { 
        Person p = fromJson(entry.getValue()); 
        //ignore the message key,and use person id as the cache key 
        cache.put(p.getId(), p); 
       } 
      } 
     }); 

這是推薦的方法是什麼?我不確定在StreamReceiver中調用cache.put是否是一種正確的方法,因爲它只是寫入緩存之前的預處理步驟。

回答

2

數據流化器會將您的所有密鑰映射到高速緩存親和節點,創建批次條目並將批次發送到親和節點。在它StreamReceiver將收到您的條目後,獲得Person的ID並調用cache.put(K, V)。將條目引導爲將您的密鑰映射到相應的緩存關聯節點並將更新請求發送到此節點。

一切看起來不錯。但是從Kafka映射你的隨機密鑰的結果和映射Person的ID的結果將是不同的(最可能是不同的節點)。因此,由於冗餘網絡跳數,您的性能會變差。

不幸的是,當前的KafkaStreamer實現不支持流元組提取器(請參閱,例如StreamSingleTupleExtractor類)。但是您可以使用現有的卡夫卡流式傳輸實例輕鬆創建自己的卡夫卡流式傳輸實現。

您也可以嘗試使用KafkaStreamerkeyDecodervalDecoder以便從Kafka消息中提取Person的ID。我不確定,但它可以幫助。

+0

感謝@a_gura的幫助! – Tom