2017-10-21 104 views
0

我正在嘗試使用s predfast s3 connector創建一個Kafka接收器連接器。然而,出於某種原因,日誌輸出報告的是SourceConnectorConfig:爲什麼SourceConnectorConfig報告接收器連接器?

INFO ConnectorConfig values: 
     connector.class = com.spredfast.kafka.connect.s3.sink.S3SinkConnector 
     key.converter = null 
     name = transactions-s3-sink 
     tasks.max = 1 
     transforms = null 
     value.converter = class org.apache.kafka.connect.storage.StringConverter 
(org.apache.kafka.connect.runtime.ConnectorConfig:180) 
INFO Creating connector transactions-s3-sink of type com.spredfast.kafka.connect.s3.sink.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:178) 
INFO Instantiated connector transactions-s3-sink with version 0.0.1 of type class com.spredfast.kafka.connect.s3.sink.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:181) 
INFO Finished creating connector transactions-s3-sink (org.apache.kafka.connect.runtime.Worker:194) 
INFO SourceConnectorConfig values: 
     connector.class = com.spredfast.kafka.connect.s3.sink.S3SinkConnector 
     key.converter = null 
     name = transactions-s3-sink 
     tasks.max = 1 
     transforms = null 
     value.converter = class org.apache.kafka.connect.storage.StringConverter 
(org.apache.kafka.connect.runtime.SourceConnectorConfig:180) 
INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:824) 
... 
INFO Sink task WorkerSinkTask{id=transactions-s3-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:232) 

爲什麼一個SinkConnectorConfig報道還進一步在日誌輸出我可以看到一個WorkerSinkTask被創造?

回答

1

原因是此連接器從Connect的API(請參閱源代碼here)擴展Connector抽象類而不是SinkConnector抽象類。

因此,Connect框架無法確定此連接器是源還是接收器,並且當前代碼中的邏輯是,如果它不是接收器,則認爲它是源。這就是你遇到這種不一致的原因。

解決方案是爲連接器擴展適當的抽象類(此處爲org.apache.kafka.connect.sink.SinkConnector

相關問題