2017-02-23 225 views
0

連續讀取數據,我已經經歷了Reading from Cassandra using Spark Streaming並通過tutorial-1tutorial-2鏈接不見了。星火:從卡桑德拉

它是公平地說,卡桑德拉 - 星火整合目前不提供任何開箱從卡桑德拉不斷得到更新,並將其流式傳輸到其他系統,如HDFS?

通過不斷的,我的意思是,因爲過去的已經改變(插入或更新)通過星火獲取表只得到那些行。如果有太多這樣的行,應該有一個選項來限制行數,並且隨後的火花獲取應該從中斷的地方開始。至少一次保證是好的,但確切的說 - 一次將是一個巨大的歡迎。

如果不支持,支持它的一種方法可能是在每個cassandra表中有一個輔助列updated_time,需要通過風暴查詢,然後使用該列進行查詢。或者每個包含ID的表的輔助表,被更改的行的時間戳。有沒有人試過這個?

回答

0

我不認爲Apache Cassandra具有開箱即用的功能。在內部[某段時間],它以順序方式存儲數據上的所有操作,但是它是按節點進行的,並且最終會被壓縮(以節省空間)。坦率地說,卡桑德拉(和其他大多數數據庫一樣)的承諾是提供最新的數據視圖(在分佈式環境中本身可能非常棘手),但並不是數據如何變化的完整歷史。所以如果你仍然想在Cassandra中得到這樣的信息(並在Spark中處理它),你必須自己做一些額外的工作:設計專用表(或添加合成列),照顧分區,保存偏移跟蹤進度等

Cassandra是OK的時間序列數據,但在你的情況下,我只想用流媒體解決方案(如卡夫卡),而不是發明了它考慮。

0

我同意Ralkie所說的,但是如果你用這個用例綁定到C *,我想提出一個更多的解決方案。該解決方案假定您完全控制架構和攝取。這不是一個流媒體解決方案,雖然它可能會被尷尬地拼湊成一個。

您是否考慮過使用由timebucket和murmur_hash_of_one_or_more_clustering_columnssome_int_designed_limit_row_width組成的組合鍵?這樣,您可以將您的時間安排設置爲1分鐘,5分鐘,1小時等,具體取決於您需要分析/存檔數據的「實時」方式。需要基於一個或多個聚類列的雜音散列來幫助定位C *簇中的數據(如果您經常查找特定的聚類列,則這是一個可怕的解決方案)。

例如,採取的IoT使用情況下的傳感器報告在每分鐘,並有一些傳感器讀數可以被表示爲整數。

create table if not exists iottable { 
    timebucket bigint, 
    sensorbucket int, 
    sensorid varchar, 
    sensorvalue int, 
    primary key ((timebucket, sensorbucket), sensorid) 
} with caching = 'none' 
    and compaction = { 'class': 'com.jeffjirsa.cassandra.db.compaction.TimeWindowedCompaction' }; 

請注意使用TimeWindowedCompaction。我不確定你使用的C *版本是什麼;但對於2.x系列,我會遠離DateTieredCompaction。我不能說它在3.x中表現如何。無論如何,您應該在確定架構和壓縮策略之前進行廣泛的測試和基準測試。

另外請注意,該模式可能導致hotspotting,因爲它是容易受到比其他人報告往往傳感器。再次,不知道用例,很難提供完美的解決方案 - 這僅僅是一個例子。如果您不關心爲特定傳感器(或列)讀取C *,則根本不必使用聚類列,並且您可以簡單地使用timeUUID或隨機用於雜音哈希分桶。

無論您如何決定對數據進行分區,這樣的模式都允許您使用repartitionByCassandraReplicajoinWithCassandraTable來提取在給定時間段內寫入的數據。