0
我有一個cassandraSQLContext,我這樣做:星火查詢追加密鑰空間火花臨時表
cassandraSqlContext.setKeyspace("test");
因爲如果我不這樣做,它抱怨我設置默認密鑰空間。
現在,當我運行這段代碼:
def insertIntoCassandra(siteMetaData: MetaData, dataFrame: DataFrame): Unit ={
System.out.println(dataFrame.show())
val tableName = siteMetaData.getTableName.toLowerCase()
dataFrame.registerTempTable("spark_"+ tableName)
System.out.println("Registered the spark table to spark_" + tableName)
val columns = columnMap.get(siteMetaData.getTableName)
val query = cassandraQueryBuilder.buildInsertQuery("test", tableName, columns)
System.out.println("Query: " + query);
cassandraSqlContext.sql(query)
System.out.println("Query executed")
}
它給了我這個錯誤日誌:
Registered the spark table to spark_test
Query: INSERT INTO TABLE test.tablename SELECT **the columns here** FROM spark_tablename
17/02/28 04:15:53 ERROR JobScheduler: Error running job streaming job 1488255351000 ms.0
java.util.concurrent.ExecutionException: java.io.IOException: Couldn't find test.tablename or any similarly named keyspace and table pairs
我不明白的是爲什麼心不是cassandraSQLContext執行打印出來的查詢,爲什麼它會將密鑰空間追加到可觸發的火花中。
public String buildInsertQuery(String activeReplicaKeySpace, String tableName, String columns){
String sql = "INSERT INTO TABLE " + activeReplicaKeySpace + "." + tableName +
" SELECT " + columns + " FROM spark_" + tableName;
return sql;
}