的卡夫卡連接API如何設置max.poll.records我使用匯合3.0.1平臺,建設卡夫卡Elasticsearch連接器。爲此,我正在擴展SinkConnector和SinkTask(Kafka連接API)以獲取來自Kafka的數據。匯合平臺
如此代碼我正在擴大SinkConnector的taskConfigs方法返回「max.poll.records」在同一時間獲取只有100條記錄的一部分。但它不能正常工作,我正在同時獲取所有記錄,而且我沒有在規定的時間內進行補償。請任何一個可以幫我配置「max.poll.records」
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<String, String>();
config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
config.put(ConfigurationConstants.HOSTS, hosts);
config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
config.put(ConfigurationConstants.IDS, elasticSearchIds);
config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
config.put("max.poll.records", "100");
configs.add(config);
}
return configs;
}
BTW,Confluent 3.1(今天發佈)包括一個Elasticsearch接收器連接器,以防萬一您的需要。 http://docs.confluent.io/3.1.0/connect/connect-elasticsearch/docs/index.html – shikhar