2017-10-08 83 views
2

我有一個Storm集羣連接到Kinesis Stream。消息看起來像這樣。如何根據消息中的值將元組發送到不同的螺栓

{ 
    _c: "a" 
} 

,或者它應該是

{ 
    _c: "b" 
} 

我想與_c = 「a」 到一個螺栓和_c = 「b」 的發送的元組到不同的螺栓。我如何實現這一目標?

這是使用GSON

@Override 
public void execute(Tuple tuple) { 
    String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY); 
    String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER); 
    byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA); 

    ByteBuffer buffer = ByteBuffer.wrap(payload); 
    String data = null; 
    try { 
    data = decoder.decode(buffer).toString(); 

    HashMap < String, String > map = new Gson().fromJson(data, new TypeToken < HashMap < String, Object >>() {}.getType()); 

    this.outputCollector.emit(tuple, new Values(map)); 
    this.outputCollector.ack(tuple); 

    } catch (CharacterCodingException e) { 
    this.outputCollector.fail(tuple); 
    } 

} 

感謝

回答

0

您可以定義您的螺栓兩個流,然後聲明兩個outputstreams該分析從室壁運動的消息JSON對象螺栓:

@Override 
public void execute(Tuple tuple) { 
    // ... 
    // Some Code 
    // ... 
    if (_c =="a") { 
    collector.emit("stream1", tuple, new Values(_c)); 
    } else { 
    collector.emit("stream2", tuple, new Values(_c)); 
    } 

} 

@Override 
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { 
    outputFieldsDeclarer.declareStream("stream1", new Fields("_c")); 
    outputFieldsDeclarer.declareStream("stream2", new Fields("_c")); 
} } 

在您的拓撲中,您​​可以使用ShuffleGrouping中的選項傳遞Stream_id。

topology.setBolt("FirstBolt",new FirstBolt(),1);  
topology.setBolt("newBolt1", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream1"); 
topology.setBolt("newBolt2", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream2"); 

另一種可能性是將它發送到兩個螺栓,然後檢查兩個螺栓中的值並執行所需的代碼。

相關問題