0

的卡夫卡連接API如何設置max.poll.records我使用匯合3.0.1平臺,建設卡夫卡Elasticsearch連接器。爲此,我正在擴展SinkConnector和SinkTask(Kafka連接API)以獲取來自Kafka的數據。匯合平臺

如此代碼我正在擴大SinkConnector的taskConfigs方法返回「max.poll.records」在同一時間獲取只有100條記錄的一部分。但它不能正常工作,我正在同時獲取所有記錄,而且我沒有在規定的時間內進行補償。請任何一個可以幫我配置「max.poll.records」

public List<Map<String, String>> taskConfigs(int maxTasks) { 
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>(); 
    for (int i = 0; i < maxTasks; i++) { 
     Map<String, String> config = new HashMap<String, String>(); 
     config.put(ConfigurationConstants.CLUSTER_NAME, clusterName); 
     config.put(ConfigurationConstants.HOSTS, hosts); 
     config.put(ConfigurationConstants.BULK_SIZE, bulkSize); 
     config.put(ConfigurationConstants.IDS, elasticSearchIds); 
     config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics); 
     config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish); 
     config.put(ConfigurationConstants.TYPES, elasticSearchTypes); 
     config.put("max.poll.records", "100"); 

     configs.add(config); 
    } 
    return configs; 
    } 
+0

BTW,Confluent 3.1(今天發佈)包括一個Elasticsearch接收器連接器,以防萬一您的需要。 http://docs.confluent.io/3.1.0/connect/connect-elasticsearch/docs/index.html – shikhar

回答

3

不能覆蓋大部分消費者卡夫卡一樣CONFIGS在連接器配置max.poll.records。您可以在連接工作人員配置中這樣做,但前綴爲consumer.

+0

我創建了一個worker.properties並在屬性文件中提供了上述屬性,並在命令下運行。 SH ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/worker.properties ./etc/kafka-connect-elasticsearch/connector.properties> connectorlogs.log 但得到異常。 org.apache.kafka.common.config.ConfigException:缺少必需的配置「connector.class」,它沒有默認值。 – Renukaradhya

+0

我的worker.properties包含「group.id」= operative1。 「operative1.max.poll.records」= 1000,我的connector.properties包含正確的「connector.class」,但仍然出現此錯誤。 – Renukaradhya

+0

你需要'consumer.max.poll.records = 1000'在工作人員配置 – shikhar

0

有人解決。我在connect-avro-standalone.properties中添加了以下配置

group.id=mygroup 
consumer.max.poll.records=1000 

並運行下面的命令運行我的連接器。

sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/connect-elasticsearch-sink.properties