如果使用Samza的OutgoingMessageEnvelope使用這種格式來發送消息:Samza在發送消息時是否自動創建分區?
public OutgoingMessageEnvelope(SystemStream systemStream,
java.lang.Object partitionKey,
java.lang.Object key,
java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.
和調用流任務的過程中()方法中這種方法,並希望將傳入郵件到適當的分區,將Samza創建你調用該方法時的分區?
E.g.
MessageA = {"id": "idA", "key": "keyA", "body":"some details"}
MessageB = {"id": "idB", "key": "keyB", "body":"some more details"}
如果我稱之爲內流任務的process()
其中msg
是一個消息實例:
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
// ...
String partition = msg["id"]
String key = msg["key"]
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "PartitionedMessages"), id, key, msg));
// ...
這是否會自動創建分區,IDA和美洲開發銀行爲我(即我需要創造這些分區前我發信息給他們)?我希望能夠將消息路由到適當的分區,並且能夠使用單獨的消息密鑰記錄壓縮。
非常感謝,這是一個非常明確和有用的答案。 – John