0

我們正在測試分佈式模式下的kafka連接以將主題記錄從kafka拉到HDFS。我們有兩個盒子。其中一個kafka和zookeeper守護進程正在運行。我們在這個盒子裏保存了一個kafka連接實例。我們有另一個存在HDFS namenode的地方。我們在這裏連接了另一個kafka實例。在分佈式模式下運行kafka連接時的問題

我們開始kafka,動物園管理員和kafka連接在第一個框中。我們也開始在第二個盒子中連接kafka。現在根據合併文檔,我們必須使用REST API啓動HDFS連接器(或者其他任何連接器)。因此,在這兩個框中啓動kafka連接後,我們嘗試通過REST API啓動連接器。我們嘗試下面的命令: -

curl -X POST -H "HTTP/1.1 Host: ip-10-16-34-57.ec2.internal:9092 Content-Type: application/json Accept: application/json" --data '{"name": "hdfs-sink", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "format.class":"com.qubole.streamx.SourceFormat", "tasks.max":"1", "hdfs.url":"hdfs://ip-10-16-37-124:9000", "topics":"Prd_IN_TripAnalysis,Prd_IN_Alerts,Prd_IN_GeneralEvents", "partitioner.class":"io.confluent.connect.hdfs.partitioner.DailyPartitioner", "locale":"", "timezone":"Asia/Calcutta" }}' http://ip-10-16-34-57.ec2.internal:8083/connectors 

只要我們按下進入這裏,我們得到如下回應:

<html> 
    <head> 
    <meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/> 
    <title>Error 415 </title> 
    </head> 
    <body> 
    <h2>HTTP ERROR: 415</h2> 
    <p>Problem accessing /connectors. Reason: 
    <pre> Unsupported Media Type</pre></p> 
    <hr /><i><small>Powered by Jetty://</small></i> 
    </body> 
    </html> 

在等的connect-distributed.properties文件/卡夫卡/下面是在兩個kafka連接節點。我們也創建了上述三個主題(連接偏移量,連接配置,連接狀態)

bootstrap.servers=ip-10-16-34-57.ec2.internal:9092 
group.id=connect-cluster 
key.converter=com.qubole.streamx.ByteArrayConverter 
value.converter=com.qubole.streamx.ByteArrayConverter 
enable.auto.commit=true 
auto.commit.interval.ms=1000 
offset.flush.interval.ms=1000 
key.converter.schemas.enable=true 
value.converter.schemas.enable=true 
internal.key.converter=org.apache.kafka.connect.json.JsonConverter 
internal.value.converter=org.apache.kafka.connect.json.JsonConverter 
internal.key.converter.schemas.enable=false 
internal.value.converter.schemas.enable=false 
offset.storage.topic=connect-offsets 
rest.port=8083 
config.storage.topic=connect-configs 
status.storage.topic=connect-status 
offset.flush.interval.ms=10000 

這裏有什麼問題?我們是否缺少啓動kafka以分佈模式連接以使用HDFS連接器的東西? kafka連接在獨立模式下工作正常。

回答

相關問題