1
從Cassandra表中加載數據時,spark分區表示具有相同分區鍵的所有行。但是,當我使用相同的分區鍵在spark中創建數據,並使用.repartitionByCassandraReplica(..)方法重新分區新RDD時,它會在不同的spark分區中結束?如何使用由Spark-Cassandra連接器定義的分區方案來實現spark中的一致分區?基於cassandra表分區鍵將數據保存在spark中
鏈接下載我測試
- .CQL with the keyspace and table schema定製列表和火花的工作代碼。
- Spark job等類。
版和其它信息
- 火花:1.3
- 卡桑德拉:2.1
- 連接器:1.3.1
- 火花節點(5)和卡斯*羣集節點(4)運行在不同的數據中心
代碼提取。下載使用上述鏈接代碼更多細節
步驟1:數據裝載到8個火花分區
Map<String, String> map = new HashMap<String, String>();
CassandraTableScanJavaRDD<TestTable> tableRdd = javaFunctions(conf)
.cassandraTable("testkeyspace", "testtable", mapRowTo(TestTable.class, map));
步驟2:磁盤分割數據轉換成8個分區
.repartitionByCassandraReplica(
"testkeyspace",
"testtable",
partitionNumPerHost,
someColumns("id"),
mapToRow(TestTable.class, map));
步驟3:打印rdds的分區ID和值
rdd.mapPartitionsWithIndex(...{
@Override
public Iterator<String> call(..) throws Exception {
List<String> list = new ArrayList<String>();
list.add("PartitionId-" + integer);
while (itr.hasNext()) {
TestTable value = itr.next();
list.add(Integer.toString(value.getId()));
}
return list.iterator();
}
}, true).collect();
步驟4:結果的快照印刷在分區1兩個不同RDDS但期望成爲相同
負載個R dd值
----------------------------
Table load - PartitionId -1
----------------------------
15
22
--------------------------------------
Repartitioned values - PartitionId -1
--------------------------------------
33
16
感謝。此功能何時到達的任何日期?這種解決方法會限制我們在火花中只有很少的分區,這並不理想。 – Saravanan
我還有一個相關的問題。傳遞給.cassandraTable(..)的readConf對象佔用了所有spark分區的平坦數量,而.repartitionByCassandraReplica(..)爲每個Cassandra節點分配一個分區。建議在兩個API中保留相同數量的總分區。 – Saravanan