2017-08-16 22 views
0

目前我卡住以下問題: 我正在使用KafkaConsumer從卡夫卡主題閱讀消息。消息是字符串並具有以下格式: { "a" : "b", "a1" : "b1", "c2" : "c3" } 它們保存在FlowFile的有效內容中。NiFi,流量與KafkaConsumer寫作json

我想將該字符串轉換爲json或理想的csv,但不知道如何去做。

我是NiFi的新手,並且儘可能多地進行了研究,但是我發現的答案是關於從json轉換爲avro或類似的,但從不將字符串轉換爲json或avro。 我還發現Kafka消息是在FlowFile的有效載荷中,而不是在屬性中,所以我不知道如何讓它接受我的手,因爲這些示例總是涉及屬性。

所以簡而言之:我可以將FlowFile(一個字符串)的有效載荷轉換爲帶有某些內置處理器的json/cvs。

回答

0

如果你的消息是在FlowFile,按以下順序可能會有所幫助:

1)使用AttributesToJson到有用消息轉換成JSON。 2)使用EvaluateJsonPath來提取有效負載消息。在你的情況下,卡夫卡消息。然後,您可以將提取的消息傳遞給csv代。

這篇文章可以幫助到JSON轉換成CSV:Convert Json To CSV

0

我落得這樣做:

  1. ConsumeKafka給我的字符串:

{ "a" : "b", "a1" : "b1" }

  • EvaluateJsonPath通過添加屬性創建屬性
  • a -> $.a //results in attribute named a with value b

    a1 -> $.a1 //results in attribute named a1 with value b1

  • ReplaceText獲取從EvaluateJsonPath的屬性,以形成一個單獨的CSV格式化:
  • Replacement value -> ${'a'},${'a1'}

    這結果爲單行,但是沒有NEW LINE

    b,b1

    要添加新行追加\ n'\ n'「\ n」沒有工作。 什麼工作是按Shift + Enter,同時鍵入替換值字段,這導致創建一個空的新行。