2013-09-26 98 views
5

我正在使用Kafka 0.8 beta,我只是試圖發送不同的對象,使用我自己的編碼器對它們進行序列化,並將它們發送到現有的代理配置。現在我正試圖讓DefaultEncoder工作。Apache Kafka默認編碼器不工作

我有代理和一切設置和工作的StringEncoder,但我無法得到任何其他數據類型,包括只是純粹的字節[],由經紀人發送和接收。

我給生產者代碼是:

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

import java.util.Date; 
import java.util.Properties; 
import java.util.Random; 


public class ProducerTest { 
    public static void main(String[] args) { 
     long events = 5; 
     Random rnd = new Random(); 
     rnd.setSeed(new Date().getTime()); 
     Properties props = new Properties(); 
     props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094"); 
     props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); 
     props.setProperty("partitioner.class", "example.producer.SimplePartitioner"); 
     props.setProperty("request.required.acks", "1"); 
     props.setProperty("producer.type", "async"); 
     props.setProperty("batch.num.messages", "4"); 

     ProducerConfig config = new ProducerConfig(props); 
     Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config); 
     for (long nEvents = 0; nEvents < events; nEvents++) { 
      byte[] a = "Hello".getBytes(); 
      byte[] b = "There".getBytes(); 

      KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b); 
      producer.send(data); 
     } 
     try { 
      Thread.sleep(5000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     producer.close(); 
    } 
} 

我用同樣SimplePartitioner如在給定的here的例子,並且通過琴絃更換所有的字節數組和改變串行到kafka.serializer.StringEncoder工作完全。

僅供參考,SimplePartitioner:

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner<String> { 
    public SimplePartitioner (VerifiableProperties props) { 

    } 

    public int partition(String key, int a_numPartitions) { 
     int partition = 0; 
     int offset = key.lastIndexOf('.'); 
     if (offset > 0) { 
      partition = Integer.parseInt(key.substring(offset+1)) % a_numPartitions; 
     } 
     return partition; 
    } 

} 

我在做什麼錯?

回答

6

答案是分區類SimplePartitioner僅適用於字符串。當我嘗試異步運行生產者時,它會創建一個單獨的線程來處理編碼和分區,然後再發送給代理。當它認識到SimplePartitioner僅適用於字符串時,該線程遇到了障礙,但由於它是一個單獨的線程,因此不會引發異常,因此線程只是退出而沒有任何錯誤指示。

如果我們改變SimplePartitioner接受的byte [],例如:

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner<byte[]> { 
    public SimplePartitioner (VerifiableProperties props) { 

    } 

    public int partition(byte[] key, int a_numPartitions) { 
     int partition = 0; 
     return partition; 
    } 

} 

現在這完美的作品。

+0

您應該堅持爲partitioner.class屬性使用kafka.producer.DefaultPartitioner的默認值,而不是對分區器的特定返回值進行硬編碼。 – gazarsgo

+0

這是爲了試駕。但是,以下是默認分區程序不起作用的情況:假設您希望消息的特定子序列嚴格按其生成的順序使用。如果你使用默認分區程序,這會失敗,因爲默認情況下會使用密鑰的散列,這是不可預知的。相反,如果您編寫自己的定製分區程序,並且有一些方法可以檢測子序列,那麼我們可以將它們分配給同一個分區。這個確切的用例發生在我的應用程序中。 –