2016-07-12 34 views
0

我正在使用pysparkpyspark-cassandra更新卡桑德拉行時的奇怪行爲

我已經注意到使用COPYsstableloader,現在pysparksaveToCassandra上卡桑德拉(3.0.x3.6.x)的多個版本的這種行爲。

我有以下模式

CREATE TABLE test (
    id int, 
    time timestamp, 
    a int, 
    b int, 
    c int, 
    PRIMARY KEY ((id), time) 
) WITH CLUSTERING ORDER BY (time DESC); 

,後面的數據

(1, datetime.datetime(2015, 3, 1, 0, 18, 18, tzinfo=<UTC>), 1, 0, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 19, 12, tzinfo=<UTC>), 0, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 22, 59, tzinfo=<UTC>), 1, 0, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 23, 52, tzinfo=<UTC>), 0, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 32, 2, tzinfo=<UTC>), 1, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 32, 8, tzinfo=<UTC>), 0, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 43, 30, tzinfo=<UTC>), 1, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 44, 12, tzinfo=<UTC>), 0, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 48, 49, tzinfo=<UTC>), 1, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 49, 7, tzinfo=<UTC>), 0, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 50, 5, tzinfo=<UTC>), 1, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 50, 53, tzinfo=<UTC>), 0, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 51, 53, tzinfo=<UTC>), 1, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 51, 59, tzinfo=<UTC>), 0, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 54, 35, tzinfo=<UTC>), 1, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 55, 28, tzinfo=<UTC>), 0, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 55, 55, tzinfo=<UTC>), 1, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 0, 56, 24, tzinfo=<UTC>), 0, 3, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 11, 14, tzinfo=<UTC>), 1, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 11, 17, tzinfo=<UTC>), 2, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 12, 8, tzinfo=<UTC>), 1, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 12, 10, tzinfo=<UTC>), 0, 3, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 17, 43, tzinfo=<UTC>), 1, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 17, 49, tzinfo=<UTC>), 0, 3, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 24, 12, tzinfo=<UTC>), 1, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 24, 18, tzinfo=<UTC>), 2, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 24, 18, tzinfo=<UTC>), 1, 2, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 24, 24, tzinfo=<UTC>), 2, 1, 0) 

在接近數據的結尾,存在具有相同的時間戳的兩行。

(1, datetime.datetime(2015, 3, 1, 1, 24, 18, tzinfo=<UTC>), 2, 1, 0) 
(1, datetime.datetime(2015, 3, 1, 1, 24, 18, tzinfo=<UTC>), 1, 2, 0) 

這是我的理解,當我保存到卡桑德拉,其中一個將「贏」 - 將只有一行。

使用

rdd.saveToCassandra(keyspace, table, ['id', 'time', 'a', 'b', 'c']) 

寫卡桑德拉後也不行似乎已經贏了。相反,這些行似乎已經「合併」了。

1 | 2015-03-01 01:17:43+0000 |  1 |  2 |  0 
    1 | 2015-03-01 01:17:49+0000 |  0 |  3 |  0 
    1 | 2015-03-01 01:24:12+0000 |  1 |  2 |  0 
    1 | 2015-03-01 01:24:18+0000 |  2 |  2 |  0 
    1 | 2015-03-01 01:24:24+0000 |  2 |  1 |  0 

而非2015-03-01 01:24:18+0000含有(1, 2, 0)(2, 1, 0),它包含(2, 2, 0)

這裏發生了什麼?我不能爲我的生活找出這種行爲正在導致。

回答

1

這是一個鮮爲人知的效果,它來自數據的一起批處理。批處理寫入將同一時間戳分配給批處理中的所有插入。接下來,如果使用確切完成兩個寫入相同的時間戳,那麼有一個特殊的合併規則,因爲沒有「最後」寫入。 Spark Cassandra Connector默認使用內部分區批處理,所以如果您有這種類型的值破壞,很可能會發生這種情況。

兩個行爲相同寫入時間戳是基於較大值的合併。

給定表(鍵,A,B)

Batch 
Insert "foo", 2, 1 
Insert "foo", 1, 2 
End batch 

將批料給出了兩種突變相同的時間戳。卡桑德拉不能選擇一個「最後書寫」,因爲他們都在同一時間發生,而只是選擇了兩者的更大的價值。合併的結果將是

"foo", 2, 2 
+0

謝謝,我懷疑像這樣的事情正在發生,但無法找到任何有關它的信息。 實際上,人們如何處理這個問題?只要確保你有毫秒精確的歷史數據? – dacox

+0

根據你的目標,你可以做幾件事情。 大部分時間用戶只是試圖保證在同一批次中不會有重複的主鍵。如果你想在C *中維護這兩個記錄,你需要區分它們的一些方式(如毫秒或TimeUUID) 某些用戶使用不同的時間戳來強制執行排序(即如果每個插入都有升序,那麼它們將被應用按順序)。由於批量是自動構建的,因此您並沒有真正擁有此選項。這也失去了重複。 – RussS