2017-07-04 118 views
0

我有一個使用Kafka的基於Spring Cloud Stream的微服務。Spring Cloud stream partitionKeyExpression錯誤計算

我用4個分區創建了一個kafka主題。

我配置在我YML以下內容:partitionKey可變

spring: 
    cloud: 
    stream: 
     bindings: 
     SYNC_TABLE: 
      content-type: application/json 
      partitionKeyExpression: payload.partitionKey 
      partitionCount: 4 
      destination: ${envTopicPrefix}.LEGACY_TABLE 

在我的代碼有一個包含我的消息類(在其超類):

@Data 
@EqualsAndHashCode(callSuper=true) 
@ToString(callSuper=true) 
public class TransactionResponse extends GeneralOutputMessage{ 

} 

@Data 
@ToString 
public class GeneralOutputMessage { 

    private String operationType; 
    private List<String> affectedFields; 
    private Object data; 
    private String eventId; 
    private String eventName; 
    private String partitionKey; 
} 

我正在發送TransactionsResponse對象作爲消息:

final TransactionResponse transactionResponse = handler.handleEvent(event); 
if (transactionResponse != null) { 
    outputChannels.tableSync().send(MessageBuilder.withPayload(transactionResponse).build()); 
    log.info("Message Sent: {}", transactionResponse); 
} 

我的期望是春天的雲流將採取關鍵payload.partitionKey,計算其hashCode()%4,並將事件發送到該分區。

但是,邏輯是完全隨機的。下面是一些例子:

Math.abs( 「111615631」 .hashCode()%4)= 1。然而,在消息被髮送給分區號3.

Math.abs( 「110019882」。 hashCode()%4)= 2。但是,該消息被髮送到分區號0.

Math.abs(「943152574」.hashCode()%4)= 0.此消息確實得到發送分區編號爲0.

Math.abs(「943198862」.hashCode()%4)= 0.但是,消息被髮送到分區號r 2.

我正在使用Dalston.SR1發行版。

我在這裏錯過了什麼?

謝謝。

更新:

只是試圖用相同的partitionKey(但略有不同的消息體)發送相同的事件。即使分區密鑰相同,該消息也會轉到兩個不同的分區。看起來Spring Cloud Stream完全忽略了partitionKeyExpression。

回答

0

是我的錯,我忘了補充生產者:在陽明部分:

spring: 
    cloud: 
    stream: 
     bindings: 
     SYNC_TABLE: 
      content-type: application/json 
      producer: 
      partitionKeyExpression: payload.partitionKey 
      partitionCount: 4 
      destination: ${envTopicPrefix}.LEGACY_TABLE 
相關問題