2017-09-22 86 views
0

目前,我有一個用例,我需要從RabbitMQ消息總線獲取消息,使用HDFS接收器附加消息大小(以字節爲單位)並輸出消息。Spring雲數據流(兔|處理器| hdfs)輸出二進制

首先,我創建了自己的處理器,它將大小附加到消息中。我這樣做的原因是因爲編碼需要是Google Protocol Buffer的編碼。

我的應用程序看起來如下:

stream create --name rabbit-to-hdfs --definition "rabbit | delim-protobuf | hdfs " 

當HDFS水槽輸出我看到[B @ 12768762該消息。我已經Google'd四周,看到建議增加以下內容:

spring.cloud.stream.bindings.input.consumer.headerMode=raw 

然而,這並沒有看到幫我了!這就是說,如果我更改應用程序中使用去一個文件中:

[input | processor ] | file --binary=true 

然後,一切工作正常。但是,我喜歡HDFS接收器提供的翻轉功能。

任何想法?

回答

0

文件正在工作,因爲它只是傾倒它收到的字節,但看着HDFS接收器,它似乎需要使用java.io.Serializable對象作爲輸入。但在你的情況下,你是從protobuf對象發送一個字節數組(我假設這是怎麼回事)

+0

是的,我實現的處理器返回一個GPB的字節數組。你是說它應該返回一個java.io.Serializable? –

+0

我試過「stream deploy --name rabbit-to-log --properties」app.log.spring.cloud.stream.bindings.input.content-type = application/x-java-object; type = java.io。可序列化的「」沒有工作。你是說處理器應該返回一個java.io.Serializable而不是byte [] –

+0

我重寫了處理器以返回一個從com.google.protobuf.GeneratedMessageV3繼承的Envelope對象。這個類繼承自Serializable。當取消註冊/註冊新應用程序並開始處理數據時,我收到一條錯誤消息「使用contentType [application/x-java-object; type = CUAVProtos $ Envelope] CUAVProtos $ Envelope」無法反序列化[CUAVProtos $ Envelope] –

0

這些類型不兼容,這就是問題所在。通過在SCS中設置contentType,您只是要求框架使用java序列化來調用writeObject。但是由於您使用的是已經是序列化框架的protobuf,它不會起作用。這裏的問題是,水槽真的看起來(我不熟悉接收器代碼)期望一個序列化,但你沒有提供一個。你可以做的是修改接收器應用程序或提供知道如何從protobuf轉換爲Serializable的自定義轉換器,甚至不知道這是否有道理是誠實的。