2017-02-28 85 views
0

我有一個cassandraSQLContext,我這樣做:星火查詢追加密鑰空間火花臨時表

cassandraSqlContext.setKeyspace("test"); 

因爲如果我不這樣做,它抱怨我設置默認密鑰空間。

現在,當我運行這段代碼:

 def insertIntoCassandra(siteMetaData: MetaData, dataFrame: DataFrame): Unit ={ 
     System.out.println(dataFrame.show()) 
     val tableName = siteMetaData.getTableName.toLowerCase() 
    dataFrame.registerTempTable("spark_"+ tableName) 
    System.out.println("Registered the spark table to spark_" + tableName) 

    val columns = columnMap.get(siteMetaData.getTableName) 
     val query = cassandraQueryBuilder.buildInsertQuery("test", tableName, columns) 
     System.out.println("Query: " + query); 
    cassandraSqlContext.sql(query) 
     System.out.println("Query executed") 
    } 

它給了我這個錯誤日誌:

Registered the spark table to spark_test 
Query: INSERT INTO TABLE test.tablename SELECT **the columns here** FROM spark_tablename 
17/02/28 04:15:53 ERROR JobScheduler: Error running job streaming job 1488255351000 ms.0 
java.util.concurrent.ExecutionException: java.io.IOException: Couldn't find test.tablename or any similarly named keyspace and table pairs 

我不明白的是爲什麼心不是cassandraSQLContext執行打印出來的查詢,爲什麼它會將密鑰空間追加到可觸發的火花中。

public String buildInsertQuery(String activeReplicaKeySpace, String tableName, String columns){ 
    String sql = "INSERT INTO TABLE " + activeReplicaKeySpace + "." + tableName + 
     " SELECT " + columns + " FROM spark_" + tableName; 
    return sql; 
    } 

回答

0

所以,問題是我使用的方法之一,我被實例化這是與cassandraSQLContext傳遞給insertIntoCassandra法衝突的新cassandraSQLContext cassandraSQLContext.In的兩個不同的實例。