2017-09-08 85 views
1

使用獨立模式創建一個這樣的連接器和用戶自己定製的轉型:如何創建用於創建具有轉換的分佈式Kafka Connect實例的json?

name=rabbitmq-source 
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector 
tasks.max=1 
rabbitmq.host=rabbitmq-server 
rabbitmq.queue=answers 
kafka.topic=net.gutefrage.answers 
transforms=extractFields 
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value 
transforms.extractFields.fields=body,envelope.routingKey 
transforms.extractFields.structName=net.gutefrage.events 

但是,對於一個分佈式連接器什麼是PUT請求的連接REST API的語法?我在文檔中找不到任何示例。

已經嘗試過像情侶的事情:

cat <<EOF >/tmp/connector 
{ 
    "name": "rabbitmq-source", 
    "config": { 
    "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector", 
    "tasks.max": "1", 
    "rabbitmq.host": "rabbitmq-server", 
    "rabbitmq.queue": "answers", 
    "kafka.topic": "net.gutefrage.answers", 
    "transforms": "extractFields", 
    "transforms.extractFields": { 
     "type": "net.gutefrage.connector.transforms.ExtractFields$Value", 
     "fields": "body,envelope.routingKey", 
     "structName": "net.gutefrage.events" 
    } 
    } 
} 
EOF 

curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors" 
rm /tmp/connector 

,或者也並沒有工作:

{ 
    "name": "rabbitmq-source", 
    "config": { 
    "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector", 
    "tasks.max": "1", 
    "rabbitmq.host": "rabbitmq-server", 
    "rabbitmq.queue": "answers", 
    "kafka.topic": "net.gutefrage.answers", 
    "transforms": "extractFields", 
    "transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value", 
    "transforms.extractFields.fields": "body,envelope.routingKey", 
    "transforms.extractFields.structName": "net.gutefrage.events" 
    } 
} 

在過去的變種我得到以下錯誤:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"} 

請注意,使用屬性格式,它可以很好地工作(在快速數據開發中使用Landoops創建新的連接器UI。有趣的是,Landoop的UI功能「轉化爲捲曲」產生非常相同的JSON作爲我的第二個例子)

更新

可以肯定這不是一個問題,Landoop,碼頭工人和我的自定義轉換,我從COP 3.3.0

bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

它記錄 [2017-09-13 14:07:52,930] INFO Loading plugin from: /opt/connectors/confluent-oss-gf-assembly-1.0.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176) [2017-09-13 14:07:53,711] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/connectors/confluent-oss-gf-assembly-1.0.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199) [2017-09-13 14:07:53,711] INFO Added plugin 'com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132) [2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132) [2017-09-13 14:07:53,712] INFO Added plugin 'net.gutefrage.connector.transforms.ExtractFields$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132) 開始飼養員,經紀人模式的註冊表和卡夫卡Connect在分佈模式與標準的分佈式特性 目前爲止都很好。然後,我創建了一個連接器的配置:

cat <<EOF >/tmp/connector
{ "name": "rabbitmq-source", "config": { "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector", "tasks.max": "1", "rabbitmq.host": "rabbitmq-server", "rabbitmq.queue": "answers", "kafka.topic": "net.gutefrage.answers", "transforms": "extractFields", "transforms.extractFields.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.extractFields.field": "body" } } EOF
請注意,我現在使用標準(捆綁)提取物領域轉變。 當我張貼與curl -vs --stderr - -X POST -H "Content-Type: application/json" --data @/tmp/connector "http://localhost:8083/connectors" 我得到同樣的

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class org.apache.kafka.connect.transforms.ExtractField for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}*

+0

「沒有工作」 - >你有錯誤嗎?錯誤是什麼? –

+0

@RobinMoffatt爲第二個示例添加了錯誤消息,我認爲這應該是正常工作的示例。再次檢查您的博客和第二個變種看起來不錯。 – longliveenduro

回答

1

確保$ Value in transforms.extractFields.type = net.gutefrage.connector.transforms.ExtractFields $ Value不會被bash命令cat解釋爲變量。它爲我工作。

0

對於使用連接器配置的JSON格式和CP連接CLI中,JQ工具必須安裝在機器上,其中卡夫卡連接集羣在跑。

E.g.對於Landoops快速數據的開發環境,你必須

docker exec rabbitmqconnect_fast-data-dev_1 apk add --no-cache jq 

那麼這將工作:

docker exec rabbitmqconnect_fast-data-dev_1 /opt/confluent-3.3.0/bin/confluent config rabbitmq-source -d /tmp/connector-config.json 

使用連接器REST端點時,雖然這不會解決問題。

1

如果你想在獨立模式下運行卡夫卡連接的工人,則必須啓動工作並提供員工配置文件一個或多個連接器配置文件。所有這些配置文件是Java屬性格式,所以你提供的第一配置示例是正確的格式:

name=rabbitmq-source 
connect.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector 
tasks.max=1 
rabbitmq.host=rabbitmq-server 
rabbitmq.queue=answers 
kafka.topic=net.gutefrage.answers 
transforms=extractFields 
transforms.extractFields.type=net.gutefrage.connector.transforms.ExtractFields$Value 
transforms.extractFields.fields=body,envelope.routingKey 
transforms.extractFields.structName=net.gutefrage.events 

如果你想運行在卡夫卡連接工作者分佈模式,那麼你將不得不首先啓動分佈式工作器,然後使用the REST APIPUT請求以及帶有JSON文檔的/connectors端點,作爲第二步創建連接器。這JSON文件將匹配你是第二個JSON文件格式:

{ 
    "name": "rabbitmq-source", 
    "config": { 
    "connector.class": "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector", 
    "tasks.max": "1", 
    "rabbitmq.host": "rabbitmq-server", 
    "rabbitmq.queue": "answers", 
    "kafka.topic": "net.gutefrage.answers", 
    "transforms": "extractFields", 
    "transforms.extractFields.type": "net.gutefrage.connector.transforms.ExtractFields$Value", 
    "transforms.extractFields.fields": "body,envelope.routingKey", 
    "transforms.extractFields.structName": "net.gutefrage.events" 
    } 
} 

匯合CLI,包括匯合的,包括卡夫卡開源平臺,是一個開發工具,以幫助通過運行一個動物園管理員可以快速上手實例,Kafka代理,Confluent模式註冊表,REST代理和分佈式模式下的Connect Worker。加載連接器時,可以將連接器配置指定爲JSON文件或屬性文件,然後使用jq將後者轉換爲JSON格式。

但是,報告的錯誤是:

{ 
    "error_code":400, 
    "message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value class net.gutefrage.connector.transforms.ExtractFields for configuration transforms.extractFields.type: Error getting config definition from Transformation: null\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`" 
} 

此錯誤消息的重要組成部分,是「從改造錯誤獲取配置定義:空」。雖然這有點太神祕,但這意味着net.gutefrage.connector.transforms.ExtractFields Java類的config()方法返回null。

確保您指定的net.gutefrage.connector.transforms.ExtractFields$Value字符串是嵌套靜態類Value正確的完全合格的名稱,該Value類全面,正確實施org.apache.kafka.connect.transforms.Transformation<? extends ConnectRecord<R>>接口。請注意,config()方法必須返回一個非空ConfigDef對象。

查看與Apache Kafka一起發貨的Single Message Transform(SMT)的this example,或者查看其他示例的Robin's blog post

+0

感謝您的長時間答覆。我的問題不是很具體,但是對於JSON連接器格式,我使用的是分佈式模式。我在文中作了澄清。 其次,你提到PUT,但它必須是Landoop的POST。 第三,您忽略了我的自定義轉換已經在屬性格式中工作,無論是在獨立模式下還是在分佈式模式下的工作人員本身。所以我會爭辯說我的自定義轉換是有效的? – longliveenduro

+0

在這裏你可以看看:https://gist.github.com/breadfan/10aa3baeea15fd577b3bfac2f1494e2d。我知道這是不完整的,因爲它只允許遍歷結構,但這是正在進行的工作... – longliveenduro

+0

有趣的是,我得到了與標準相同的錯誤 {name}:「rabbitmq-source」, 「config」:{ 「connector.class」:「com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector」, 「tasks.max」:「1」, 「rabbitmq.host」:「rabbitmq-server 「, 」rabbitmq.queue「:」answers「, 」kafka.topic「:」net.gutefrage.answers「, 」transforms「:」extractFields「, 」transforms.extractFields.type「:」org.apache .kafka.connect.transforms.ExtractField $ Value「, 」transforms.extractFields.field「:」body「 } } – longliveenduro

0

隨着fast-data-dev,您可以建立任何連接器的JAR文件,然後就在

https://github.com/Landoop/fast-data-dev#enable-additional-connectors

的UI添加它與指令的類路徑會自動檢測新的連接器 - 併爲您提供說明當你點擊NEW新連接器在:

http://localhost:3030/kafka-connect-ui

什麼也將是值得嘗試 - 爲fast-data-dev與通用已經來到MQTT接收器連接器,正在嘗試。詳見說明書在http://docs.datamountaineer.com/en/latest/mqtt-sink.html

您將有效地需要做 connect.mqtt.kcql=INSERT INTO /answers SELECT body FROM net.gutefrage.answers

由於這是一個通用的MQTT連接器 - 可能你需要使用enable-additional-connectors說明

添加的RabbitMQ客戶端庫
+0

謝謝,但這與我的問題的本質無關:配置轉換的問題分佈式連接器 – longliveenduro