使用以下代碼,我發送Elasticsearch文件進行索引。我試圖將基本對象轉換爲JSON並通過生產者發送。然而,每一個消息(如從控制檯選中)追加jibberish字符,如 - 噸{ 「的productId」:2455卡夫卡製作人發送無效字符
public boolean sendMessage()
{
PageRequest page = new PageRequest(0, 1);
Product p = product.findByName("Cream", page).getContent().get(0);
String json = "";
ObjectMapper mapper = new ObjectMapper();
try {
json = mapper.writeValueAsString(p);
} catch (JsonProcessingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
logger.info("JSON = " + json);
boolean status = inputToKafka.send(org.springframework.integration.support.MessageBuilder.withPayload(json).build());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return status;
}
外向配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputToKafka">
<int:queue/>
</int:channel>
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
channel="inputToKafka">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="test_topic"
compression-codec="default"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
</beans>
任何線索?使用
感謝您的答覆@Val,但即使添加給定的行後,也會返回相同的輸出。還有什麼想法? –
生成JSON後設置斷點的任何方法(例如在'logger.info()'行)並查看JSON字符串中的內容?你的卡夫卡消費者是否也看到不好的角色,或者它只是在你的控制檯? – Val
在使用者端收到錯誤的字符,這會返回JSON解析錯誤。 'logger.info()'返回變量_json_的正確字符串。我猜測這些字符是在有效載荷建立後追加的。 –