2013-07-13
7

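What is Trident state in Storm?

I am new to Storm's Trident and I'm having trouble wrapping my head around TridentState. As far as I understand, Trident maintains state (i.e., metadata) for each batch (whether all the tuples in the batch have been fully processed, by storing the transaction ID in a database), but I am not entirely sure what the following statement does: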

TridentState urlToTweeters = 
    topology.newStaticState(getUrlToTweetersState()); 

Can anyone explain what exactly happens when we define the code above?

+0

Can you define "Trident" in this context? There are many things called Trident. – Charles

+1

The context is "Storm": https://github.com/nathanmarz/storm/wiki/Documentation#trident – Dan

Answers

0

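There is good documentation of Trident state on the storm wiki. The short answer to your question is that urlToTweeters is a state object that can be queried. I assume the statement above comes from the trident tutorial, reproduced below: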

TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState()); 
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState()); 
topology.newDRPCStream("reach") 
    .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
    .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
    /* At this point we have the tweeters for each url passed in args */ 
    .shuffle()   
    .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) 
    .parallelismHint(200) 
    .each(new Fields("followers"), new ExpandList(), new Fields("follower")) 
    .groupBy(new Fields("follower")) 
    .aggregate(new One(), new Fields("one")) 
    .parallelismHint(20) 
    .aggregate(new Count(), new Fields("reach")); 

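In this example, urlToTweeters stores a mapping from URLs to tweeters, and the DRPC reach query defined on the next line (which takes URLs as its arguments) eventually produces the reach. Along the way (at the line marked with the inline comment), you will see the stream of tweeters for each URL, i.e., the result of the urlToTweeters query.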
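To make the semantics of the reach query concrete, here is a plain-Java sketch of what it computes for a single URL, with no Storm involved. The two maps are hypothetical in-memory stand-ins for the urlToTweeters and tweetersToFollowers states, the data is made up, and the comments map each step to the Trident operation it roughly corresponds to.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReachSketch {

    // The maps are hypothetical stand-ins for the two static states.
    static int reach(String url,
                     Map<String, List<String>> urlToTweeters,
                     Map<String, List<String>> tweetersToFollowers) {
        // stateQuery(urlToTweeters, ..., MapGet) + each(ExpandList): the tweeters of this URL
        List<String> tweeters = urlToTweeters.getOrDefault(url, Collections.<String>emptyList());

        // stateQuery(tweetersToFollowers, ...) + each(ExpandList) gives one tuple per follower;
        // groupBy("follower") + aggregate(One) collapses duplicates, modelled here by a Set
        Set<String> distinctFollowers = new HashSet<>();
        for (String tweeter : tweeters) {
            distinctFollowers.addAll(
                    tweetersToFollowers.getOrDefault(tweeter, Collections.<String>emptyList()));
        }

        // aggregate(Count): the reach is the number of distinct followers
        return distinctFollowers.size();
    }

    public static void main(String[] args) {
        Map<String, List<String>> urlToTweeters = new HashMap<>();
        urlToTweeters.put("http://example.com/a", Arrays.asList("alice", "bob"));

        Map<String, List<String>> tweetersToFollowers = new HashMap<>();
        tweetersToFollowers.put("alice", Arrays.asList("carol", "dave"));
        tweetersToFollowers.put("bob", Arrays.asList("dave", "erin"));

        // carol, dave and erin: dave follows both tweeters but is counted once
        System.out.println(reach("http://example.com/a", urlToTweeters, tweetersToFollowers)); // 3
    }
}
```

In the real topology the distinct-follower step is distributed (groupBy across partitions, parallelismHint(20)), which is why it is expressed as a grouping plus two aggregations rather than a single set.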

+0

Can you help with this: http://stackoverflow.com/questions/35445165/total-number-of-non-repeated-words-in-each-tweet – user1

9

I hope it's never too late to answer; at least others might find my answer useful :)

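So, topology.newStaticState() is Trident's abstraction for a queryable data store. The argument to newStaticState() should be an implementation of the storm.trident.state.StateFactory contract. The factory must implement the makeState() method, which returns an instance of storm.trident.state.State. However, if you plan to query your state with MapGet, makeState() should return an implementation of storm.trident.state.map.ReadOnlyMapState, because a plain storm.trident.state.State has no methods for querying the actual data source (you will get a ClassCastException if you use anything other than a ReadOnlyMapState).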
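A minimal sketch of why the returned state type matters. State, ReadOnlyMapState and StateFactory below are hypothetical, simplified local stand-ins, not the real Storm interfaces (the real makeState() also takes the conf and IMetricsContext arguments). The MapGet-like helper needs multiGet(), so it has to cast whatever State it is given, and a plain State fails that cast.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class StateContractSketch {

    // Hypothetical, simplified stand-ins for Trident's interfaces (not the real Storm types).
    interface State {
        void beginCommit(Long txid);
        void commit(Long txid);
    }

    interface ReadOnlyMapState<T> extends State {
        List<T> multiGet(List<List<Object>> keys);
    }

    interface StateFactory {
        State makeState(int partitionIndex, int numPartitions);
    }

    // A plain State: satisfies the factory contract, but offers nothing to query.
    static class PlainState implements State {
        public void beginCommit(Long txid) { }
        public void commit(Long txid) { }
    }

    // A queryable state backed by a Map; unknown keys yield null.
    static class MapBackedState implements ReadOnlyMapState<String> {
        private final Map<String, String> data;

        MapBackedState(Map<String, String> data) {
            this.data = data;
        }

        public List<String> multiGet(List<List<Object>> keys) {
            List<String> out = new ArrayList<>();
            for (List<Object> key : keys) {
                out.add(data.get(key.get(0)));
            }
            return out;
        }

        public void beginCommit(Long txid) { }
        public void commit(Long txid) { }
    }

    // MapGet-like lookup: it needs multiGet(), so it has to cast the State it receives.
    static List<Object> mapGet(State state, List<List<Object>> keys) {
        ReadOnlyMapState<?> map = (ReadOnlyMapState<?>) state; // ClassCastException for PlainState
        return new ArrayList<Object>(map.multiGet(keys));
    }

    public static void main(String[] args) {
        StateFactory queryable = (partitionIndex, numPartitions) ->
                new MapBackedState(Collections.singletonMap("tuple-00", "Trident"));
        StateFactory plain = (partitionIndex, numPartitions) -> new PlainState();

        List<List<Object>> keys =
                Collections.singletonList(Collections.<Object>singletonList("tuple-00"));

        System.out.println(mapGet(queryable.makeState(0, 1), keys)); // prints [Trident]
        try {
            mapGet(plain.makeState(0, 1), keys);
        } catch (ClassCastException e) {
            System.out.println("a plain State cannot be queried this way: " + e.getClass().getSimpleName());
        }
    }
}
```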

So, let's try it!

A dummy state implementation:

// Storm 0.9.x package names; these imports go at the top of the enclosing file.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import storm.trident.state.map.ReadOnlyMapState;

public static class ExampleStaticState implements ReadOnlyMapState<String> {

    // In-memory stand-in for a real data source (database, cache, ...)
    private final Map<String, String> dataSourceStub;

    public ExampleStaticState() {
        dataSourceStub = new HashMap<>();
        dataSourceStub.put("tuple-00", "Trident");
        dataSourceStub.put("tuple-01", "definitely");
        dataSourceStub.put("tuple-02", "lacks");
        dataSourceStub.put("tuple-03", "documentation");
    }

    @Override
    public List<String> multiGet(List<List<Object>> keys) {

        System.out.println("DEBUG: MultiGet, keys is " + keys);

        // One result per key tuple, in the same order as the keys
        List<String> result = new ArrayList<>();

        for (List<Object> inputTuple : keys) {
            result.add(dataSourceStub.get(inputTuple.get(0)));
        }

        return result;
    }

    @Override
    public void beginCommit(Long txid) {
        // never gets executed...
        System.out.println("DEBUG: Begin commit, txid=" + txid);
    }

    @Override
    public void commit(Long txid) {
        // never gets executed...
        System.out.println("DEBUG: Commit, txid=" + txid);
    }
}
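One detail of multiGet() worth spelling out: it receives a list of key tuples (here each has a single field) and should return exactly one value per key, in the same order, since the query function's results are matched to the input tuples by position (MapGet.execute() is handed one result per tuple). The following is a standalone copy of the lookup logic above with the Storm interface removed so it runs on plain Java; the "tuple-99" key is a hypothetical missing key, showing that an unknown key simply yields null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiGetSketch {

    // Same kind of stub as ExampleStaticState, trimmed to two entries
    private static final Map<String, String> DATA_SOURCE_STUB = new HashMap<>();
    static {
        DATA_SOURCE_STUB.put("tuple-00", "Trident");
        DATA_SOURCE_STUB.put("tuple-01", "definitely");
    }

    // Each element of keys is one key tuple; the result has one entry per key, in key order.
    static List<String> multiGet(List<List<Object>> keys) {
        List<String> result = new ArrayList<>(keys.size());
        for (List<Object> keyTuple : keys) {
            result.add(DATA_SOURCE_STUB.get(keyTuple.get(0))); // null if the key is unknown
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> keys = Arrays.asList(
                Arrays.<Object>asList("tuple-01"),
                Arrays.<Object>asList("tuple-99"),
                Arrays.<Object>asList("tuple-00"));
        System.out.println(multiGet(keys)); // [definitely, null, Trident]
    }
}
```

Returning null (rather than skipping the key) keeps the positions aligned; with MapGet, such a tuple would then carry a null value in its new field.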

A factory:

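// Storm 0.9.x package names; these imports go at the top of the enclosing file.
import java.util.Map;

import backtype.storm.task.IMetricsContext;
import storm.trident.state.State;
import storm.trident.state.StateFactory;

public static class ExampleStaticStateFactory implements StateFactory {
    @Override
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        // Called once per partition (task) of the state; each gets its own instance
        return new ExampleStaticState();
    }
}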

A simple psvm (aka public static void main):

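// Storm 0.9.x package names; these imports go at the top of the enclosing file.
import java.util.Arrays;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.builtin.MapGet;
import storm.trident.testing.FeederBatchSpout;

public static void main(String... args) {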
    TridentTopology tridentTopology = new TridentTopology(); 
    FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{ 
      "foo" 
    })); 
    TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory()); 
    tridentTopology 
      .newStream("spout", spout) 
      .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar")) 
      .each(new Fields("foo", "bar"), new Debug()) 
      ; 

    Config conf = new Config(); 
    conf.setNumWorkers(6); 

    LocalCluster localCluster = new LocalCluster(); 
    localCluster.submitTopology("tridentTopology", conf, tridentTopology.build()); 

    spout.feed(Arrays.asList(new Values[]{ 
      new Values("tuple-00"), 
      new Values("tuple-01"), 
      new Values("tuple-02"), 
      new Values("tuple-03") 
    })); 

    localCluster.shutdown(); 
} 

Finally, the output:

DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]] 
DEBUG: [tuple-00, Trident] 
DEBUG: [tuple-01, definitely] 
DEBUG: [tuple-02, lacks] 
DEBUG: [tuple-03, documentation] 

As you can see, stateQuery() takes the values from the input batch and maps them to the values found in the "data store".
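The mapping stateQuery() performs on one batch can be simulated in plain Java. This sketch reproduces only the observable behaviour, assuming tuples are modelled as List<Object> and the data store as a Map; it is not how Trident implements it internally. As the DEBUG output shows, the lookup happens once for the whole batch, and each result is appended to its input tuple as the new "bar" field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StateQuerySketch {

    // Simulates .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar")) on one batch.
    static List<List<Object>> stateQuery(List<List<Object>> batch, Map<Object, Object> store) {
        // 1. Project the input field ("foo", index 0) of every tuple: one key per tuple
        List<List<Object>> keys = new ArrayList<>();
        for (List<Object> tuple : batch) {
            keys.add(Collections.singletonList(tuple.get(0)));
        }

        // 2. One lookup pass over the whole batch (like the single multiGet call in the output)
        List<Object> results = new ArrayList<>();
        for (List<Object> key : keys) {
            results.add(store.get(key.get(0)));
        }

        // 3. Append the i-th result to the i-th input tuple as the new field "bar"
        List<List<Object>> out = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            List<Object> extended = new ArrayList<>(batch.get(i));
            extended.add(results.get(i));
            out.add(extended);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Object, Object> store = new HashMap<>();
        store.put("tuple-00", "Trident");
        store.put("tuple-01", "definitely");

        List<List<Object>> batch = Arrays.asList(
                Arrays.<Object>asList("tuple-00"),
                Arrays.<Object>asList("tuple-01"));

        for (List<Object> t : stateQuery(batch, store)) {
            System.out.println("DEBUG: " + t); // DEBUG: [tuple-00, Trident], then DEBUG: [tuple-01, definitely]
        }
    }
}
```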

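Diving a bit deeper, you can look at the source of the MapGet class (an instance of which is what performs the query inside the topology) and find the following: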

public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
    @Override
    public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
        return map.multiGet((List) keys);
    }

    @Override
    public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
        collector.emit(new Values(result));
    }
}

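So under the hood it simply calls the multiGet() method of your ReadOnlyMapState implementation and then emits the values found in the data store, appending them to the already existing tuple. You can (although it may not be the best idea) create your own implementation of BaseQueryFunction<ReadOnlyMapState, Object> to do more complex things.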
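As an illustration of the "more complex things" a custom query function could do, the sketch below models the two-phase contract visible in MapGet: batchRetrieve() once per batch, then execute() once per input tuple, which may emit zero or more values. QueryFunction, WordsGet and run() are hypothetical, simplified stand-ins (not BaseQueryFunction or TridentCollector). Unlike MapGet, which emits null for a missing key, WordsGet emits nothing for it, and it emits one value per word of the string it finds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CustomQuerySketch {

    // Simplified two-phase contract; the "collector" is just a list of emitted value lists.
    interface QueryFunction<S, R> {
        List<R> batchRetrieve(S state, List<List<Object>> keys);
        void execute(List<Object> tuple, R result, List<List<Object>> collector);
    }

    // Hypothetical custom query: drops unknown keys and emits one value per word found.
    static class WordsGet implements QueryFunction<Map<Object, String>, String> {
        public List<String> batchRetrieve(Map<Object, String> state, List<List<Object>> keys) {
            List<String> out = new ArrayList<>();
            for (List<Object> key : keys) {
                out.add(state.get(key.get(0))); // one result per key, in order
            }
            return out;
        }

        public void execute(List<Object> tuple, String result, List<List<Object>> collector) {
            if (result == null) {
                return; // emit nothing for unknown keys
            }
            for (String word : result.split(" ")) {
                collector.add(Collections.<Object>singletonList(word));
            }
        }
    }

    // Drives the two phases: one batchRetrieve(), then execute() for each input tuple.
    static <S, R> List<List<Object>> run(QueryFunction<S, R> fn, S state, List<List<Object>> batch) {
        List<R> results = fn.batchRetrieve(state, batch);
        List<List<Object>> emitted = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            List<List<Object>> perTuple = new ArrayList<>();
            fn.execute(batch.get(i), results.get(i), perTuple);
            for (List<Object> values : perTuple) {
                List<Object> joined = new ArrayList<>(batch.get(i)); // input fields first...
                joined.addAll(values);                                // ...then the emitted ones
                emitted.add(joined);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<Object, String> store = new HashMap<>();
        store.put("tuple-00", "Trident definitely");

        List<List<Object>> batch = Arrays.asList(
                Arrays.<Object>asList("tuple-00"),
                Arrays.<Object>asList("tuple-99"));

        System.out.println(run(new WordsGet(), store, batch));
        // [[tuple-00, Trident], [tuple-00, definitely]]
    }
}
```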

+1

Thanks... it's never too late when it comes to learning... – Ezhil

