2
我有一個Storm集羣連接到Kinesis Stream。消息看起來像這樣。如何根據消息中的值將元組發送到不同的螺栓
{
_c: "a"
}
,或者它應該是
{
_c: "b"
}
我想與_c = 「a」 到一個螺栓和_c = 「b」 的發送的元組到不同的螺栓。我如何實現這一目標?
這是使用GSON
@Override
public void execute(Tuple tuple) {
String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY);
String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER);
byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA);
ByteBuffer buffer = ByteBuffer.wrap(payload);
String data = null;
try {
data = decoder.decode(buffer).toString();
HashMap < String, String > map = new Gson().fromJson(data, new TypeToken < HashMap < String, Object >>() {}.getType());
this.outputCollector.emit(tuple, new Values(map));
this.outputCollector.ack(tuple);
} catch (CharacterCodingException e) {
this.outputCollector.fail(tuple);
}
}
感謝