我是Storm的三叉戟新手。我打破了對TridentState的頭腦。至於我的理解三叉戟保持狀態(即元數據)每批次(無論是批量的所有元組通過維護數據庫事務ID完全處理),我不能完全肯定了下面的語句做什麼什麼是風暴中的三叉戟狀態?
TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());
誰能解釋當我們定義上面的代碼中到底發生了什麼?
我是Storm的三叉戟新手。我打破了對TridentState的頭腦。至於我的理解三叉戟保持狀態(即元數據)每批次(無論是批量的所有元組通過維護數據庫事務ID完全處理),我不能完全肯定了下面的語句做什麼什麼是風暴中的三叉戟狀態?
TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());
誰能解釋當我們定義上面的代碼中到底發生了什麼?
有三叉戟狀態on the storm wiki良好的文檔。簡單的回答你的問題是urlToTweeters
是一個可以查詢的狀態對象。我假定上面的語句是從trident tutorial,轉載如下:
TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState());
topology.newDRPCStream("reach")
.stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
/* At this point we have the tweeters for each url passed in args */
.shuffle()
.stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
.parallelismHint(200)
.each(new Fields("followers"), new ExpandList(), new Fields("follower"))
.groupBy(new Fields("follower"))
.aggregate(new One(), new Fields("one"))
.parallelismHint(20)
.aggregate(new Count(), new Fields("reach"));
在這個例子中,urlToTweeters
將存儲URL到高音揚聲器的映射,並從下一行定義的DRPC reach
查詢(取入URLs作爲它的參數)將最終產生影響。但是在路上(標有評論在線)你會看到每一個URL,即,在一個urlToTweeters
查詢結果的高音流。
你能幫助這個http://stackoverflow.com/questions/35445165/total-number-of-non每個鳴叫中重複的詞 – user1
我希望它永遠不會太晚回答,至少別人可能會覺得我的回答有用:)
所以,topology.newStaticState()
是可查詢的數據存儲的三叉戟的抽象。一種newStaticState()
參數應該是一個實現 - 基於法的合同 - 的storm.trident.state.StateFactory
。工廠應該執行makeState()
方法返回storm.trident.state.State
的實例。但是,如果您打算查詢您的狀態,則應該返回storm.trident.state.map.ReadOnlyMapState
的代碼,因爲普通storm.trident.state.State
沒有查詢實際數據源的方法(如果您嘗試使用除ReadOnlyMapState
之外的其他任何內容)。
那麼,讓我們試試吧!
一個虛擬的狀態下執行:
public static class ExampleStaticState implements ReadOnlyMapState<String> {
private final Map<String, String> dataSourceStub;
public ExampleStaticState() {
dataSourceStub = new HashMap<>();
dataSourceStub.put("tuple-00", "Trident");
dataSourceStub.put("tuple-01", "definitely");
dataSourceStub.put("tuple-02", "lacks");
dataSourceStub.put("tuple-03", "documentation");
}
@Override
public List<String> multiGet(List<List<Object>> keys) {
System.out.println("DEBUG: MultiGet, keys is " + keys);
List<String> result = new ArrayList<>();
for (List<Object> inputTuple : keys) {
result.add(dataSourceStub.get(inputTuple.get(0)));
}
return result;
}
@Override
public void beginCommit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Begin commit, txid=" + txid);
}
@Override
public void commit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Commit, txid=" + txid);
}
}
一廠:
public static class ExampleStaticStateFactory implements StateFactory {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new ExampleStaticState();
}
}
簡單psvm
(又名public static void main
):
public static void main(String... args) {
TridentTopology tridentTopology = new TridentTopology();
FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
"foo"
}));
TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
tridentTopology
.newStream("spout", spout)
.stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
.each(new Fields("foo", "bar"), new Debug())
;
Config conf = new Config();
conf.setNumWorkers(6);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());
spout.feed(Arrays.asList(new Values[]{
new Values("tuple-00"),
new Values("tuple-01"),
new Values("tuple-02"),
new Values("tuple-03")
}));
localCluster.shutdown();
}
最後,輸出:
DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]
您會看到,stateQuery()從輸入批次中獲取值並將其映射到「數據存儲」中找到的值。
潛水深一點,你可以看看MapGet
類(其實例用於拓撲內部查詢的傢伙)的來源,發現下面的有:
public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
@Override
public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
return map.multiGet((List) keys);
}
@Override
public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
collector.emit(new Values(result));
}
}
所以引擎蓋下它只需調用ReadOnlyMapState
實現的multiGet()
方法,然後發出在數據存儲中找到的值,將它們添加到已經存在的元組中。你可以(儘管它可能不是最好的)創建你自己的實現BaseQueryFunction<ReadOnlyMapState, Object>
做更復雜的事情。
您可以在此上下文中定義「三叉戟」嗎?有很多事情叫三叉戟。 – Charles
上下文是「風暴」:https://github.com/nathanmarz/storm/wiki/Documentation#trident – Dan