2016-11-23 203 views
2

風暴拓撲從卡夫卡讀取數據卡桑德拉羣連接和寫入卡桑德拉表如何通過從一個螺栓到另一個螺栓

在風暴我建立卡桑德拉羣連接和會話的準備方法。

cassandraCluster = Cluster.builder().withoutJMXReporting().withoutMetrics() 
      .addContactPoints(nodes) 
      .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) 
      .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 
        TimeUnit.MINUTES.toMillis(5))) 
      .withLoadBalancingPolicy(
        new TokenAwarePolicy(new RoundRobinPolicy())) 
      .build(); 

session = cassandraCluster.connect(keyspace); 

在執行方法我可以處理的元組,並將其保存在卡桑德拉表

假設,如果我想從一個元組數據寫入到多個表 編寫單獨的螺栓爲每個表將是不錯的選擇。但我必須創建羣集連接和會話每個螺栓中的每個表。

但在每簇此鏈接單一連接將是性能不錯的主意 http://www.datastax.com/dev/blog/4-simple-rules-when-using-the-datastax-drivers-for-cassandra

沒有任何的你有創造一個螺栓羣連接並使用在其他螺栓任何這方面的想法?

+0

我不知道足夠多關於Apache風暴在這裏發表評論,但快速瀏覽的文檔似乎表明您攜帶在「螺栓」中進行謹慎的操作。您可能需要在Cassandra和驅動程序用戶的問題中對此進行更多解釋。這是正確的,你需要理想地保持一個會話打開。如果您可以在每個botl之間共享對象,那麼您是否可以不讓會話對象成爲跨每個「螺栓」共享的公共對象? – markc

+0

@markc因爲螺栓分佈在物理上分離的機器上,所以不可能跨螺栓共享對象。這裏最好的做法是每個螺栓保持一個集羣/會話,但這似乎與鏈接中描述的最佳實踐相矛盾。我對卡桑德拉不太瞭解,如果沒有問題的話。 –

+1

@RyanWalker好的謝謝你的清理。說得通。那麼爲每個螺栓創建一個會話可能是有意義的。 cassandra集羣可以連接多個客戶端,但建議保持一個會話打開的原因僅僅是爲了避免建立和拆除連接。只要螺栓本身是永久的,那麼會話對象可以是我想說的那個孩子。建立連接後,驅動程序將爲集羣提供連接池。請參閱:https://github.com/datastax/java-driver/tree/3.x/manual/pooling – markc

回答

0

這取決於風暴如何將螺栓和噴口分配給工人。您不能假定您可以共享螺栓之間的連接,因爲它們可能在不同的工作人員(讀取:JVM)中運行,或完全在不同的節點上運行。

見我的答案在這裏:Mongo connection pooling for Storm topology

可能看起來是這樣的僞代碼:

public class CassandraBolt extends BaseRichBolt { 
    private static final long serialVersionUID = 1L; 
    private static Logger LOG = LoggerFactory.getLogger(CassandraBolt.class); 
    OutputCollector _collector; 

    // whatever your cassandra session is 
    // has to be transient because session is not serializable 
    protected transient CassandraSession _session; 

    @SuppressWarnings("rawtypes") 
    @Override 
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 
     _collector = collector; 

     // maybe get properties from stormConf instead of hard coding them 
     cassandraCluster = Cluster.builder().withoutJMXReporting().withoutMetrics() 
      .addContactPoints(nodes) 
      .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) 
      .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 
        TimeUnit.MINUTES.toMillis(5))) 
      .withLoadBalancingPolicy(
        new TokenAwarePolicy(new RoundRobinPolicy())) 
      .build(); 

     _session = cassandraCluster.connect(keyspace); 
    } 

    @Override 
    public void execute(Tuple input) { 
     try { 
      // use _session to talk to cassandra 

     } catch (Exception e) { 
      LOG.error("CassandraBolt error", e); 
      _collector.reportError(e); 
     } 
    } 


    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     // TODO Auto-generated method stub 
    } 
} 
+0

每個螺栓在不同的機器上運行在不同的JVM上。但它具有將數據從一個螺栓發送到另一個螺栓的機制。 我搜索的風暴有任何拓撲級別的方法,我可以直接從任何螺栓訪問 我們在cassandra和風暴中的選項是每個螺栓的集羣/會話連接。 –

+0

是的每個螺栓可以在不同的JVM中。沒有拓撲級別的方法可以共享。您應該將連接屬性傳遞給螺栓,並在螺栓的「準備」方法內創建連接。 –