使用獨立模式創建一個這樣的連接器和用戶自己定製的轉型:如何創建用於創建具有轉換的分佈式Kafka Connect實例的json?
name=rabbitmq-source
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
rabbitmq.host=rabbitmq-server
rabbitmq.queue=answers
kafka.topic=net.gutefrage.answers
transforms=extractFields
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value
transforms.extractFields.fields=body,envelope.routingKey
transforms.extractFields.structName=net.gutefrage.events
但是,對於一個分佈式連接器什麼是PUT請求的連接REST API的語法?我在文檔中找不到任何示例。
已經嘗試過像情侶的事情:
cat <<EOF >/tmp/connector
{
"name": "rabbitmq-source",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields": {
"type": "net.gutefrage.connector.transforms.ExtractFields$Value",
"fields": "body,envelope.routingKey",
"structName": "net.gutefrage.events"
}
}
}
EOF
curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"
rm /tmp/connector
,或者也並沒有工作:
{
"name": "rabbitmq-source",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max": "1",
"rabbitmq.host": "rabbitmq-server",
"rabbitmq.queue": "answers",
"kafka.topic": "net.gutefrage.answers",
"transforms": "extractFields",
"transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value",
"transforms.extractFields.fields": "body,envelope.routingKey",
"transforms.extractFields.structName": "net.gutefrage.events"
}
}
在過去的變種我得到以下錯誤:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}
請注意,使用屬性格式,它可以很好地工作(在快速數據開發中使用Landoops創建新的連接器UI。有趣的是,Landoop的UI功能「轉化爲捲曲」產生非常相同的JSON作爲我的第二個例子)
更新
可以肯定這不是一個問題,Landoop,碼頭工人和我的自定義轉換,我從COP 3.3.0
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
它記錄 [2017-09-13 14:07:52,930] INFO Loading plugin from: /opt/connectors/confluent-oss-gf-assembly-1.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176) [2017-09-13 14:07:53,711] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/connectors/confluent-oss-gf-assembly-1.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199) [2017-09-13 14:07:53,711] INFO Added plugin 'com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132) [2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132) [2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
開始飼養員,經紀人模式的註冊表和卡夫卡Connect在分佈模式與標準的分佈式特性 目前爲止都很好。然後,我創建了一個連接器的配置:
cat <<EOF >/tmp/connector
請注意,我現在使用標準(捆綁)提取物領域轉變。 當我張貼與
{ "name": "rabbitmq-source", "config": { "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector", "tasks.max": "1", "rabbitmq.host": "rabbitmq-server", "rabbitmq.queue": "answers", "kafka.topic": "net.gutefrage.answers", "transforms": "extractFields", "transforms.extractFields.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.extractFields.field": "body" } } EOF curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors"
我得到同樣的
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class org.apache.kafka.connect.transforms.ExtractField for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}*
「沒有工作」 - >你有錯誤嗎?錯誤是什麼? –
@RobinMoffatt爲第二個示例添加了錯誤消息,我認爲這應該是正常工作的示例。再次檢查您的博客和第二個變種看起來不錯。 – longliveenduro