2017-03-26 137 views
0

我試圖在卡桑德拉的一小部分數據上運行一個Spark任務。 我手裏有一個RDD的鍵(分區和集羣列),我想只在這些鍵上運行我的作業。Spark Cassandra連接器加入集羣密鑰

type CassandraKey = (String, String, String, String) 
val columns = SomeColumns(ColumnName("pkey1"),ColumnName("pkey2"),ColumnName("pkey3"),ColumnName("ckey1")) 
val repartitionedKeys: CassandraPartitionedRDD[CassandraKey] = keys.repartitionByCassandraReplica("keyspace", "table", partitionKeyMapper = columns) 
val selectedRows: CassandraJoinRDD[CassandraKey, CassandraRow] = 
    repartitionedKeys.joinWithCassandraTable[CassandraRow](keyspace, table).on(joinColumns = columns) 
selectedRows.collect() 

我比BoundStatementBuilder收到以下錯誤:19

java.lang.IllegalArgumentException: ckey1 is not a column defined in this metadata 

我的表架構是如下:

CREATE TABLE "keyspace".table (
pkey1 text, 
pkey2 text, 
pkey3 text, 
ckey1 text, 
ckey2 text, 
ckey3 timestamp, 
data text, 
PRIMARY KEY ((pkey1, pkey2, pkey3), ckey1, ckey2, ckey3) 
) 

看着代碼,我可以看到,在BoundStatementBuilder columnTypes正在從在ReplicaLocator.keyByReplicas啓動的虛擬查詢中解析。此查詢用於從表中檢索分區標記,以及僅在分區鍵上構建的where子句。

此外,我可以看到,在RDDFunction.repartitionByCassandraReplica:183給定partitionKeyMapper被忽略,但是那似乎沒有引起任何問題。

我使用的連接器1.5.1版

回答

1

的「重新分割」的部分只能是在分區鍵,以便不存在指定的列或者如果你只選擇分區鍵列。只能通過撥打joinWithCassandraTable來指定所有連接列。

+0

隱藏的錯誤消息,只發生在RDD操作....可能會引發方法調用。 –

+0

Spark是懶惰的,直到運行時纔會知道模式,即當調用操作時 – RussS

+0

,但可以在方法調用中執行所使用的虛擬查詢。 –

相關問題