2015-05-14 87 views
2

如何為Trident拓撲編寫JUnit測試，讓元組在測試中流經拓撲，並驗證每個階段的輸出？我嘗試過使用Storm的測試框架（TrackedTopology）來執行，但它既無法驗證Trident的輸出，也無法讓Trident以一致、可控的方式執行。（標題：在Storm中以TrackedTopology單元測試執行Trident拓撲）

下面是一個示例拓撲，我在遇到最多問題的地方加上了內嵌註解。

import static org.junit.Assert.assertEquals; 
import java.util.Arrays; 
import java.util.List; 
import org.junit.Test; 
import storm.trident.TridentState; 
import storm.trident.TridentTopology; 
import storm.trident.operation.builtin.Count; 
import storm.trident.testing.MemoryMapState; 
import storm.trident.testing.Split; 
import backtype.storm.Config; 
import backtype.storm.ILocalCluster; 
import backtype.storm.Testing; 
import backtype.storm.testing.FeederSpout; 
import backtype.storm.testing.TestJob; 
import backtype.storm.testing.TrackedTopology; 
import backtype.storm.tuple.Fields; 
import backtype.storm.utils.Utils; 

/**
 * JUnit test that runs a word-count Trident topology inside Storm's tracked local cluster.
 *
 * <p>NOTE(review): as recorded in the comments inside {@link WordCountTestJob#run}, the
 * tracked-topology helpers ({@code trackedWait}, {@code readTuples}) did not provide a
 * deterministic way to wait for or inspect this Trident topology's output.
 */
public class WordCountTopologyTest {

    /** Runs {@link WordCountTestJob} against a tracked local cluster provided by Storm. */
    @Test
    public void testWordCountTopology() throws Exception {
        Testing.withTrackedCluster(new WordCountTestJob());
    }

    /**
     * Builds the word-count topology, submits it to the supplied cluster and feeds it a few
     * sample quotes.
     *
     * <p>Declared {@code static} because it never uses the enclosing test instance, so it
     * should not hold a hidden reference to it.
     */
    public static class WordCountTestJob implements TestJob {

        @Override
        public void run(ILocalCluster cluster) throws Exception {
            // Create the test topology to submit.
            TridentTopology termCountTopology = new TridentTopology();

            FeederSpout feeder = new FeederSpout(new Fields("text", "author"));

            // Split each "text" value into "word"s, group by word and aggregate a Count per
            // word into an in-memory map state. The resulting TridentState is not queried
            // by this test, so it is not kept in a local variable.
            termCountTopology.newStream("inputSpout", feeder)
                    .each(new Fields("text"), new Split(), new Fields("word"))
                    .groupBy(new Fields("word"))
                    .name("counter-output")
                    .persistentAggregate(
                            new MemoryMapState.Factory(), new Count(), new Fields("count"))
                    .parallelismHint(6);

            TrackedTopology tracked = Testing.mkTrackedTopology(cluster, termCountTopology.build());

            // Submit before feeding, matching the order used in Storm's tracked-topology examples.
            cluster.submitTopology("word-count-testing", new Config(), tracked.getTopology());

            // Feed some sample data into the topology.
            feeder.feed(Arrays.asList("Nearly all men can stand adversity, but if you want to test a man's character, give him power.", "Abraham Lincoln"));
            feeder.feed(Arrays.asList("No man has a good enough memory to be a successful liar.", "Abraham Lincoln"));
            feeder.feed(Arrays.asList("Either write something worth reading or do something worth writing.", "Benjamin Franklin"));
            feeder.feed(Arrays.asList("Well done is better than well said.", "Benjamin Franklin"));

            // Approaches tried so far, and why they are unsatisfactory:
            // - Sleeping for a fixed time (e.g. 5000 ms) lets the topology run, but any fixed
            //   delay may be too long or too short depending on the machine.
            // - Testing.trackedWait(tracked, 3) fails with the 5000 ms topology timeout.
            // - Trident creates its streams and bolts internally under different names, so
            //   reading the user-supplied name "counter-output" returns no tuples.
            List<?> outputTuples = Testing.readTuples(tracked, "counter-output");

            // Pins the currently observed (empty) result, not the expected word counts.
            assertEquals(0, outputTuples.size());
        }
    }
}

除此之外，我還試著自己寫一個BaseFilter，放在資料流的末端，用來儲存所有流經的元組，但似乎應該有更好的做法。此外，這也無法解決以受控方式執行拓撲的問題。Trident支援這樣的測試方式嗎？

回答

相關問題