2017-10-13 132 views
2

在卡夫卡0.11中添加了Headers到記錄(ProducerRecord & ConsumerRecord),在使用Kafka Streams處理主題時是否可以獲取這些標題?當在KStream調用諸如map方法提供了key和記錄的value的論點,但沒有辦法,我可以看到訪問headers。如果我們可以通過map而不是ConsumerRecord這將是很好的。是否可以使用Kafka Streams訪問消息標題?

ex。

KStreamBuilder kStreamBuilder = new KStreamBuilder(); 
KStream<String, String> stream = kStreamBuilder.stream("some-topic"); 
stream 
    .map((key, value) -> ...) // can I get access to headers in methods like map, filter, aggregate, etc? 
    ... 

像這樣的工作:

KStreamBuilder kStreamBuilder = new KStreamBuilder(); 
KStream<String, String> stream = kStreamBuilder.stream("some-topic"); 
stream 
    .map((record) -> { 
     record.headers(); 
     record.key(); 
     record.value(); 
    }) 
    ... 

回答

5

記錄頭是目前不流API進行訪問。有一個JIRA增加,雖然這一功能:通過https://issues.apache.org/jira/browse/KAFKA-5632

在當前卡夫卡(0.11和1.0,這將是釋放不久)就可以通過處理器API訪問記錄元數據(即,通過transform()transformValues(),或process())給出「上下文」對象。它公開主題,分區,偏移量和時間戳。 Cf https://docs.confluent.io/current/streams/developer-guide.html#applying-processors-and-transformers-processor-api-integration

元數據在DSL級別上不可用。但是,還有一些工作正在進行中,以擴展DSL:https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams

+0

爲了闡明Matthias所說的話:是的,Kafka Streams中的Processor API可以訪問記錄元數據,如主題名稱,分區號,偏移量等。 Kafka Streams中的DSL不允許您訪問。但是,因爲你可以將處理器API和DSL,你仍然可以編寫使用DSL的'變換()'或'transformValues()'功能,它允許您通過訪問記錄元數據的基於DSL的流處理應用在處理器API的處理器/變換器中。 –

+0

感謝大家提供的信息,我會密切關注元數據添加到DSL級別的情況,以便可以更新此答案。 –

+0

@ MatthiasJ.Sax和@ MichaelG.Noll:在https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams,對於'RecordContext'建議,它似乎沒有暴露頭部。那是會添加的東西嗎? –

相關問題