2016-09-27 22 views
0

Elasticsearch version 2.4.0: got java.lang.IllegalArgumentException: The number of object passed must be even but was [1]

Log:

java.lang.IllegalArgumentException: The number of object passed must be even but was [1] 
     at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:451) 
     at org.elasticsearch.action.index.IndexRequestBuilder.setSource(IndexRequestBuilder.java:186) 
     at org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:138) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 

Code:

// This method will put the SinkRecords which are sent in bulk to Elastic Search with proper index and type.
public void put(Collection<SinkRecord> sinkRecords) {
    try {
        // Gets a list of SinkRecord from Kafka broker.
        List<SinkRecord> records = new ArrayList<SinkRecord>(sinkRecords);
        for (int i = 0; i < records.size(); i++) {
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            // Looping through the SinkRecords and the size should be less than bulksize.
            for (int j = 0; j < bulkSize && i < records.size(); j++, i++) {
                SinkRecord record = records.get(i);
                // Index and type is hardcoded and record.value() contains the Json message.
                bulkRequest.add(client.prepareIndex("operative1", "test").setSource(record.value()));
            }
            i--;
            // Executing bulk requests.
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        }
    } catch (Exception e) {
    }
}

The input is: { "id1": "file", "value1": "File" }

Please help me resolve this issue.

+0

I even tried indexing a single record with the code below and ran into the same problem: `IndexResponse response = client.prepareIndex("operative1", "test").setSource(record.value()).execute().actionGet();` – Renukaradhya

+1

The `put` method in [ElasticsearchSinkTask](https://github.com/DataReply/kafka-connect-elastic-search-sink/blob/master/src/main/java/org/apache/kafka/connect/elasticsearchschema/ElasticsearchSinkTask.java#L119) passes a Map (that is, key/value pairs) to `setSource(...)`. Maybe you could take a look there. – Kammeyer

+0

Or you could take a look at: [Elastic search number of object passed must be even](https://stackoverflow.com/questions/39187097/elastic-search-number-of-object-passed-must-be-even?rq=1) – Kammeyer
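For context on why the comments above point at a Map: the stack trace shows the call ending up in a `setSource` overload that expects field/value pairs, which is what Java picks when the argument's static type is `Object` (as with `record.value()`). The sketch below reproduces that overload resolution with plain Java. `OverloadDemo` and its `source(...)` methods are hypothetical stand-ins written for illustration, not the Elasticsearch classes; only the error message text is taken from the log.

```java
import java.util.HashMap;
import java.util.Map;

public class OverloadDemo {
    // Hypothetical stand-ins for a builder with several source(...) overloads.
    // These are NOT the Elasticsearch methods; they only mirror the even-count check.
    public static String source(String json) {
        return "string overload";
    }

    public static String source(Map<String, Object> fields) {
        return "map overload";
    }

    public static String source(Object... keyValuePairs) {
        if (keyValuePairs.length % 2 != 0) {
            throw new IllegalArgumentException(
                    "The number of object passed must be even but was [" + keyValuePairs.length + "]");
        }
        return "varargs overload";
    }

    public static void main(String[] args) {
        // Like SinkRecord.value(), this reference has the static type Object.
        Object value = "{ \"id1\": \"file\", \"value1\": \"File\" }";

        try {
            // Resolved at compile time to source(Object...) with one element.
            source(value);
        } catch (IllegalArgumentException e) {
            System.out.println("Object reference: " + e.getMessage());
        }

        // Casting changes the static type, so the String overload is chosen.
        System.out.println("Cast to String: " + source((String) value));

        // A Map argument selects the Map overload.
        Map<String, Object> map = new HashMap<>();
        map.put("id1", "file");
        map.put("value1", "File");
        System.out.println("Map argument: " + source(map));
    }
}
```

If the same resolution rules apply to the real builder, either casting the value to `String` or converting it to a `Map` first should avoid the key/value-pair overload. Which one is preferable depends on what the connector actually delivers in `record.value()`.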

回答

0

The final code looks like this. It parses the JSON string into a Map and passes that Map to `setSource(...)`.

// This method puts the SinkRecords, sent in bulk, into Elasticsearch with the proper index and type.
public void put(Collection<SinkRecord> sinkRecords) {
    try {
        // Jackson mapper used to parse each JSON message into a Map.
        ObjectMapper mapper = new ObjectMapper();
        // Copy the SinkRecords handed over by Kafka Connect into a list.
        List<SinkRecord> records = new ArrayList<SinkRecord>(sinkRecords);
        for (int i = 0; i < records.size(); i++) {
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            // Add at most bulkSize records to each bulk request.
            for (int j = 0; j < bulkSize && i < records.size(); j++, i++) {
                SinkRecord record = records.get(i);
                // Index and type are hardcoded; record.value() holds the JSON message as a String.
                Map<String, Object> map = mapper.readValue((String) record.value(),
                        new TypeReference<Map<String, Object>>() {});
                bulkRequest.add(client.prepareIndex("operative1", "test").setSource(map));
            }
            // Undo the extra increment so the outer loop's i++ lands on the next unprocessed record.
            i--;
            // Execute the bulk request.
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        }
    } catch (Exception e) {
        // The exception is silently swallowed here; consider logging or rethrowing it.
    }
}
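
As a side note, the nested loop with `i--` in the code above can also be written by slicing the record list into batches first and building one bulk request per batch. The following is a minimal sketch of that idea using only the standard library. `BatchDemo` and `partition` are hypothetical names introduced here, and integers stand in for the SinkRecords.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchDemo {
    // Splits a list into consecutive batches of at most batchSize elements.
    public static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the subList view so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            records.add(i);
        }
        // Each inner list would correspond to one bulk request.
        for (List<Integer> batch : partition(records, 3)) {
            System.out.println(batch);
        }
    }
}
```

With seven records and a batch size of three, this yields batches of three, three, and one record, and an empty input yields no batches at all, so no empty bulk request would be sent.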