2015-01-14 148 views
1

從一個拓撲結構發射元組到另一個拓撲結構有可能或很好嗎?風暴 - 拓撲結構到拓撲結構

比方說,在一個拓撲結構中,一個特定的螺栓正在將元組存儲到db中。在另一個拓撲結構中,我不想複製或創建用於存儲元組的同一個螺栓。那麼從第二個拓撲可以發射到第一個拓撲螺栓?

-Hariprasad

回答

0

這是目前不支持,你不能傳遞從一個拓撲到另一個元組。 根據您的使用情況,您爲什麼不使用其他螺栓(在同一拓撲結構中)訂閱db螺栓而不是運行單獨的拓撲結構

1

雖然不能將元組從一個拓撲直接傳遞到另一個拓撲,但可以使用排隊系統如Apache Kafka來完成你描述的內容。 Storm在最新發布的版本中包含Kafka噴嘴。

1

該設置需要兩個風暴拓撲(A和B)和一個Kafka主題。讓我們把它叫做「轉移」

在要發送數據到B的拓撲結構的拓撲,使用卡夫卡製片人:

[卡夫卡初始化代碼是直接從文檔採取:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example,顯然需要要爲您定製卡夫卡安裝]

public void Execute(Tuple input){ 
... 
    Properties props = new Properties(); 
    props.put("metadata.broker.list", "broker1:9092,broker2:9092 "); 
    props.put("serializer.class", "kafka.serializer.StringEncoder"); 
    props.put("partitioner.class", "example.producer.SimplePartitioner"); 
    props.put("request.required.acks", "1"); 

    ProducerConfig config = new ProducerConfig(props); 

    Producer<String, String> producer = new Producer<String, String (config); 
    String msg = ... 
    KeyedMessage<String, String> data = new KeyedMessage<String, String> 
     ("transfers", ip, msg); 
    producer.send(data); 
    producer.close(); 

在拓撲B,創建一個卡夫卡的噴嘴。當你初始化拓撲:

BrokerHosts hosts = new ZkHosts(zkConnString); 
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, 
    UUID.randomUUID().toString()); 
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); 
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 

// Now it's just like any other spout 
topologyBuilder.setSpout(kafkaSpout); 

這需要運行kafka,當然(請查看https://kafka.apache.org/08/quickstart.html)。

[編輯:再次讀你的問題:聽起來你有一個可重複使用的組件(保存元組),你想從兩個不同的拓撲調用,並且你試圖從另一個拓撲中調用一個。另一種方法是將此任務卸載到第三個拓撲,專門用於處理保存元組並僅創建需要在拓撲中保留的項的kafka消息。這樣,保存元組的所有事件都將以同樣的方式處理。]