目前,我有一個用例,我需要從RabbitMQ消息總線獲取消息,使用HDFS接收器附加消息大小(以字節爲單位)並輸出消息。Spring雲數據流(兔|處理器| hdfs)輸出二進制
首先,我創建了自己的處理器,它將大小附加到消息中。我這樣做的原因是因爲編碼需要是Google Protocol Buffer的編碼。
我的應用程序看起來如下:
stream create --name rabbit-to-hdfs --definition "rabbit | delim-protobuf | hdfs "
當HDFS水槽輸出我看到[B @ 12768762該消息。我已經Google'd四周,看到建議增加以下內容:
spring.cloud.stream.bindings.input.consumer.headerMode=raw
然而,這並沒有看到幫我了!這就是說,如果我更改應用程序中使用去一個文件中:
[input | processor ] | file --binary=true
然後,一切工作正常。但是,我喜歡HDFS接收器提供的翻轉功能。
任何想法?
是的,我實現的處理器返回一個GPB的字節數組。你是說它應該返回一個java.io.Serializable? –
我試過「stream deploy --name rabbit-to-log --properties」app.log.spring.cloud.stream.bindings.input.content-type = application/x-java-object; type = java.io。可序列化的「」沒有工作。你是說處理器應該返回一個java.io.Serializable而不是byte [] –
我重寫了處理器以返回一個從com.google.protobuf.GeneratedMessageV3繼承的Envelope對象。這個類繼承自Serializable。當取消註冊/註冊新應用程序並開始處理數據時,我收到一條錯誤消息「使用contentType [application/x-java-object; type = CUAVProtos $ Envelope] CUAVProtos $ Envelope」無法反序列化[CUAVProtos $ Envelope] –