2014-02-08 34 views
2

我想插入一個簡單的行到卡桑德拉2.0.5,風暴版本0.9.0.1中的表。插入行使用風暴三叉戟cassandra

我的測試如下:

我有由一個ID(int)和句子(文本)科拉姆的表。 id是主鍵。

我的噴口生成句子,並添加一個ID(代碼中的靜態增量)。

這是我的拓撲結構:

TridentTopology topology = new TridentTopology(); 
StateFactory cassandraStateFactory = CassandraMapState.nonTransactional(options); 
Fields fields = new Fields("id", "sentence"); 
MyTridentTupleMapper tupleMapper = new MyTridentTupleMapper(keyspace, fields); 
CassandraUpdater updater = new CassandraUpdater(tupleMapper); 
TridentState wordCounts = topology.newStream("spout1", spout) 
      .each(new Fields("sentence"), new AddId(), new Fields("id")) 
      .partitionPersist(cassandraStateFactory, fields, updater); 

LocalCluster cluster = new LocalCluster(); 
cluster.submitTopology("test", config, topology.build());  

爲MyTridentTupleMapper代碼:

https://github.com/guywald/trident-cassandra-read-write-examples/blob/master/src/test/java/com/guywald/storm/trident/cassandra/MyTridentTupleMapper.java

我得到以下異常:

2014-02-08 22:20:14 ERROR executor:0 - 
java.lang.RuntimeException: java.lang.ClassCastException: storm.trident.state.map.SnapshottableMap cannot be cast to com.hmsonline.storm.cassandra.trident.CassandraState 
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) 
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) 
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) 
    at backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730) 
    at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403) 
    at clojure.lang.AFn.run(AFn.java:24) 
    at java.lang.Thread.run(Thread.java:744) 

我不知道爲什麼它返回這一點,並會感謝幫助。

回答

0

它看起來像CassandraUpdater預計CassandraState,而CassandraMapState.nonTransactional創建不兼容的SnapshottableMap。 我相信常見(或異國情調的自定義)MapState更新將與CassandraMapState一起使用。這裏有一個關於何時使用State vs MapState的很好的解釋:https://groups.google.com/forum/#!topic/storm-user/TASr2zWyzKs

我認爲應該使用CassandraStateFactory作爲您的示例的狀態工廠。

相關問題