2017-09-20 184 views
2

我正試圖將來自Kafka avro主題的數據吸收到事先創建的現有Oracle數據庫表中。 我以分佈模式運行Kafka Connect(3名工人)。 當我通過REST提交新連接器時,它會創建一個連接器,一項新任務,但任務立即失敗。 不明白爲什麼?以下是任務錯誤和我的配置。Kafka將JDBC接收器連接到Oracle任務失敗

任務錯誤

{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. 
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457) 
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251) 
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180) 
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) 
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) 
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) 
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
at java.util.concurrent.FutureTask.run(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
at java.lang.Thread.run(Unknown Source)","id":0,"worker_id":"kafka_host02:8083"} 

卡夫卡連接配置

bootstrap.servers=kafka_host01:9092,kafka_host02:9092,kafka_host03:9092 
group.id=connect-cluster-00 
key.converter=io.confluent.connect.avro.AvroConverter 
key.converter.schema.registry.url=http://kafka_host01:8081 
value.converter=io.confluent.connect.avro.AvroConverter 
value.converter.schema.registry.url=http://kafka_host01:8081 
key.converter.schemas.enable=true 
value.converter.schemas.enable=true 
internal.key.converter=org.apache.kafka.connect.json.JsonConverter 
internal.value.converter=org.apache.kafka.connect.json.JsonConverter 
internal.key.converter.schemas.enable=false 
internal.value.converter.schemas.enable=false 
offset.storage.topic=connect-offsets 
offset.storage.replication.factor=3 
config.storage.topic=connect-configs 
config.storage.replication.factor=3 
status.storage.topic=connect-status 
status.storage.replication.factor=3 
offset.flush.interval.ms=10000 
rest.port=8083 

連接器提交命令

curl -X POST -H "Content-Type: application/json" --data '{"name": "heat-predict-ora-sink", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"MY-TEST-02_topic", "connection.user":"scott", "connection.password":"tiger", "connection.url":"jdbc:oracle:thin:@orahost.localdomain:1521:orcl", "key.converter":"io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url":"http://localhost:8081", "value.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url":"http://localhost:8081", "insert.mode":"insert", "batch.size":"0", "table.name.format":"TEST.MY_TABLE_IN", "pk.mode":"none", "pk.fields":"none" }}' http://localhost:8083/connectors 

請告訴我任何想法,爲什麼它表現如此?

+0

連接日誌中是否有其他錯誤?如果以分佈式模式將其作爲單個工作人員運行,是否可行? –

+0

我開始連接 - 分佈在一個工作人員,發現根本問題是它找不到ojdbc8.jar,我把它放到confluent/share/java/kafka-connect-jdbc文件夾中,它有幫助。 –

+0

但還有一個問題 - 現在我看到連接日誌中有關SID的錯誤。從字面上看,它說我 - ORA-12505,TNS:偵聽器目前不知道連接描述符中給出的SID。但是我在主機上獲得了tnsnames.ora,並且可以使用sqlplus實用程序連接到SID。可能是我應該在某處複製tnsnames.ora? –

回答

0

終於成功地寫入RDBMS架構的用戶,當我讓表是由Kafka Connect用戶在其自己的模式中自動創建。奇怪的是,新表中的字段名稱是用引號引起來的,但它不是一個限制。感謝所有試圖幫助解答的人!