2016-08-16 29 views
1

我在我的服務器機器上運行單節點kafka。 我使用以下命令來創建主題「bin/kafka-topics.sh --create --zookeeper localhost:2181 - 複製因子1 - 部分1 - 主題測試」。 我有兩個logstash實例正在運行。第一個從一些Java應用程序日誌文件讀取數據將同樣注入到kafka中。 它工作正常,我可以在控制檯上使用「bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning」命令查看數據。但是另一個從kafka(相同主題「測試」)讀入並注入elasicsearch的logstash實例失敗了。 Logstash的第二個實例無法讀取數據。我改變了它的配置文件從卡夫卡到閱讀和打印在控制檯上,那麼也不會輸出anything.Here是未能logstash配置文件:Logsatsh和kafka

// config file 
    input { 
     kafka { 
     zk_connect => "localhost:2181" 
     topic_id => "test" 
     } 
     } 
     output { 
     stdout{} 
     } 

Logstash沒有打印任何東西,也不拋出任何錯誤。 我正在使用Logstash 2.4和kafka 0.10。 我用卡夫卡快速入門指南(http://kafka.apache.org/documentation.html#quickstart

+0

你確定你有一個Zookeeper實例在localhost上運行嗎? – Val

回答

1

如果你看卡夫卡輸入plugin configuration,你可以看到一個重要的參數,它允許連接到卡夫卡集​​羣:zk_connect

根據文檔,它默認設置爲localhost:2181。確保將它設置爲Kafka集羣實例,或者理想情況下爲多個實例,具體取決於您的設置。

例如,假設您要連接到帶有JSON主題的三節點Kafka集羣。配置如下:

kafka { 
topic_id => "your_topic" 
zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181" 
} 

此外,重要的是配置主題的正確編解碼器。上面的例子將使用JSON事件。如果您使用Avro,則需要設置另一個參數 - codec。有關如何配置的詳細信息,請參閱on the documentation page。它基本上需要指向Avro模式文件,這可以作爲avsc文件或模式註冊表端點(在我看來更好的解決方案)給出。

如果您的架構註冊表在您的Kafka環境中運行,您可以將編解碼器指向它的url。一個完整的例子是:

kafka { 
codec => avro_schema_registry { endpoint => "http://kc1.host:8081"} 
topic_id => "your_topic" 
zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181" 
} 

希望它的作品!

+0

嗨wjp,我正在運行單節點kafka集羣,kafka中的數據是一些xmls。沒有Schema Registry正在運行。我檢查了zookeeper,它也在運行。 –

1
@wjp 
Hi wjp, I am running single node kafka cluster. There is no Schema Registry running. zookeeper is also running. I used following command to create topic "bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test". I have two logstash instances running. First one reads data from some java application log file inject the same to the kafka. It works fine, I can see data in kafka on console using "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning" command. But the other logstash instance which reads from kafka(same topic "test") and injects into elasicsearch, is failing. This second instance of logstash fails to read data. I changed its configuration file to read from kafka and print on console, then also it does not output anything.Here is the config file for failing logstash: 
input { 
kafka { 
zk_connect => "localhost:2181" 
topic_id => "test" 
} 
} 
output { 
stdout{} 
} 
Logstash neither print anything nor it throws any error. 
I am using Logstash 2.4 and kafka 0.10. 
I used kafka quick start guide (http://kafka.apache.org/documentation.html#quickstart)