2015-11-24 48 views
1

從Cassandra表中加載數據時,spark分區表示具有相同分區鍵的所有行。但是,當我使用相同的分區鍵在spark中創建數據,並使用.repartitionByCassandraReplica(..)方法重新分區新RDD時,它會在不同的spark分區中結束?如何使用由Spark-Cassandra連接器定義的分區方案來實現spark中的一致分區?基於cassandra表分區鍵將數據保存在spark中

鏈接下載我測試

版和其它信息

  • 火花:1.3
  • 卡桑德拉:2.1
  • 連接器:1.3.1
  • 火花節點(5)和卡斯*羣集節點(4)運行在不同的數據中心

代碼提取。下載使用上述鏈接代碼更多細節

步驟1:數據裝載到8個火花分區

Map<String, String> map = new HashMap<String, String>(); 
CassandraTableScanJavaRDD<TestTable> tableRdd = javaFunctions(conf) 
.cassandraTable("testkeyspace", "testtable", mapRowTo(TestTable.class, map)); 

步驟2:磁盤分割數據轉換成8個分區

.repartitionByCassandraReplica(
     "testkeyspace", 
     "testtable", 
     partitionNumPerHost, 
     someColumns("id"), 
     mapToRow(TestTable.class, map)); 

步驟3:打印rdds的分區ID和值

rdd.mapPartitionsWithIndex(...{ 
@Override 
public Iterator<String> call(..) throws Exception { 
List<String> list = new ArrayList<String>(); 
list.add("PartitionId-" + integer); 

while (itr.hasNext()) { 
    TestTable value = itr.next(); 
    list.add(Integer.toString(value.getId())); 
} 
return list.iterator(); 
} 
}, true).collect(); 

步驟4:結果的快照印刷在分區1兩個不同RDDS但期望成爲相同

負載個R dd值

---------------------------- 
Table load - PartitionId -1 
---------------------------- 
15 
22 

-------------------------------------- 
Repartitioned values - PartitionId -1 
-------------------------------------- 
33 
16 

回答

1

磁盤分割由卡桑德拉複製品不確定性放置鑰匙。目前有一張票可以改變這種情況。

https://datastax-oss.atlassian.net/projects/SPARKC/issues/SPARKC-278

現在一種解決方法是在Partitionspernode參數設置爲1

+0

感謝。此功能何時到達的任何日期?這種解決方法會限制我們在火花中只有很少的分區,這並不理想。 – Saravanan

+0

我還有一個相關的問題。傳遞給.cassandraTable(..)的readConf對象佔用了所有spark分區的平坦數量,而.repartitionByCassandraReplica(..)爲每個Cassandra節點分配一個分區。建議在兩個API中保留相同數量的總分區。 – Saravanan

相關問題