從一個拓撲結構發射元組到另一個拓撲結構有可能或很好嗎?風暴 - 拓撲結構到拓撲結構
比方說,在一個拓撲結構中,一個特定的螺栓正在將元組存儲到db中。在另一個拓撲結構中,我不想複製或創建用於存儲元組的同一個螺栓。那麼從第二個拓撲可以發射到第一個拓撲螺栓?
-Hariprasad
從一個拓撲結構發射元組到另一個拓撲結構有可能或很好嗎?風暴 - 拓撲結構到拓撲結構
比方說,在一個拓撲結構中,一個特定的螺栓正在將元組存儲到db中。在另一個拓撲結構中,我不想複製或創建用於存儲元組的同一個螺栓。那麼從第二個拓撲可以發射到第一個拓撲螺栓?
-Hariprasad
這是目前不支持,你不能傳遞從一個拓撲到另一個元組。 根據您的使用情況,您爲什麼不使用其他螺栓(在同一拓撲結構中)訂閱db螺栓而不是運行單獨的拓撲結構
雖然不能將元組從一個拓撲直接傳遞到另一個拓撲,但可以使用排隊系統如Apache Kafka來完成你描述的內容。 Storm在最新發布的版本中包含Kafka噴嘴。
該設置需要兩個風暴拓撲(A和B)和一個Kafka主題。讓我們把它叫做「轉移」
在要發送數據到B的拓撲結構的拓撲,使用卡夫卡製片人:
[卡夫卡初始化代碼是直接從文檔採取:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example,顯然需要要爲您定製卡夫卡安裝]
public void Execute(Tuple input){
...
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String (config);
String msg = ...
KeyedMessage<String, String> data = new KeyedMessage<String, String>
("transfers", ip, msg);
producer.send(data);
producer.close();
在拓撲B,創建一個卡夫卡的噴嘴。當你初始化拓撲:
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName,
UUID.randomUUID().toString());
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// Now it's just like any other spout
topologyBuilder.setSpout(kafkaSpout);
這需要運行kafka,當然(請查看https://kafka.apache.org/08/quickstart.html)。
[編輯:再次讀你的問題:聽起來你有一個可重複使用的組件(保存元組),你想從兩個不同的拓撲調用,並且你試圖從另一個拓撲中調用一個。另一種方法是將此任務卸載到第三個拓撲,專門用於處理保存元組並僅創建需要在拓撲中保留的項的kafka消息。這樣,保存元組的所有事件都將以同樣的方式處理。]