在卡夫卡0.11中添加了Headers到記錄(ProducerRecord & ConsumerRecord),在使用Kafka Streams處理主題時是否可以獲取這些標題?當在KStream
調用諸如map
方法提供了key
和記錄的value
的論點,但沒有辦法,我可以看到訪問headers
。如果我們可以通過map
而不是ConsumerRecord
這將是很好的。是否可以使用Kafka Streams訪問消息標題?
ex。
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ...) // can I get access to headers in methods like map, filter, aggregate, etc?
...
像這樣的工作:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
爲了闡明Matthias所說的話:是的,Kafka Streams中的Processor API可以訪問記錄元數據,如主題名稱,分區號,偏移量等。 Kafka Streams中的DSL不允許您訪問。但是,因爲你可以將處理器API和DSL,你仍然可以編寫使用DSL的'變換()'或'transformValues()'功能,它允許您通過訪問記錄元數據的基於DSL的流處理應用在處理器API的處理器/變換器中。 –
感謝大家提供的信息,我會密切關注元數據添加到DSL級別的情況,以便可以更新此答案。 –
@ MatthiasJ.Sax和@ MichaelG.Noll:在https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams,對於'RecordContext'建議,它似乎沒有暴露頭部。那是會添加的東西嗎? –