2015-09-28 102 views
1

我們目前在集羣拓撲模式下使用Apache Storm 0.9.5來處理Amazon Kinesis記錄(噴出)並將它們存儲到Redshift數據倉庫(螺栓)中。我們的Storm集羣部署在AWS中,由1個nimbus + UI節點,1個zookeeper節點和3個supervisor + logviewer節點組成。我們的拓撲配置支持處理多的Kinesis流和每一個數據流,它包括:Apache Storm一次性處理

  • 一個室壁運動流口監聽傳入記錄
  • 一個紅移螺栓插入記錄到數據倉庫

拓撲:

final TopologyBuilder topologyBuilder = new TopologyBuilder(); 

// for every configured kinesis stream 
final List<KinesisStreamSpout> kinesisStreamSpouts = kinesisStreamService.getKinesisStreamSpouts(); 
for (final KinesisStreamSpout kinesisStreamSpout : kinesisStreamSpouts) { 
    final String spoutId = kinesisStreamSpout.getSpoutId(); 
    topologyBuilder.setSpout(spoutId, kinesisStreamSpout.getKinesisSpout()); 

    // set the corresponding redshift bolt 
    final String streamName = kinesisStreamSpout.getStreamName(); 
    final RedshiftBolt redshiftBolt = new RedshiftBolt(streamName); 
    topologyBuilder.setBolt(redshiftBolt.getId(), 
     redshiftBolt, stormProperties.getNumberOfWorkersPerStream()).shuffleGrouping(spoutId); 
} 

return topologyBuilder.createTopology(); 

該系統的一個bugbear一直無法保證一次只能處理輸入消息導致具有相同商業密鑰的多條記錄插入到目標數據庫中。爲了解問題的嚴重程度,我們運行了一個受控測試,發現大約三分之一的輸入記錄已提交處理多次。

根據this thread(目前尚未得到答覆),我們也考慮使用Trident來保證一次只能處理,但也得出結論認爲系統內置冪原理更重要至少一次語義)而不是增加複雜性,降低性能並生成狀態,因爲這個other article建議。

我們現在正在尋求關於在支持羣集的方式下在現有拓撲中實現冪等性的最佳方式的建議。到目前爲止,我們傾向於引入一個可以通過元組消息ID鍵值的RedisBolt。有沒有現成的模式來實現這個使用Apache Storm?

+0

你確定,你在Kinesis裏面沒有重複嗎? 1/3值對我來說看起來非常高... –

+0

重複提交分析基於Amazon Kinesis spout組件從ShardId構造的唯一messageId:SequenceNumber值。換句話說,同一業務有效載荷的多個Kinesis記錄將被系統分配一個不同的元組messageId。 –

+0

好吧,那麼你的拓撲結構就會出現一些問題......你有很多失敗的元組嗎? –

回答

0

如果您不想使用Trident,您可能需要閱讀以下關於「事務拓撲」的文章。這是Trident背後的概念,您仍然可以「手動」應用它。這似乎是你的用例很好的模式:https://storm.apache.org/documentation/Transactional-topologies.html

此外,我想補充一點,風暴(像任何其他系統,如Apache Flink [聲明:我是Flink的提交者]和Apache Spark Streaming)只能確保一次處理該系統。如果數據被轉發到外部系統,那麼只有當且僅當外部系統可以支持冪等操作時才能實現一次。

相關問題