2015-10-04 30 views
4

使用以下代碼,我發送Elasticsearch文件進行索引。我試圖將基本對象轉換爲JSON並通過生產者發送。然而,每一個消息(如從控制檯選中)追加jibberish字符,如 - 噸{ 「的productId」:2455卡夫卡製作人發送無效字符

public boolean sendMessage() 
{ 
    PageRequest page = new PageRequest(0, 1); 
    Product p = product.findByName("Cream", page).getContent().get(0); 
    String json = ""; 
    ObjectMapper mapper = new ObjectMapper(); 
    try { 
     json = mapper.writeValueAsString(p); 
    } catch (JsonProcessingException e1) { 
     // TODO Auto-generated catch block 
     e1.printStackTrace(); 
    }  
    logger.info("JSON = " + json); 

    boolean status = inputToKafka.send(org.springframework.integration.support.MessageBuilder.withPayload(json).build()); 
    try { 
     Thread.sleep(10000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    return status; 
} 

外向配置

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:int="http://www.springframework.org/schema/integration" 
     xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" 
     xmlns:task="http://www.springframework.org/schema/task" 
     xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd 
     http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> 

    <int:channel id="inputToKafka"> 
     <int:queue/> 
    </int:channel> 

    <int-kafka:outbound-channel-adapter 
      id="kafkaOutboundChannelAdapter" 
      kafka-producer-context-ref="kafkaProducerContext" 
      channel="inputToKafka"> 
     <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/> 
    </int-kafka:outbound-channel-adapter> 

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/> 

    <int-kafka:producer-context id="kafkaProducerContext"> 
     <int-kafka:producer-configurations> 
      <int-kafka:producer-configuration broker-list="localhost:9092" 
               topic="test_topic" 
               compression-codec="default"/> 
     </int-kafka:producer-configurations> 
    </int-kafka:producer-context> 

    </beans> 

任何線索?使用

插件:Spring Extension Kafka

回答

1

我今天遇到這個問題,如下圖所示得到它由生產者配置設置正確的值串級解決:

<int-kafka:producer-configuration 
       broker-list="localhost:9092" topic="headers['kafka_topic']" 
       key-class-type="java.lang.String" value-class-type="java.lang.String" 
       key-serializer="kafkaSerializer" value-serializer="kafkaSerializer"/> 

<bean id="kafkaSerializer" class="org.apache.kafka.common.serialization.StringSerializer" /> 
0

這些可能是製表符(因爲縮進的JSON),您的主機不解釋好。

如果您禁用對象映射器生成的輸出的縮進,那些字符可能會消失。

try { 
    mapper.disable(SerializationFeature.INDENT_OUTPUT);  <---- add this line 
    json = mapper.writeValueAsString(p); 
} catch (JsonProcessingException e1) { 
    // TODO Auto-generated catch block 
    e1.printStackTrace(); 
}  
+0

感謝您的答覆@Val,但即使添加給定的行後,也會返回相同的輸出。還有什麼想法? –

+0

生成JSON後設置斷點的任何方法(例如在'logger.info()'行)並查看JSON字符串中的內容?你的卡夫卡消費者是否也看到不好的角色,或者它只是在你的控制檯? – Val

+0

在使用者端收到錯誤的字符,這會返回JSON解析錯誤。 'logger.info()'返回變量_json_的正確字符串。我猜測這些字符是在有效載荷建立後追加的。 –

0

卡夫卡不會做這樣的事情。調試您發送給Kafka製作人的String消息。 如果您從URL或HTML表單收到此消息,則可能需要在發送給製作人之前先解碼您的消息。

例如URLDecoder.decode(消息,「UTF-8」)